Backend · 2026-W18 · 3 min read

The Stream Wrapper Pattern for Transparent Event Capture

When wrapping an SDK to add observability, usage data lives in the final chunk of a stream. The `finally` block in `__iter__` is the one place you're guaranteed to see it.


The problem

When you wrap an SDK client to add observability, streaming responses break the standard approach. With a non-streaming call you get a single response object back and can read `.usage` off it immediately. With streaming, the usage data lives inside the stream itself, typically in the final chunk, so it isn't available until the caller has consumed the whole stream. If your wrapper tries to read usage as soon as the call returns, before the caller has iterated, you get `None` for every token count.
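A minimal sketch of that failure mode, using a hypothetical `fake_stream` generator in place of a real SDK stream. The chunk shape (`text` and `usage` fields) is an assumption made for illustration, not any vendor's schema.

```python
from types import SimpleNamespace


def fake_stream():
    """Hypothetical stand-in for an SDK stream: usage arrives only on the last chunk."""
    yield SimpleNamespace(text="Hel", usage=None)
    yield SimpleNamespace(text="lo", usage=None)
    yield SimpleNamespace(text="", usage=SimpleNamespace(input_tokens=12, output_tokens=2))


stream = fake_stream()

# Naive wrapper logic: look for usage as soon as the call returns.
# Nothing has been consumed yet, so there is nothing to read.
usage_before = getattr(stream, "usage", None)

# Only once the caller has drained the stream has the usage chunk gone past.
usage_after = None
for chunk in stream:
    if chunk.usage is not None:
        usage_after = chunk.usage
```

Here `usage_before` is `None`, while `usage_after` carries the token counts, which is why the capture has to happen during iteration rather than around the call.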

The approach

The stream wrapper is a thin proxy class with a single responsibility: intercept iteration and emit an accounting event when the stream exhausts.

    class _StreamWrapper:
        def __init__(self, stream, *, session_id, model, t0):
            self._stream = stream
            self._input_tokens = None
            self._output_tokens = None
            # ... other context stored here

        def __iter__(self):
            try:
                for event in self._stream:
                    self._capture(event)  # extract tokens from passing chunks
                    yield event           # caller sees the event unchanged
            finally:
                self._emit_final()        # fires whether iteration ends or raises

        def __getattr__(self, name):
            return getattr(self._stream, name)  # proxy everything else

`_capture` inspects each chunk for usage data. For Anthropic this means watching for `RawMessageStartEvent` (input tokens) and `RawMessageDeltaEvent` (output tokens). For OpenAI with `stream_options={"include_usage": True}`, usage lands on the last chunk.
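One way the capture step might look, written against duck-typed events so it runs without either SDK installed. The function name `capture_usage` and the `counts` dict are hypothetical. The field names (`type == "message_start"` with `message.usage.input_tokens`, `type == "message_delta"` with `usage.output_tokens`, and `usage.prompt_tokens` / `usage.completion_tokens` on OpenAI chunks) reflect my understanding of the two SDKs' event shapes and should be checked against the versions you actually use.

```python
from types import SimpleNamespace


def capture_usage(event, counts):
    """Update counts in place from one streamed event; events without usage are ignored."""
    kind = getattr(event, "type", None)
    if kind == "message_start":
        # Anthropic-style: input tokens are reported at the start of the message.
        counts["input_tokens"] = event.message.usage.input_tokens
    elif kind == "message_delta":
        # Anthropic-style: output tokens are reported on the message delta.
        counts["output_tokens"] = event.usage.output_tokens
    else:
        # OpenAI-style: usage is None on every chunk except the last.
        usage = getattr(event, "usage", None)
        if usage is not None:
            counts["input_tokens"] = usage.prompt_tokens
            counts["output_tokens"] = usage.completion_tokens
    return counts


# Fake events standing in for real SDK objects (assumed shapes).
anthropic_events = [
    SimpleNamespace(
        type="message_start",
        message=SimpleNamespace(usage=SimpleNamespace(input_tokens=25)),
    ),
    SimpleNamespace(type="content_block_delta"),
    SimpleNamespace(type="message_delta", usage=SimpleNamespace(output_tokens=7)),
]
openai_chunks = [
    SimpleNamespace(usage=None),
    SimpleNamespace(usage=SimpleNamespace(prompt_tokens=25, completion_tokens=7)),
]

anthropic_counts = {"input_tokens": None, "output_tokens": None}
for event in anthropic_events:
    capture_usage(event, anthropic_counts)

openai_counts = {"input_tokens": None, "output_tokens": None}
for chunk in openai_chunks:
    capture_usage(chunk, openai_counts)
```

Normalizing both providers into the same two keys keeps the emission step provider-agnostic.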

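`_emit_final` runs in the `finally` block, so it fires whether the caller consumes the whole stream, breaks early, or hits an exception. After a full pass it has every count and emits a `stream_end` event with real numbers. After an early exit it emits whatever was captured up to that point, and counts that never arrived stay `None`.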
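The three exit paths can be exercised with a small self-contained sketch. The `emitted` list, the `chunks` fake stream, and the chunk attributes are all hypothetical stand-ins; a real wrapper would send the event to whatever logging or metrics sink it uses.

```python
from types import SimpleNamespace

emitted = []  # hypothetical event sink


class StreamWrapper:
    def __init__(self, stream):
        self._stream = stream
        self._input_tokens = None
        self._output_tokens = None

    def _capture(self, event):
        # Assumed chunk shape: optional input_tokens / output_tokens attributes.
        if getattr(event, "input_tokens", None) is not None:
            self._input_tokens = event.input_tokens
        if getattr(event, "output_tokens", None) is not None:
            self._output_tokens = event.output_tokens

    def _emit_final(self):
        emitted.append({
            "event": "stream_end",
            "input_tokens": self._input_tokens,
            "output_tokens": self._output_tokens,
        })

    def __iter__(self):
        try:
            for event in self._stream:
                self._capture(event)
                yield event
        finally:
            self._emit_final()


def chunks(fail_after_first=False):
    """Fake stream: input count first, output count last; can fail midway."""
    yield SimpleNamespace(input_tokens=10)
    if fail_after_first:
        raise ConnectionError("stream dropped")
    yield SimpleNamespace(text="hi")
    yield SimpleNamespace(output_tokens=3)


# 1. Full consumption: both counts are known when finally runs.
for _ in StreamWrapper(chunks()):
    pass

# 2. Early break: finally runs when the abandoned generator is closed
#    (right away on CPython, where reference counting finalizes it).
for _ in StreamWrapper(chunks()):
    break

# 3. The underlying stream raises: finally runs as the exception propagates.
try:
    for _ in StreamWrapper(chunks(fail_after_first=True)):
        pass
except ConnectionError:
    pass
```

Each loop appends exactly one `stream_end` record to `emitted`. Only the first has an output count; the other two report `output_tokens` as `None` because the final chunk never passed through the wrapper.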

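The caller's code is unchanged: they still write `for chunk in resp:` exactly as before, and any other attribute access falls through to the underlying stream via `__getattr__`. The wrapper itself is not an instance of the original stream type, so `isinstance(resp, OriginalType)` is false. Only `isinstance(resp._stream, OriginalType)` still holds, so code that inspects the type has to check the inner object.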
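A short sketch of what the proxy does and does not preserve. `OriginalStream`, its `request_id` attribute, and `Wrapper` are hypothetical names chosen for the example.

```python
class OriginalStream:
    """Hypothetical stand-in for the SDK's stream type."""

    def __init__(self, items):
        self._items = items
        self.request_id = "req_demo"

    def __iter__(self):
        return iter(self._items)


class Wrapper:
    def __init__(self, stream):
        self._stream = stream

    def __iter__(self):
        yield from self._stream

    def __getattr__(self, name):
        # Python calls this only when normal lookup on the wrapper fails,
        # so the wrapper's own attributes and methods take precedence.
        return getattr(self._stream, name)


resp = Wrapper(OriginalStream(["a", "b"]))

collected = [chunk for chunk in resp]          # iteration works as before
proxied_id = resp.request_id                   # attribute falls through to the stream
wrapper_is_original = isinstance(resp, OriginalStream)
inner_is_original = isinstance(resp._stream, OriginalStream)
```

Iteration and attribute access behave as the caller expects, but `wrapper_is_original` is `False` and only `inner_is_original` is `True`.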

What I learned

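The `finally` in `__iter__` is the key insight. A `try`/`finally` around a generator's `yield` loop runs the `finally` on every exit path: when the underlying stream is exhausted, when an exception propagates through the loop, and when the generator is closed while suspended at `yield` (Python raises `GeneratorExit` there, either from an explicit `close()` or when the abandoned generator is finalized). You get one emission point regardless of how the caller drives the iterator. The alternative, hooking `__del__` on the wrapper, is unreliable under CPython and completely unreliable under PyPy. One caveat: the early-`break` path depends on generator finalization, which CPython performs as soon as the last reference is dropped but which other implementations may defer until a garbage-collection pass. The iterator protocol's `finally` is the right tool.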
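The closing mechanism can be seen in isolation with a bare generator. This sketch uses only standard generator behavior; `watched` and `log` are hypothetical names.

```python
log = []


def watched(items):
    try:
        for item in items:
            yield item
    except GeneratorExit:
        log.append("closed at yield")
        raise  # re-raise so close() completes normally
    finally:
        log.append("finally ran")


gen = watched([1, 2, 3])
first = next(gen)  # the generator is now suspended at the yield
gen.close()        # raises GeneratorExit inside the generator
gen.close()        # closing a finished generator is a no-op
```

After this runs, `log` holds one `"closed at yield"` entry followed by one `"finally ran"` entry, so the cleanup fires once even though `close()` was called twice. Calling `close()` explicitly on the generator is also a way to get the emission at a known point when you cannot rely on prompt finalization.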