Implementing Backpressure and Flow Control in High-Throughput Streaming Systems

Every streaming outage I have been paged for had the same shape: a producer kept producing, a consumer fell behind, and the gap between them silently turned into memory. By the time anyone looked at a dashboard, latency had already collapsed and the JVM was thirty seconds from an OutOfMemoryError. The fix is never “make the consumer faster” in the moment – it is to have already designed the system so that a slow consumer slows the producer, or sheds load deliberately, instead of buffering its way to death.

That mechanism is backpressure: the consumer's ability to push its rate limit upstream so the whole pipeline self-throttles to the speed of its slowest stage. This article walks through the concrete controls – bounded buffers, pull and credit-based protocols, Kafka pause/resume, token-bucket admission, and load shedding – and how to compose them across a multi-stage pipeline. The running assumption is that you already have a pipeline doing real work and now need it to survive a load spike or a downstream stall without falling over.

Why unbounded queues cause latency collapse and OOM

Start with the failure mode, because it explains every design decision that follows. An unbounded queue between a fast producer and a slow consumer is a latency time bomb governed by Little’s Law:

L = lambda * W

L is the mean number of items in the system, lambda is the arrival rate, and W is the mean time each item spends in the system. Rearranged, W = L / lambda. For a FIFO queue that has fallen behind, the newest arrival's wait is simply the backlog divided by the drain rate. If your consumer drains at 1,000 msg/s and the queue is holding 500,000 messages, the newest message waits roughly 500 seconds before it is even looked at. The queue did not protect you – it converted a throughput deficit into an unbounded latency deficit, and added heap pressure on the way.

The core insight: a buffer cannot fix a sustained rate mismatch. If arrival_rate > service_rate on average, queue depth grows without bound and so does latency. A buffer only absorbs bursts – transient periods where arrival exceeds service – and only if the average over the window stays below service rate. Any buffer you size as “big enough to never fill” is a buffer you have not reasoned about.
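A toy model makes both claims concrete. The sketch below is a hypothetical discrete-time queue with one-second ticks (`QueueDepthModel` and its inputs are illustrative, not measurements). A burst whose average stays under the service rate drains back to zero. A sustained 1,500 msg/s against 1,000 msg/s of service grows by 500 every second, and the newest arrival's wait is the backlog divided by the drain rate.

```java
import java.util.Arrays;

/** Minimal discrete-time queue model with 1-second ticks; an illustration, not a benchmark. */
final class QueueDepthModel {
    private QueueDepthModel() {}

    /** Depth at the end of each second for per-second arrivals and a fixed service rate. */
    static long[] depths(long[] arrivalsPerSecond, long servicePerSecond) {
        long[] out = new long[arrivalsPerSecond.length];
        long depth = 0;
        for (int t = 0; t < arrivalsPerSecond.length; t++) {
            // Depth never goes negative: unused service capacity in a quiet second is lost.
            depth = Math.max(0, depth + arrivalsPerSecond[t] - servicePerSecond);
            out[t] = depth;
        }
        return out;
    }

    /** FIFO: the newest arrival waits for the whole backlog ahead of it to drain. */
    static double newestWaitSeconds(long backlog, long servicePerSecond) {
        return (double) backlog / servicePerSecond;
    }

    public static void main(String[] args) {
        // Burst: 3 s at 1,500/s then 7 s at 500/s; the 10 s average (800/s) is below service.
        long[] burst = {1_500, 1_500, 1_500, 500, 500, 500, 500, 500, 500, 500};
        // Sustained: 1,500/s every second; the average is above service.
        long[] sustained = new long[10];
        Arrays.fill(sustained, 1_500);
        System.out.println(Arrays.toString(depths(burst, 1_000)));
        // [500, 1000, 1500, 1000, 500, 0, 0, 0, 0, 0]
        System.out.println(Arrays.toString(depths(sustained, 1_000)));
        // [500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000]
        System.out.println(newestWaitSeconds(500_000, 1_000)); // 500.0
    }
}
```

The burst peaks at 1,500 items and then drains. The sustained run never stops growing, and no buffer size changes that slope.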

Two distinct things break. Latency collapses first, because every item now sits behind a growing backlog. Then memory follows, because each queued item holds a payload, and on the JVM, GC pause times climb with live-set size – which slows the consumer further, which grows the queue faster. That is the same positive-feedback loop that turns a retry into a retry storm, applied to queueing. Bounding the queue is what breaks the loop: a full bounded buffer forces a decision – block, drop, or shed – instead of silently consuming all available memory.

Step 1 – Backpressure models: pull, push-with-credit, reactive streams

There are three mechanisms to make a consumer’s rate visible to a producer. Pick deliberately; they have very different coupling and latency characteristics.

Pull-based. The consumer asks for the next item when it is ready. The producer cannot get ahead because nothing is delivered until requested. Kafka's consumer is the canonical example. Records reach your code only through poll(), and the client fetches only a bounded amount ahead (capped by fetch.max.bytes and max.partition.fetch.bytes). A slow consumer therefore simply polls less often, and the broker holds the backlog on durable disk, not in your heap. Pull is the simplest correct model and the default you should reach for when the transport can buffer cheaply.

Push-with-credit. The consumer grants the producer a budget – "you may send me N more items" – and the producer sends until the credit is exhausted. The consumer refills the credit as it processes. This is how gRPC over HTTP/2 flow control works at the protocol level: the receiver advertises a window, and the sender may not exceed it. It is also how AMQP 0-9-1's basic.qos prefetch (as used by RabbitMQ) works. Credit-based push keeps the latency benefit of pushing while still bounding in-flight work.
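To make the credit contract concrete in-process, here is a minimal sketch. The `CreditChannel` class is hypothetical, and it counts items rather than bytes as HTTP/2 does. A `Semaphore` holds the receiver's window. The sender spends a permit per pushed item, and the receiver returns one only when it finishes an item, so in-flight work can never exceed the window.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;

/**
 * Push-with-credit in miniature: the receiver grants `window` credits, the sender spends
 * one per pushed item, and the receiver returns one per finished item. Counts items, not
 * bytes; all names are illustrative.
 */
final class CreditChannel<T> {
    private final Semaphore credits;
    private final BlockingQueue<T> inFlight;

    CreditChannel(int window) {
        this.credits = new Semaphore(window);
        this.inFlight = new ArrayBlockingQueue<>(window); // never holds more than the window
    }

    /** Sender: push only if a credit is available; false means "wait for the receiver". */
    boolean trySend(T item) {
        if (!credits.tryAcquire()) {
            return false;                 // window exhausted: the receiver is behind
        }
        inFlight.add(item);               // a held credit guarantees a free slot
        return true;
    }

    /** Receiver: next pushed item, or null if nothing is pending. */
    T poll() {
        return inFlight.poll();
    }

    /** Receiver: return one credit after fully processing a polled item (exactly once each). */
    void ack() {
        credits.release();
    }
}
```

Taking an item off the channel does not return its credit; only `ack()` does. That is what bounds work that has been received but not yet finished.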

Reactive Streams. A standardized in-process protocol (the basis of Project Reactor, RxJava, Akka Streams, and java.util.concurrent.Flow) that formalizes credit-based pull. The Subscriber calls subscription.request(n), and the Publisher is contractually forbidden from emitting more than the sum of outstanding requests. It composes through every operator, so backpressure propagates end to end automatically.

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

// Reactive Streams: the demand signal IS the backpressure.
// request(n) is a credit grant; onNext may not exceed outstanding demand.
flux.subscribe(new BaseSubscriber<Order>() {
    @Override
    protected void hookOnSubscribe(Subscription s) {
        s.request(64); // initial credit window
    }
    @Override
    protected void hookOnNext(Order order) {
        process(order);
        request(1); // replenish one credit per processed item
    }
});
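The snippet above uses Reactor. The same demand protocol exists in the JDK as `java.util.concurrent.Flow`. `SubmissionPublisher` adds a bounded per-subscriber buffer whose `submit()` blocks the producer when it is full. Here is a minimal sketch under those assumptions; the `CreditSubscriber` class, window, and buffer sizes are illustrative.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** JDK-only credit subscriber: initial window, then one credit per processed item. */
final class CreditSubscriber<T> implements Flow.Subscriber<T> {
    private final int window;
    private final List<T> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    CreditSubscriber(int window) {
        this.window = window;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(window);            // initial credit window
    }

    @Override public void onNext(T item) {
        received.add(item);           // stand-in for real processing
        subscription.request(1);      // replenish one credit per processed item
    }

    @Override public void onError(Throwable t) { done.countDown(); }

    @Override public void onComplete() { done.countDown(); }

    /** Runs a producer that submit()s items; submit() blocks while this subscriber is full. */
    static List<Integer> run(int items, int window, int bufferCapacity) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CreditSubscriber<Integer> sub = new CreditSubscriber<>(window);
        try {
            try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, bufferCapacity)) {
                pub.subscribe(sub);
                for (int i = 0; i < items; i++) {
                    pub.submit(i);    // blocks when the bounded per-subscriber buffer is full
                }
            }                         // close(): onComplete follows the buffered items
            sub.done.await(10, TimeUnit.SECONDS);
            return sub.received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return sub.received;
        } finally {
            executor.shutdown();
        }
    }
}
```

`CreditSubscriber.run(1_000, 64, 16)` should return all 1,000 items in order. The producer simply spends part of that time parked inside `submit()`, which is the backpressure working.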

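The trap that bites teams is the leaky boundary. Reactive backpressure only holds while you stay inside the protocol. The moment you bridge to something that does not honor demand – Flux.interval, a hot sensor feed, a Sinks.many() sink with the wrong overflow strategy – you are back to push. At that point you must attach an explicit overflow operator: onBackpressureBuffer with a bound, onBackpressureDrop, or onBackpressureLatest. Without one, you get whatever the source does by default. Flux.interval signals an overflow error when it outruns demand. A no-argument onBackpressureBuffer() or a Sinks.many().unicast().onBackpressureBuffer() sink buffers without limit, and you are back to an unbounded queue.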
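At a boundary like this, the overflow decision has to be explicit. Here is a JDK-only sketch of the same idea, assuming a hot source that must never block. `SubmissionPublisher.offer()` with an `onDrop` handler replaces the blocking `submit()`, so a stalled subscriber costs counted drops instead of unbounded memory. `HotSourceBridge` is a hypothetical name, and the stalled subscriber is contrived to force overflow.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicLong;

/** A hot source that must not wait: offer() with an explicit, counted drop policy. */
final class HotSourceBridge {
    private HotSourceBridge() {}

    /** Pushes `items` at a subscriber that never requests; returns how many were dropped. */
    static long offerToStalledSubscriber(int items, int bufferCapacity) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicLong dropped = new AtomicLong();
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, bufferCapacity)) {
            pub.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { /* stalled: no request() */ }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            for (int i = 0; i < items; i++) {
                // onDrop runs when the subscriber's bounded buffer is full: count it (a labeled
                // metric in real code) and return false so the item is not retried.
                pub.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false;
                });
            }
        } finally {
            executor.shutdownNow();
        }
        return dropped.get();
    }
}
```

With a buffer capacity of 8 and 100 offers, nearly every item is dropped and counted. The exact count depends on how the JDK rounds the buffer capacity, so treat it as illustrative.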

Step 2 – Size bounded buffers and choose block, drop, or shed

Every buffer gets an explicit bound and an explicit overflow policy. “Unbounded” is not a policy; it is the absence of one.

Size the buffer to cover your worst tolerable burst, not your average load and never your peak observed load. A workable formula:

buffer_size = (peak_burst_rate - service_rate) * acceptable_absorption_window

If you can tolerate a 2-second burst at 5,000 msg/s above service rate, size for ~10,000 items. Then estimate the worst-case heap by multiplying by payload size (use a high percentile rather than the mean if payload sizes vary), and confirm it fits comfortably inside your GC headroom. Round to something testable and load-test to the bound – a buffer you have never filled in a test is a buffer whose overflow path is untested.
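The arithmetic is small enough to encode next to the config it justifies. Here is a minimal sketch. `BufferSizing` is a hypothetical helper, and the 6,000 msg/s peak and 1,000 msg/s service rate are assumed inputs that reproduce the 5,000 msg/s excess above. Payload size here ignores per-object overhead.

```java
/** Buffer sizing from the burst you choose to absorb; inputs are your own measurements. */
final class BufferSizing {
    private BufferSizing() {}

    /** (peak arrival - service) * window, rounded up to whole items; 0 if there is no excess. */
    static long bufferItems(double peakArrivalPerSec, double servicePerSec, double windowSeconds) {
        double excess = Math.max(0, peakArrivalPerSec - servicePerSec);
        return (long) Math.ceil(excess * windowSeconds);
    }

    /** Worst-case heap for a full buffer; pass a high-percentile payload size, not the mean. */
    static long worstCaseHeapBytes(long bufferItems, long payloadBytes) {
        return Math.multiplyExact(bufferItems, payloadBytes);   // fail loudly on overflow
    }
}
```

`bufferItems(6_000, 1_000, 2.0)` gives 10,000 items, and `worstCaseHeapBytes(10_000, 1_024)` gives 10,240,000 bytes. That matches the roughly 10 MB budgeted in the bounded-queue example below.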

Then choose what happens when it fills:

| Policy | Behavior on full buffer | Use when |
|---|---|---|
| Block | Producer thread parks until space frees | Producer can legitimately slow (file read, pull source) and you need zero loss |
| Drop newest | Reject the incoming item | Newest data is least valuable; you must never stall the producer |
| Drop oldest / latest | Evict head, keep newest | Only the most recent value matters (telemetry, price ticks, gauges) |
| Shed (reject upstream) | Return an explicit "busy" signal | Producer is external and should retry or degrade |

Blocking is real backpressure and is correct when the producer can absorb being slowed – but it is dangerous when the blocking thread also serves other work, because you have just coupled an unrelated request path to this buffer. Never block on a shared event-loop or a thread that holds a lock. A bounded blocking queue makes the contract explicit:

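import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Capacity 10_000, payloads ~1 KB => ~10 MB worst case, sits in GC headroom.
BlockingQueue<Event> queue = new ArrayBlockingQueue<>(10_000);

// Producer: block-with-timeout, then shed rather than wait forever.
// offer(e, timeout, unit) throws InterruptedException. meter (a Micrometer MeterRegistry) and
// BackpressureException (an application-defined exception) are assumed here.
if (!queue.offer(event, 50, TimeUnit.MILLISECONDS)) {
    meter.counter("pipeline.shed", "stage", "ingest").increment();
    throw new BackpressureException("ingest buffer full"); // surfaces as 429
}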

The 50 ms offer timeout is the key detail: it bounds how long the producer waits before giving up. Unbounded blocking turns backpressure into a distributed deadlock the moment the consumer stalls permanently.
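Blocking is only one row of the table. For the "drop oldest" row, here is a minimal sketch, assuming a single in-process buffer where only recent values matter. `DropOldestBuffer` is a hypothetical class. The producer never blocks, the head is evicted to make room, and every eviction is counted so the loss stays visible.

```java
import java.util.ArrayDeque;

/**
 * "Drop oldest" from the policy table: a fixed-capacity buffer that evicts the head to admit
 * the newest item. Methods are synchronized so producer and consumer threads can share it.
 */
final class DropOldestBuffer<T> {
    private final ArrayDeque<T> items;
    private final int capacity;
    private long evicted;

    DropOldestBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
        this.items = new ArrayDeque<>(capacity);
    }

    /** Always accepts; returns the evicted item, or null if nothing was evicted. */
    synchronized T offer(T item) {
        T dropped = null;
        if (items.size() == capacity) {
            dropped = items.pollFirst();   // evict the oldest
            evicted++;                     // export this as a labeled drop metric
        }
        items.addLast(item);
        return dropped;
    }

    synchronized T poll() { return items.pollFirst(); }

    synchronized int size() { return items.size(); }

    synchronized long evictedCount() { return evicted; }
}
```

Offering 1 through 5 into a capacity-3 buffer evicts 1 and 2, leaving 3, 4, and 5 for the consumer.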

Step 3 – Consumer-driven flow control in Kafka with pause, resume, lag targets

Kafka is pull-based, which gives you backpressure almost for free – a slow consumer just calls poll() less often and the broker retains the backlog on disk. But two foot-guns remain.

First, max.poll.records caps how many records one poll() returns. The time between successive poll() calls includes processing the whole batch. If that time exceeds max.poll.interval.ms, the consumer is considered failed: it leaves the group, and a rebalance follows, which makes the lag worse. Set max.poll.records small enough that a full batch processes well within the interval:

max.poll.records=200
max.poll.interval.ms=300000
fetch.max.bytes=10485760
max.partition.fetch.bytes=1048576
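To pick max.poll.records from measurements rather than by feel, divide the time you allow a batch by your worst-case per-record processing time. Here is a hypothetical helper. The 50% safety fraction and the per-record timings are assumptions; replace them with your own p99 numbers.

```java
/** Hypothetical helper: largest max.poll.records whose worst-case batch fits the poll interval. */
final class PollSizing {
    private PollSizing() {}

    /**
     * @param maxPollIntervalMs the consumer's max.poll.interval.ms
     * @param worstRecordMs     measured worst-case (e.g. p99) processing time per record
     * @param safetyFraction    share of the interval one batch may use, in (0, 1]
     */
    static int maxSafePollRecords(long maxPollIntervalMs, double worstRecordMs, double safetyFraction) {
        if (worstRecordMs <= 0 || safetyFraction <= 0 || safetyFraction > 1) {
            throw new IllegalArgumentException("invalid sizing inputs");
        }
        long fit = (long) Math.floor(maxPollIntervalMs * safetyFraction / worstRecordMs);
        // Never recommend 0: one record per poll is the floor, even if it is already too slow.
        return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, fit));
    }
}
```

With the settings above, 200 records inside a 300,000 ms interval at a 50% safety fraction leaves a budget of 750 ms per record.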

Second, when you hand records off to an async stage with its own bounded buffer (the common pattern), you must stop fetching when that buffer fills – otherwise you re-introduce an unbounded queue inside your consumer. The correct tool is pause() / resume() on specific partitions, not Thread.sleep and not leaving the consumer group:

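import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Assumes consumer is a KafkaConsumer<String, byte[]>, workQueue a bounded BlockingQueue,
// and drainThreshold an int chosen below the queue's capacity.
while (running) {
    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(200));
    for (TopicPartition tp : records.partitions()) {
        for (ConsumerRecord<String, byte[]> record : records.records(tp)) {
            if (!workQueue.offer(record)) {              // local bounded buffer full
                consumer.pause(consumer.assignment());   // stop fetching, stay in group
                consumer.seek(tp, record.offset());      // rewind: this record is re-fetched, not lost
                break;                                   // the rest of tp follows after resume
            }
        }
    }
    if (!consumer.paused().isEmpty() && workQueue.remainingCapacity() > drainThreshold) {
        consumer.resume(consumer.paused());          // drained enough; fetch again
    }
}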

pause() keeps the consumer in the group. Heartbeats run on a background thread (since KIP-62 in Kafka 0.10.1), and you keep calling poll(), which returns nothing for paused partitions but still satisfies the max.poll.interval.ms timer. You therefore avoid triggering a rebalance while still applying hard backpressure to the broker.

Make consumer lag your primary saturation signal and act on a target, not a static threshold. Lag is the difference between the partition’s log-end offset and the consumer’s committed offset. A flat or shrinking lag is healthy at any absolute value; a monotonically rising lag means arrival > service and no buffer will save you – you must add consumers (up to the partition count, the hard parallelism ceiling) or shed upstream. Scale on the rate of change of lag, not its instantaneous value.
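"Rate of change" can be as simple as a least-squares slope over the last few lag samples. Here is a minimal sketch. `LagTrend`, the sampling interval, and the tolerated slope are illustrative assumptions; in practice the samples would come from your lag exporter.

```java
/**
 * Least-squares slope of consumer lag over evenly spaced samples, in messages per sample
 * interval. A persistently positive slope means arrival > service. Illustrative helper.
 */
final class LagTrend {
    private LagTrend() {}

    static double slope(long[] lagSamples) {
        int n = lagSamples.length;
        if (n < 2) throw new IllegalArgumentException("need at least two samples");
        double meanX = (n - 1) / 2.0;
        double meanY = 0;
        for (long y : lagSamples) meanY += y;
        meanY /= n;
        double num = 0;
        double den = 0;
        for (int i = 0; i < n; i++) {
            double dx = i - meanX;
            num += dx * (lagSamples[i] - meanY);
            den += dx * dx;
        }
        return num / den;
    }

    /** Scale-out (or shed) signal: lag rises faster than the drift you are willing to tolerate. */
    static boolean underProvisioned(long[] lagSamples, double toleratedSlope) {
        return slope(lagSamples) > toleratedSlope;
    }
}
```

A flat lag of 50,000 has slope 0 and is healthy. A lag of 100, 200, 300, 400 has slope 100 per interval and is not, even though its absolute value is far smaller.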

Step 4 – Rate limiting and admission control at ingress with token buckets

Backpressure protects you against your own slow stages. Rate limiting protects you against the outside world. The token bucket is the right primitive because it bounds the long-run average rate while permitting a burst of at most the bucket's capacity. A fixed-window counter, by contrast, can admit up to twice its limit in a short span that straddles a window boundary.

The mechanics: a bucket holds up to capacity tokens and refills at refill_rate tokens/second. Each admitted request removes one token; if the bucket is empty, the request is rejected (or queued, briefly). capacity sets the maximum burst; refill_rate sets the sustained ceiling. Envoy's local rate limit filter and Bucket4j implement this model directly. NGINX's limit_req uses the closely related leaky-bucket algorithm with a configurable burst.
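Here is a minimal in-process token bucket, assuming a single instance and an injectable nanosecond clock for testability. `TokenBucket` is a hypothetical class, not Bucket4j's or Envoy's API. Tokens refill lazily on each call and are capped at capacity, which is what lets an idle bucket absorb one full burst and no more.

```java
import java.util.function.LongSupplier;

/** Minimal token bucket: capacity = max burst, refillPerSecond = sustained ceiling. A sketch. */
final class TokenBucket {
    private final double capacity;          // maximum burst
    private final double refillPerSecond;   // sustained ceiling
    private final LongSupplier nanoClock;   // System::nanoTime in production
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(double capacity, double refillPerSecond, LongSupplier nanoClock) {
        if (capacity < 1 || refillPerSecond <= 0) throw new IllegalArgumentException("bad bucket");
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
        this.nanoClock = nanoClock;
        this.tokens = capacity;                       // start full: one burst is available
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    /** Takes one token if available; false means reject (e.g. 429 with Retry-After). */
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        double refill = (now - lastRefillNanos) * refillPerSecond / 1_000_000_000.0;
        tokens = Math.min(capacity, tokens + refill);  // lazy refill, capped at capacity
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

In production you would pass `System::nanoTime`. A test can pass a fake clock and advance it by hand to check the burst and the refill separately.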

# Envoy local rate limit: token bucket, 1000 rps sustained, burst to 1000.
- name: envoy.filters.http.local_ratelimit
  typed_config:
    "@type": type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit
    stat_prefix: ingress_rl
    token_bucket:
      max_tokens: 1000           # burst capacity
      tokens_per_fill: 1000
      fill_interval: 1s          # => 1000 tokens/sec sustained refill
    filter_enabled:
      default_value: { numerator: 100, denominator: HUNDRED }
    filter_enforced:
      default_value: { numerator: 100, denominator: HUNDRED }

Two non-negotiables when you reject. First, return 429 Too Many Requests with a Retry-After header so well-behaved clients back off instead of hot-looping (a tight retry on a 429 is a self-inflicted load amplifier). Second, decide between local and distributed limiting. Per-instance local limits are cheap and have no shared-state dependency, but N instances admit N x the rate; a true global ceiling needs a shared counter (Redis with an atomic Lua script, or a sidecar global rate-limit service). Pay the coordination cost only when the downstream you are protecting has a single global capacity – a per-tenant quota or a fixed-throughput database – otherwise local limits are simpler and fail independently.
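When the limiter is a token bucket, the Retry-After value does not have to be a guess: it is the time until the bucket refills enough for the request. Here is a hypothetical helper, assuming you can read the bucket's current token count and refill rate. The header carries whole seconds, so the result is rounded up and never zero.

```java
/** Hypothetical helper: Retry-After (whole seconds) until a token bucket can cover `cost`. */
final class RetryAfter {
    private RetryAfter() {}

    static long seconds(double tokensNow, double cost, double refillPerSecond) {
        if (refillPerSecond <= 0) throw new IllegalArgumentException("refill rate must be positive");
        double deficit = Math.max(0, cost - tokensNow);
        long secs = (long) Math.ceil(deficit / refillPerSecond);
        return Math.max(1, secs);   // never tell a rejected client "0": that invites a hot loop
    }
}
```

An empty 1,000 tokens/s bucket yields a Retry-After of 1 for a single request. A batch that needs 5,000 tokens yields 5.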

Step 5 – Load shedding and graceful degradation under sustained overload

When arrival > service persists past what your buffer can absorb, you have exactly two honest options: shed load or fall over. Shedding is the engineering choice. The principle is to reject early and reject cheaply – drop work at ingress, before you have spent CPU, memory, and downstream calls on a request you will abandon anyway. The worst possible design does expensive work and then times out: you paid full price for zero value and starved the requests you could have served.

Prioritize what you keep. Not all load is equal – shed low-value traffic first and protect the critical path.

The cleanest production trigger is queue depth or in-flight count, not CPU – CPU saturation is a lagging signal and noisy under GC. A simple adaptive shedder that watches its own bounded queue:

// Shed when the bounded work queue is past 80% full; keep high-priority work longer.
int depth = workQueue.size();
int cap   = 10_000;
if (depth > cap * 0.80 && request.priority() == Priority.LOW) {
    meter.counter("shed.low_priority").increment();
    return Response.status(503)
                   .header("Retry-After", "2")
                   .build();
}

Even better for variable-latency downstreams is to bound concurrency rather than queue length. A Netflix concurrency-limits-style AIMD controller moves the limit to track the downstream's actual capacity as its latency drifts, instead of guessing a static number; a fixed bulkhead/semaphore is the simpler static variant. Whatever the trigger, shedding must be observable: a 503 you cannot count is an outage you cannot see. Every shed decision increments a labeled counter, full stop.
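Here is a minimal AIMD limiter in the spirit of (not copied from) Netflix's concurrency-limits, assuming you can time each downstream call. It grows the limit by one on a fast success while the limit is actually in use. It cuts the limit multiplicatively on a failure or a latency-threshold breach. `AimdLimiter` and its thresholds are illustrative.

```java
/**
 * Minimal AIMD concurrency limit. Names, thresholds, and the increase rule are illustrative
 * assumptions, not the concurrency-limits library's API.
 */
final class AimdLimiter {
    private final int minLimit;
    private final int maxLimit;
    private final double backoffRatio;        // multiplier applied on overload, in (0, 1)
    private final long slowThresholdNanos;    // latency above this counts as overload
    private int limit;
    private int inFlight;

    AimdLimiter(int initialLimit, int minLimit, int maxLimit, double backoffRatio, long slowThresholdNanos) {
        if (minLimit < 1 || initialLimit < minLimit || maxLimit < initialLimit
                || backoffRatio <= 0 || backoffRatio >= 1) {
            throw new IllegalArgumentException("invalid limiter configuration");
        }
        this.limit = initialLimit;
        this.minLimit = minLimit;
        this.maxLimit = maxLimit;
        this.backoffRatio = backoffRatio;
        this.slowThresholdNanos = slowThresholdNanos;
    }

    /** Admission: false means shed now (e.g. 503) instead of queueing. */
    synchronized boolean tryAcquire() {
        if (inFlight >= limit) return false;
        inFlight++;
        return true;
    }

    /** Call exactly once per admitted request with its observed outcome. */
    synchronized void release(long latencyNanos, boolean failed) {
        int wasInFlight = inFlight;
        inFlight--;
        if (failed || latencyNanos > slowThresholdNanos) {
            limit = Math.max(minLimit, (int) (limit * backoffRatio)); // multiplicative decrease
        } else if (wasInFlight * 2 >= limit) {
            limit = Math.min(maxLimit, limit + 1);   // additive increase, only when the limit is in use
        }
    }

    synchronized int limit() { return limit; }
}
```

A slow response halves the limit in this configuration. Requests already in flight finish normally, while new arrivals are shed until in-flight work falls below the new limit.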

Step 6 – Coordinate backpressure across multi-stage pipelines and fan-out

A pipeline is only as backpressure-aware as its weakest link. If stage 2 honors demand but stage 3 swallows it with an unbounded internal queue, stage 3’s queue is where your memory dies – the explicit bounds on stages 1 and 2 bought you nothing. Backpressure must be continuous from sink to source; a single unbounded hop anywhere breaks the whole chain.

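The hard cases are the boundaries between paradigms. The async-to-sync bridge is the classic offender: a reactive stream feeding a blocking JDBC call must run that call on a bounded scheduler. Options include Reactor's Schedulers.boundedElastic(), which caps threads at ten times the CPU core count by default, or a fixed-size pool. The pool size then becomes the credit limit, and a slow database parks workers instead of inflating an unbounded queue of pending calls. Note that boundedElastic() still queues up to 100,000 tasks per worker by default, so also cap in-flight calls with flatMap's concurrency argument.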
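Outside Reactor, the same bridge can be sketched with only the JDK, assuming a blocking call you cannot make asynchronous. `BoundedBlockingBridge` is hypothetical, and the `Supplier` stands in for a JDBC call. `Executors.newFixedThreadPool` has an unbounded task queue of its own, so the semaphore is what turns the pool into a credit limit.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/**
 * Blocking calls run on a fixed pool; a semaphore caps calls in flight (running plus queued),
 * because newFixedThreadPool's own queue is unbounded. Illustrative names throughout.
 */
final class BoundedBlockingBridge implements AutoCloseable {
    private final ExecutorService pool;
    private final Semaphore permits;

    BoundedBlockingBridge(int threads, int maxInFlight) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.permits = new Semaphore(maxInFlight);
    }

    /** Empty when saturated (shed or retry later); otherwise the pending result. */
    <T> Optional<CompletableFuture<T>> trySubmit(Supplier<T> blockingCall) {
        if (!permits.tryAcquire()) {
            return Optional.empty();
        }
        try {
            return Optional.of(CompletableFuture.supplyAsync(blockingCall, pool)
                    .whenComplete((result, error) -> permits.release())); // credit back when done
        } catch (RejectedExecutionException e) {
            permits.release();        // pool already shut down: do not leak the permit
            throw e;
        }
    }

    @Override public void close() {
        pool.shutdown();
    }
}
```

With one thread and two permits, a third concurrent call is rejected immediately rather than queued. It becomes admissible again as soon as an earlier call completes.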

Fan-out needs a deliberate rule. When one stream splits to multiple consumers at different speeds, you choose between coupling them – the whole group runs at the slowest consumer’s rate, true end-to-end backpressure but one slow consumer throttles everyone – or decoupling with a per-consumer bounded buffer that drops or sheds independently. Telemetry fan-out usually wants the decoupled, drop-on-full path so a slow analytics sink never stalls the alerting sink. A coupled bus broadcast that blocks on its slowest subscriber will, the day that subscriber GCs, freeze every other subscriber on the bus – a failure that looks like a total outage but is one slow consumer holding the whole broadcast hostage.
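The decoupled option reduces to one bounded queue per subscriber, with `offer()` instead of `put()`. Here is a minimal sketch. `DecoupledFanOut` is a hypothetical class; subscribers are registered before publishing starts, and consumer threads are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Decoupled fan-out: each subscriber has its own bounded queue, and a full queue drops for
 * that subscriber only. Register every subscriber before the first publish().
 */
final class DecoupledFanOut<T> {
    private final List<BlockingQueue<T>> queues = new ArrayList<>();
    private final List<AtomicLong> drops = new ArrayList<>();

    /** Adds a subscriber queue and returns its index. */
    int addSubscriber(int capacity) {
        queues.add(new ArrayBlockingQueue<>(capacity));
        drops.add(new AtomicLong());
        return queues.size() - 1;
    }

    /** Never blocks the publisher: offer() to each queue and count drops per subscriber. */
    void publish(T item) {
        for (int i = 0; i < queues.size(); i++) {
            if (!queues.get(i).offer(item)) {
                drops.get(i).incrementAndGet();   // labeled per-subscriber drop metric
            }
        }
    }

    BlockingQueue<T> queue(int subscriber) { return queues.get(subscriber); }

    long dropped(int subscriber) { return drops.get(subscriber).get(); }
}
```

Suppose you publish ten items to an alerting queue of capacity 100 and an analytics queue of capacity 2, with nobody consuming. All ten stay in the alerting queue, and eight drops are recorded against analytics only.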

Verify

Backpressure is only real if you can prove the producer slows when the consumer stalls. Test the saturation path explicitly – do not wait for production to find the unbounded hop.

1. Inject a slow consumer and watch the producer. Add an artificial delay to the slowest stage and drive load above service rate. Backpressure is working if producer throughput drops to match the consumer; it is broken if producer throughput stays flat while memory climbs – that flat line is your unbounded queue.

# Drive sustained overload (200 workers x 50 QPS each = up to 10,000 req/s offered) and watch effective throughput converge to service rate.
hey -z 60s -c 200 -q 50 https://api.internal/v1/ingest

2. Confirm overflow is bounded, not infinite. Under sustained overload, heap and queue-depth metrics must plateau, not grow linearly. A linear climb means at least one buffer is unbounded.

// Memory must plateau under overload; a rising slope is an unbounded buffer.
// Process-level counters are only a proxy for JVM heap; keep one series per counter name.
performanceCounters
| where timestamp > ago(15m)
| where name =~ "% Committed Bytes In Use" or name =~ "Private Bytes"
| summarize p99 = percentile(value, 99) by name, bin(timestamp, 30s)
| render timechart

3. Track Kafka lag rate of change. Confirm lag stabilizes (or shrinks) once you pause/resume or scale – a continuously rising slope means you are under-provisioned, not merely bursting.

kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --describe --group ingest-workers
# Watch the LAG column across partitions; flat/falling = healthy, rising = under-served.

4. Verify shed counters fire and stay proportional. Under overload, shed/429/503 counters must increment, low-priority sheds before high, and the served fraction must stay healthy. Silent shedding is indistinguishable from data loss.
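Check 1 also works as an in-process unit test, assuming a single bounded hop you can isolate. The hypothetical `SlowConsumerCheck` below puts a fixed sleep in the consumer and measures how long the producer's `put()` loop takes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-process check 1: does a slow consumer behind a bounded queue slow the producer? */
final class SlowConsumerCheck {
    private SlowConsumerCheck() {}

    /** Milliseconds the producer needs to put() all items while the consumer sleeps per item. */
    static long producerElapsedMs(int items, int capacity, long consumerDelayMs) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.take();
                    Thread.sleep(consumerDelayMs);   // injected slowness
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        long start = System.nanoTime();
        long elapsedMs = -1;
        try {
            for (int i = 0; i < items; i++) {
                queue.put(i);                        // parks while the queue is full
            }
            elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return elapsedMs;
    }
}
```

With 300 items, a capacity of 50, and a 2 ms consumer delay, the producer cannot finish until the consumer has taken at least 250 items. The elapsed time should therefore be roughly half a second. With an unbounded queue, the loop would finish almost immediately and memory would absorb the difference.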

Enterprise scenario

A fintech platform team I worked alongside ran a market-data fan-in: ~40 upstream price feeds pushing into a single Reactor pipeline that normalized ticks and wrote them to an in-memory order book consumed by pricing services. It ran clean for months. Then an exchange had a volatile open, one feed’s tick rate spiked roughly 8x for ninety seconds, and the normalizer – doing a per-tick currency lookup against a service that was itself now slow – fell behind. The pipeline used onBackpressureBuffer() with no bound, the default. Heap went from 4 GB to the 28 GB ceiling in under two minutes, GC pauses crossed ten seconds, and the price service started serving stale books to traders. The post-incident finding was blunt: there was no backpressure, only an unbounded buffer wearing its costume.

The constraint that shaped the fix: for market data, a stale tick is worthless – only the latest price per instrument matters – and the feed was an external push source they could not slow. Blocking was off the table (it would have stalled the shared event loop and frozen all 40 feeds), and lossless buffering was both impossible and pointless. So they replaced the unbounded buffer with a bounded onBackpressureLatest() keyed per instrument: keep only the most recent tick when the consumer is behind, drop the superseded ones, and emit a labeled drop metric. They also moved the currency lookup off the hot path into a periodically refreshed local cache, removing the slow synchronous call entirely.

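// Bounded + latest-wins per instrument: under overload, keep each instrument's newest tick.
// Tick::instrument and MAX_INSTRUMENTS are illustrative; flatMap concurrency must cover every
// active instrument, or groupBy stalls on groups nobody is draining.
priceFeed
    .groupBy(Tick::instrument)                        // one substream per instrument
    .flatMap(ticks -> ticks
            .onBackpressureLatest()                   // drop superseded ticks, never the freshest
            .doOnDiscard(Tick.class, t -> dropped.increment()),
        MAX_INSTRUMENTS, 1)                           // concurrency >= instruments; 1 pending each
    .publishOn(Schedulers.boundedElastic(), 256)      // bounded prefetch == credit limit
    .map(this::normalizeWithCachedFx)                 // no synchronous lookup on the hot path
    .subscribe(orderBook::apply);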
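The per-instrument latest-wins behavior is easier to see without Reactor's operators in the way. Here is a minimal conflation sketch, assuming ticks are keyed by an instrument identifier. `ConflatingBuffer` is a hypothetical class. A newer value replaces the pending one for the same key, and other keys are untouched. Memory is bounded by the number of instruments rather than the tick rate.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

/** Latest-wins per key ("conflation"); memory grows with distinct keys, not with tick rate. */
final class ConflatingBuffer<K, V> {
    private final LinkedHashMap<K, V> pending = new LinkedHashMap<>();
    private long superseded;

    /** Producer side: never blocks; replaces any pending value for the same key. */
    synchronized void put(K key, V value) {
        if (pending.put(key, value) != null) {
            superseded++;                 // drop metric: a stale tick nobody will see
        }
    }

    /** Consumer side: takes everything pending, in first-arrival order per key. */
    synchronized List<V> drain() {
        List<V> out = new ArrayList<>(pending.values());
        pending.clear();
        return out;
    }

    synchronized long supersededCount() { return superseded; }
}
```

Putting EURUSD 1.10, then GBPUSD 1.25, then EURUSD 1.11 and draining yields [1.11, 1.25], with one superseded tick counted. Re-inserting an existing key does not change its position in a `LinkedHashMap`.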

The replay of the same volatility spike against the fixed pipeline held heap flat, kept the freshest price flowing under load, and shed exactly the stale ticks nobody wanted. The lesson the team wrote into their platform guidelines: every onBackpressure* operator must be bounded and emit a drop metric, and any unbounded buffer in a streaming path is a sev-1 waiting for a bad market open.
