Problem
The current rate limiting implementation (download_max_bytes_per_second / upload_max_bytes_per_second) uses a post-hoc sleep-based approach: transfer a chunk of data at full speed, then sleep to bring the average rate down to the target. This causes bursty behavior — alternating between 100% bandwidth utilization and 0% — which creates problems:
- Network jitter and buffer bloat: Bursty traffic fills switch/router buffers, increasing latency for other traffic on the same link
- Unfair bandwidth sharing: When multiple concurrent workers each burst independently, aggregate bandwidth temporarily exceeds the target
- Inaccurate enforcement: The sleep duration is calculated from a single chunk's transfer time, which doesn't account for I/O scheduling delays
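To make the burst-sleep cycle concrete, here is a minimal sketch of the post-hoc calculation described above. It is an illustration, not the project's actual code; the helper name `postHocSleep` is hypothetical.

```go
package main

import "time"

// postHocSleep returns how long to sleep after n bytes were transferred in
// `elapsed`, so that the average rate over the chunk drops to
// maxBytesPerSecond. Hypothetical helper, for illustration only.
func postHocSleep(n int64, elapsed time.Duration, maxBytesPerSecond uint64) time.Duration {
	if maxBytesPerSecond == 0 || n <= 0 {
		return 0 // unlimited, or nothing transferred
	}
	// Time the chunk should have taken at the target rate.
	target := time.Duration(float64(n) / float64(maxBytesPerSecond) * float64(time.Second))
	if target <= elapsed {
		return 0 // already at or below the target rate
	}
	return target - elapsed
}
```

With a 10 MiB/s target, a 10 MiB chunk that arrives in 100 ms yields a 900 ms sleep: the link runs at full speed for a tenth of a second and sits idle for the rest.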
Proposed Solution: Token-bucket rate limiter
Replace the post-hoc sleep with a token-bucket rate limiter that integrates directly into the io.Reader/io.Writer data path. The limiter is shared across all concurrent workers, enforcing an aggregate bandwidth cap.
Core implementation
// TokenBucketLimiter implements a token-bucket rate limiter for streaming I/O.
type TokenBucketLimiter struct {
    maxBytesPerSecond uint64
    tokens            int64
    lastRefill        time.Time
    mu                sync.Mutex
}

func NewTokenBucketLimiter(maxBytesPerSecond uint64) *TokenBucketLimiter {
    return &TokenBucketLimiter{
        maxBytesPerSecond: maxBytesPerSecond,
        tokens:            int64(maxBytesPerSecond), // start with 1s burst
        lastRefill:        time.Now(),
    }
}
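// Wait blocks until n tokens (bytes) have been consumed from the bucket.
func (tb *TokenBucketLimiter) Wait(n int) {
    if tb.maxBytesPerSecond == 0 || n <= 0 {
        return
    }
    maxTokens := int64(tb.maxBytesPerSecond) * 2 // 2s burst cap
    remaining := int64(n)
    for remaining > 0 {
        // Never request more than the bucket can hold. Without this clamp,
        // a read larger than the cap (e.g. a 32 KiB buffer at 10 KiB/s)
        // could never be satisfied and the loop would spin forever.
        need := remaining
        if need > maxTokens {
            need = maxTokens
        }
        tb.mu.Lock()
        now := time.Now()
        elapsed := now.Sub(tb.lastRefill)
        refill := int64(float64(tb.maxBytesPerSecond) * elapsed.Seconds())
        if refill > 0 {
            tb.tokens += refill
            tb.lastRefill = now
            if tb.tokens > maxTokens {
                tb.tokens = maxTokens
            }
        }
        if tb.tokens >= need {
            tb.tokens -= need
            tb.mu.Unlock()
            remaining -= need
            continue
        }
        deficit := need - tb.tokens
        waitTime := time.Duration(float64(deficit) / float64(tb.maxBytesPerSecond) * float64(time.Second))
        tb.mu.Unlock()
        // Sleep outside the lock so other workers can still take tokens.
        if waitTime > 10*time.Millisecond {
            time.Sleep(waitTime)
        } else {
            time.Sleep(1 * time.Millisecond)
        }
    }
}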
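The refill step inside Wait can be isolated as a pure function, which makes the arithmetic easy to check without a real clock. This is a sketch under the same 2-second cap; the helper name `refill` is hypothetical and not part of the proposed API.

```go
package main

import "time"

// refill returns the token count after `elapsed` has passed, capped at two
// seconds' worth of tokens. Hypothetical helper mirroring the refill step
// in Wait, for illustration only.
func refill(tokens int64, maxBytesPerSecond uint64, elapsed time.Duration) int64 {
	// Tokens earned since the last refill (fractional tokens are truncated).
	added := int64(float64(maxBytesPerSecond) * elapsed.Seconds())
	tokens += added
	// Cap the bucket so idle periods cannot bank unlimited burst.
	if maxTokens := int64(maxBytesPerSecond) * 2; tokens > maxTokens {
		tokens = maxTokens
	}
	return tokens
}
```

At 1000 bytes/s, an empty bucket holds 500 tokens after 500 ms, and never more than 2000 no matter how long the transfer has been idle.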
io.Reader/io.Writer wrappers
type RateLimitedReader struct {
    reader  io.Reader
    limiter *TokenBucketLimiter
}

func (r *RateLimitedReader) Read(p []byte) (int, error) {
    n, err := r.reader.Read(p)
    if n > 0 {
        r.limiter.Wait(n)
    }
    return n, err
}

// Shared limiter variant for aggregate enforcement:
func NewRateLimitedReaderWithLimiter(r io.Reader, limiter *TokenBucketLimiter) io.Reader {
    if limiter == nil || limiter.maxBytesPerSecond == 0 {
        return r
    }
    return &RateLimitedReader{reader: r, limiter: limiter}
}
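The heading mentions io.Writer, but only the reader is shown. A writer-side counterpart could look like the sketch below. The names `RateLimitedWriter`, `waiter`, `countingLimiter`, and `demoWrite` are assumptions made for this example; the `waiter` interface only exists so the snippet compiles on its own, and `*TokenBucketLimiter` would satisfy it through its `Wait(n int)` method.

```go
package main

import (
	"bytes"
	"io"
)

// waiter is the single method the wrapper needs from a limiter.
// Hypothetical interface; *TokenBucketLimiter would satisfy it.
type waiter interface {
	Wait(n int)
}

// RateLimitedWriter acquires tokens before each write, so bytes are paid
// for before they leave. Hypothetical counterpart to RateLimitedReader.
type RateLimitedWriter struct {
	writer  io.Writer
	limiter waiter
}

func (w *RateLimitedWriter) Write(p []byte) (int, error) {
	if len(p) > 0 {
		w.limiter.Wait(len(p))
	}
	return w.writer.Write(p)
}

// countingLimiter is a test double: it records requested tokens
// instead of sleeping.
type countingLimiter struct{ requested int }

func (c *countingLimiter) Wait(n int) { c.requested += n }

// demoWrite pushes five bytes through the wrapper and reports how many
// bytes were written and how many tokens were requested.
func demoWrite() (written, requested int) {
	var buf bytes.Buffer
	lim := &countingLimiter{}
	w := &RateLimitedWriter{writer: &buf, limiter: lim}
	n, _ := w.Write([]byte("hello"))
	return n, lim.requested
}
```

Unlike the reader, which can only charge for bytes after they have been read, the writer can wait before the bytes are sent.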
Usage in download/upload paths
Create one shared limiter per operation, pass it to all concurrent workers:
func (bd *BackupDestination) DownloadPathParallel(..., maxSpeed uint64, concurrency int) (int64, error) {
    // Shared limiter: total aggregate bandwidth is capped
    limiter := NewTokenBucketLimiter(maxSpeed)
    // ...
    g.Go(func() error {
        r, err := bd.GetFileReader(ctx, remotePath)
        if err != nil {
            return err
        }
        // Every worker wraps its reader with the same limiter instance.
        throttledReader := NewRateLimitedReaderWithLimiter(r, limiter)
        _, err = io.CopyBuffer(dst, throttledReader, buf)
        return err
    })
    // ...
}
Key design decisions
- Shared limiter across workers: A single TokenBucketLimiter is created per upload/download operation and passed to all concurrent workers via NewRateLimitedReaderWithLimiter. This enforces aggregate bandwidth, not per-worker bandwidth.
- 2-second burst cap: Allows enough burstiness for efficiency (filling TCP windows) without exceeding the target for extended periods.
- Zero overhead when unlimited: When maxBytesPerSecond == 0, the wrapper returns the original reader/writer directly, with no allocation and no function call overhead.
- ReadCloser variant: A RateLimitedReadCloser preserves the Close() method for APIs like PutFile that require io.ReadCloser.
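The ReadCloser variant is named above but not shown. A minimal sketch, assuming it simply forwards Close to the wrapped stream, might look like this. The field names, the `waiter` interface, `noopLimiter`, and `demoReadClose` are hypothetical and exist only to keep the example self-contained.

```go
package main

import (
	"io"
	"strings"
)

// waiter is the single method the wrapper needs from a limiter.
// Hypothetical interface; *TokenBucketLimiter would satisfy it.
type waiter interface {
	Wait(n int)
}

// RateLimitedReadCloser throttles Read and forwards Close, so it can be
// passed to APIs that require an io.ReadCloser.
type RateLimitedReadCloser struct {
	rc      io.ReadCloser
	limiter waiter
}

func (r *RateLimitedReadCloser) Read(p []byte) (int, error) {
	n, err := r.rc.Read(p)
	if n > 0 {
		r.limiter.Wait(n) // charge for the bytes just read
	}
	return n, err
}

// Close releases the underlying stream; no tokens are involved.
func (r *RateLimitedReadCloser) Close() error { return r.rc.Close() }

// noopLimiter never blocks; it stands in for a real limiter here.
type noopLimiter struct{}

func (noopLimiter) Wait(int) {}

// demoReadClose reads a small payload through the wrapper, then closes it.
func demoReadClose() (string, error) {
	rc := &RateLimitedReadCloser{
		rc:      io.NopCloser(strings.NewReader("payload")),
		limiter: noopLimiter{},
	}
	data, err := io.ReadAll(rc)
	if err != nil {
		return "", err
	}
	return string(data), rc.Close()
}
```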
Benefits
- Smooth bandwidth: Near-constant throughput instead of burst-sleep cycles
- Accurate aggregate enforcement: Total bandwidth across all workers stays within target
- No post-hoc calculation errors: Rate is enforced inline, not estimated after the fact
- Minimal overhead: Token check is O(1) under a mutex; no background goroutine or ticker is needed, and a worker sleeps only when the bucket is empty