Replace post-hoc sleep throttling with token-bucket rate limiter #1377

@minguyen9988

Problem

The current rate limiting implementation (download_max_bytes_per_second / upload_max_bytes_per_second) uses a post-hoc, sleep-based approach: transfer a chunk of data at full speed, then sleep long enough to bring the average rate down to the target. The result is bursty traffic that alternates between 100% and 0% bandwidth utilization, which causes several problems:

  1. Network jitter and bufferbloat: Bursty traffic fills switch/router buffers, increasing latency for other traffic on the same link
  2. Unfair bandwidth sharing: When multiple concurrent workers each burst independently, aggregate bandwidth temporarily exceeds the target
  3. Inaccurate enforcement: The sleep duration is calculated from a single chunk's transfer time, which doesn't account for I/O scheduling delays
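
To make the burst-then-idle pattern concrete, here is a minimal sketch of the post-hoc calculation described above. The helper name postHocSleep is hypothetical and this is not the project's actual code; it only illustrates the arithmetic of sleeping after a full-speed chunk.

```go
package main

import (
	"fmt"
	"time"
)

// postHocSleep returns how long to sleep after a chunk of n bytes took
// `elapsed` to transfer, so that the average rate drops to maxBytesPerSecond.
// Hypothetical helper for illustration only.
func postHocSleep(n int, elapsed time.Duration, maxBytesPerSecond uint64) time.Duration {
	if maxBytesPerSecond == 0 || n <= 0 {
		return 0
	}
	// Time the chunk should have taken at the target rate.
	target := time.Duration(float64(n) / float64(maxBytesPerSecond) * float64(time.Second))
	if target <= elapsed {
		return 0
	}
	return target - elapsed
}

func main() {
	// A 1 MiB chunk that arrived in 10 ms against a 1 MiB/s cap:
	// the link is saturated for 10 ms, then idle for the remaining 990 ms.
	fmt.Println(postHocSleep(1<<20, 10*time.Millisecond, 1<<20)) // 990ms
}
```

With several workers doing this independently, the 10 ms bursts can overlap, which is the aggregate overshoot described in point 2.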

Proposed Solution: Token-bucket rate limiter

Replace the post-hoc sleep with a token-bucket rate limiter that integrates directly into the io.Reader/io.Writer data path. The limiter is shared across all concurrent workers, enforcing an aggregate bandwidth cap.

Core implementation

// TokenBucketLimiter implements a token-bucket rate limiter for streaming I/O.
type TokenBucketLimiter struct {
    maxBytesPerSecond uint64
    tokens            int64
    lastRefill        time.Time
    mu                sync.Mutex
}

func NewTokenBucketLimiter(maxBytesPerSecond uint64) *TokenBucketLimiter {
    return &TokenBucketLimiter{
        maxBytesPerSecond: maxBytesPerSecond,
        tokens:            int64(maxBytesPerSecond), // start with 1s burst
        lastRefill:        time.Now(),
    }
}

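// Wait blocks until n bytes' worth of tokens have been consumed from the bucket.
func (tb *TokenBucketLimiter) Wait(n int) {
    if tb.maxBytesPerSecond == 0 || n <= 0 {
        return
    }
    maxTokens := int64(tb.maxBytesPerSecond) * 2 // burst cap: 2s of traffic
    remaining := int64(n)
    for remaining > 0 {
        // Take at most one full bucket per pass; a request larger than
        // maxTokens could otherwise never be satisfied and would loop forever.
        want := remaining
        if want > maxTokens {
            want = maxTokens
        }
        tb.mu.Lock()
        now := time.Now()
        elapsed := now.Sub(tb.lastRefill)
        refill := int64(float64(tb.maxBytesPerSecond) * elapsed.Seconds())
        if refill > 0 {
            tb.tokens += refill
            tb.lastRefill = now
            if tb.tokens > maxTokens {
                tb.tokens = maxTokens
            }
        }
        if tb.tokens >= want {
            tb.tokens -= want
            tb.mu.Unlock()
            remaining -= want
            continue
        }
        deficit := want - tb.tokens
        waitTime := time.Duration(float64(deficit) / float64(tb.maxBytesPerSecond) * float64(time.Second))
        tb.mu.Unlock()
        // Sleep outside the lock; poll at 1ms granularity for very short waits.
        if waitTime > 10*time.Millisecond {
            time.Sleep(waitTime)
        } else {
            time.Sleep(1 * time.Millisecond)
        }
    }
}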
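
As a worked example of the arithmetic inside Wait, the sketch below pulls the refill and wait-time formulas out into two pure functions. The names refillTokens and waitFor are hypothetical and exist only for this illustration; the formulas mirror the ones in the limiter above.

```go
package main

import (
	"fmt"
	"time"
)

// refillTokens adds rate*elapsed tokens to the bucket and caps the result
// at two seconds' worth of traffic. Hypothetical helper for illustration.
func refillTokens(tokens int64, rate uint64, elapsed time.Duration) int64 {
	tokens += int64(float64(rate) * elapsed.Seconds())
	if maxTokens := int64(rate) * 2; tokens > maxTokens {
		tokens = maxTokens
	}
	return tokens
}

// waitFor returns how long the bucket needs to accumulate `deficit` tokens.
func waitFor(deficit int64, rate uint64) time.Duration {
	return time.Duration(float64(deficit) / float64(rate) * float64(time.Second))
}

func main() {
	const rate = 1 << 20 // 1 MiB/s

	// Bucket holds 256 KiB and the caller wants 768 KiB: the deficit is 512 KiB.
	fmt.Println(waitFor((768<<10)-(256<<10), rate)) // 500ms

	// After 10 idle seconds the bucket is capped at 2 MiB, not 10 MiB.
	fmt.Println(refillTokens(0, rate, 10*time.Second)) // 2097152
}
```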

io.Reader/io.Writer wrappers

type RateLimitedReader struct {
    reader  io.Reader
    limiter *TokenBucketLimiter
}

func (r *RateLimitedReader) Read(p []byte) (int, error) {
    n, err := r.reader.Read(p)
    if n > 0 {
        r.limiter.Wait(n)
    }
    return n, err
}

// Shared limiter variant for aggregate enforcement:
func NewRateLimitedReaderWithLimiter(r io.Reader, limiter *TokenBucketLimiter) io.Reader {
    if limiter == nil || limiter.maxBytesPerSecond == 0 {
        return r
    }
    return &RateLimitedReader{reader: r, limiter: limiter}
}
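
Only the reader side is shown above. A writer wrapper for the upload path could look roughly like the sketch below. It is an assumption, not code from this proposal: the byteLimiter interface and countingLimiter stand-in are hypothetical and are there only to keep the example self-contained. In real use the limiter field would hold the shared *TokenBucketLimiter.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// byteLimiter is a hypothetical interface satisfied by anything with
// Wait(n int), such as *TokenBucketLimiter.
type byteLimiter interface {
	Wait(n int)
}

// RateLimitedWriter acquires tokens before each Write, so bytes are not
// handed to the underlying writer faster than the limiter allows.
type RateLimitedWriter struct {
	writer  io.Writer
	limiter byteLimiter
}

func (w *RateLimitedWriter) Write(p []byte) (int, error) {
	if len(p) > 0 {
		w.limiter.Wait(len(p))
	}
	return w.writer.Write(p)
}

// countingLimiter is a stand-in that records requested bytes instead of sleeping.
type countingLimiter struct{ total int }

func (c *countingLimiter) Wait(n int) { c.total += n }

// demo writes a short string through the wrapper and reports what happened.
func demo() (written, charged int, out string) {
	var dst bytes.Buffer
	lim := &countingLimiter{}
	w := &RateLimitedWriter{writer: &dst, limiter: lim}
	written, _ = io.WriteString(w, "hello")
	return written, lim.total, dst.String()
}

func main() {
	fmt.Println(demo()) // 5 5 hello
}
```

Waiting before the write (rather than after, as the reader does) keeps the outgoing bytes themselves paced; whether that ordering is preferable here is a design choice left open.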

Usage in download/upload paths

Create one shared limiter per operation and pass it to all concurrent workers:

func (bd *BackupDestination) DownloadPathParallel(..., maxSpeed uint64, concurrency int) (int64, error) {
    // Shared limiter: total aggregate bandwidth is capped
    limiter := NewTokenBucketLimiter(maxSpeed)
    // ...
    g.Go(func() error {
        r, err := bd.GetFileReader(ctx, remotePath)
        if err != nil {
            return err
        }
        defer r.Close() // assumes GetFileReader returns an io.ReadCloser
        throttledReader := NewRateLimitedReaderWithLimiter(r, limiter)
        _, err = io.CopyBuffer(dst, throttledReader, buf)
        return err
    })
    // ...
}

Key design decisions

  • Shared limiter across workers: A single TokenBucketLimiter is created per upload/download operation and passed to all concurrent workers via NewRateLimitedReaderWithLimiter. This enforces aggregate bandwidth, not per-worker bandwidth.
  • 2-second burst cap: Allows enough burstiness for efficiency (filling TCP windows) without exceeding the target for extended periods.
  • Zero overhead when unlimited: When maxBytesPerSecond == 0, the constructor returns the original reader/writer unwrapped, so the unlimited path adds no allocation and no extra call per Read/Write.
  • ReadCloser variant: A RateLimitedReadCloser preserves the Close() method for APIs like PutFile that require io.ReadCloser.

Benefits

  1. Smooth bandwidth: Near-constant throughput instead of burst-sleep cycles
  2. Accurate aggregate enforcement: Total bandwidth across all workers stays within target, apart from the bounded burst allowance
  3. No post-hoc calculation errors: Rate is enforced inline, not estimated after the fact
  4. Minimal overhead: Token check is O(1) under a mutex; no background goroutine or ticker is needed
