Lesson 5 - Guided Project: Streaming Endpoint

Welcome to the Streaming Endpoint Project

Every time you watch a chat assistant type its answer one word at a time, you’re watching a stream: the server doesn’t compute the whole reply, package it up, and ship it as one block — it sends each piece the moment it’s ready, and your screen renders the words as they land. That’s not a UI trick; it’s a server design choice, and FastAPI makes it surprisingly small to build. In this project you’ll write a /generate endpoint that streams a reply token-by-token (here, word-by-word) using StreamingResponse over a generator. Then you’ll make it realistic with a per-token delay using an async generator — the exact shape you’d use to forward tokens from a real language model. The “model” here is simulated locally, so there’s no API key and nothing external to call; the focus is the streaming mechanics you’ll reuse for the real thing.

By the end of this project, you will be able to:

  • Write a generator that yields a reply one token at a time
  • Wrap a generator in a StreamingResponse so chunks flush as they’re produced
  • Use an async generator with await asyncio.sleep when the source is awaitable
  • Explain how a client consumes a stream incrementally — the LLM token-streaming pattern

Let’s build it in stages.


Stage 1: A Generator That Yields One Token at a Time

The heart of streaming is a generator: a function that yields values one at a time instead of building a full list and returning it. Each yield hands back a single piece and pauses until the caller asks for the next one. That “produce a piece, pause, produce the next” rhythm is exactly what streaming needs.

Our generator takes a prompt, builds a reply, splits it into words, and yields each word (with a trailing space so they don’t run together):

def token_stream(prompt: str):
    reply = f"You said: {prompt}. Here is a streamed answer.".split()
    for word in reply:
        yield word + " "

Before wiring it into FastAPI, let’s prove it really emits one token at a time by looping over it directly:

for token in token_stream("hi"):
    print(repr(token))

print("joined:", repr("".join(token_stream("hi"))))

Output:

'You '
'said: '
'hi. '
'Here '
'is '
'a '
'streamed '
'answer. '
joined: 'You said: hi. Here is a streamed answer. '

Each word comes out as its own value — eight separate tokens — and joining them reconstructs the full reply. Nothing waits for the whole sentence to exist before the first word is available. That’s the property we want to expose over HTTP.
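To watch the pause-and-resume behaviour directly, you can also advance the generator by hand with the built-in next(). This is a small sketch that repeats token_stream so it runs on its own; the variable names are illustrative and nothing here is FastAPI-specific.

```python
def token_stream(prompt: str):
    reply = f"You said: {prompt}. Here is a streamed answer.".split()
    for word in reply:
        yield word + " "

gen = token_stream("hi")   # creates the generator; the body has not started running

first = next(gen)          # runs the body up to the first yield, then pauses
second = next(gen)         # resumes after that yield and pauses at the next one
remaining = list(gen)      # drains whatever is left

print(repr(first))         # 'You '
print(repr(second))        # 'said: '
print(len(remaining))      # 6
```

Each call to next() does only as much work as it takes to produce one more token, which is the behaviour a streaming response relies on.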


Stage 2: Wrap It in a StreamingResponse

A normal endpoint returns a value, and FastAPI serializes the whole thing before sending one response. To send pieces as they’re produced, you hand FastAPI a generator wrapped in StreamingResponse. FastAPI iterates the generator and flushes each yielded chunk to the client as it arrives, rather than buffering the lot.

The media_type matters: it sets the response's Content-Type header. With text/plain, the client treats the body as a stream of text and can show it incrementally. (If you returned JSON, the client would typically have to wait for the whole document before it could parse it, which defeats the point.)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

def token_stream(prompt: str):
    reply = f"You said: {prompt}. Here is a streamed answer.".split()
    for word in reply:
        yield word + " "

@app.get("/generate")
def generate(prompt: str):
    return StreamingResponse(token_stream(prompt), media_type="text/plain")

Note that generate is a plain def, and the generator is a plain (synchronous) generator. That’s fine here: the work is trivial and CPU-cheap. The async version in the next stage is what you reach for when each token comes from something you can await.

Let’s verify it with TestClient:

from fastapi.testclient import TestClient

client = TestClient(app)

r = client.get("/generate?prompt=hi")
print("status:", r.status_code)
print("content-type:", r.headers["content-type"])
print("body:", repr(r.text))

Output:

status: 200
content-type: text/plain; charset=utf-8
body: 'You said: hi. Here is a streamed answer. '

The body is the full reply and the content type is text/plain; charset=utf-8. Behind that single body string, the server emitted eight separate chunks — TestClient simply concatenates them for you, which is the next subtlety to understand.


Stage 3: Make It Realistic With an Async Generator and a Delay

So far the tokens come out instantly because they already exist in memory. A real model produces tokens over time — there’s a wait between each one. To simulate that honestly, add a small delay per token. This is also where the choice between a sync and an async generator becomes a real decision.

If you delay with time.sleep inside a sync generator, that's a blocking sleep: it ties up one of the server's worker threads for the whole reply. (Inside an async generator it would be worse, because it would stall the event loop and every request being served on it.) The right tool when you're waiting is await asyncio.sleep, which yields control back to the event loop during the pause so the server can serve other requests in the gap. Since a real token source (an async LLM client, an async database cursor, an async file read) is something you await, an async generator is the natural fit: you await the next token, yield it, and repeat.

Here it is with a 10-millisecond pause per token:

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def token_stream_async(prompt: str):
    reply = f"You said: {prompt}. Here is a streamed answer.".split()
    for word in reply:
        await asyncio.sleep(0.01)   # stands in for awaiting the next token
        yield word + " "

@app.get("/generate-async")
async def generate_async(prompt: str):
    return StreamingResponse(token_stream_async(prompt), media_type="text/plain")

The endpoint is now async def and the generator is an async def ... yield (an async generator). StreamingResponse handles both kinds of generator transparently. Verifying:

from fastapi.testclient import TestClient

client = TestClient(app)

r = client.get("/generate-async?prompt=hi")
print("status:", r.status_code)
print("content-type:", r.headers["content-type"])
print("body:", repr(r.text))

Output:

status: 200
content-type: text/plain; charset=utf-8
body: 'You said: hi. Here is a streamed answer. '

Same body, same content type. But now each token is separated by a real await, so during every 10 ms pause the event loop is free to serve other requests instead of blocking. That’s the version you’d ship when the token source is genuinely awaitable.
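One way to see what the await buys you is to run two consumers of the async generator on a single event loop. This is a minimal sketch using only asyncio, with no FastAPI involved; consume and main are hypothetical helper names, and each consume call stands in for one client request.

```python
import asyncio

async def token_stream_async(prompt: str):
    reply = f"You said: {prompt}. Here is a streamed answer.".split()
    for word in reply:
        await asyncio.sleep(0.01)   # control returns to the event loop here
        yield word + " "

async def consume(name: str, log: list):
    # Hypothetical stand-in for one request being served.
    async for token in token_stream_async(name):
        log.append((name, token))

async def main():
    log = []
    # Run two "requests" concurrently on the same event loop.
    await asyncio.gather(consume("a", log), consume("b", log))
    return log

log = asyncio.run(main())
print([name for name, _ in log[:4]])   # shows which consumer produced each early token
```

If the delay were a blocking time.sleep, the first consumer would emit all eight tokens before the second got a turn. With await asyncio.sleep, the log should show the two consumers alternating.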


Stage 4: How a Real Client Streams It

There’s an important gotcha in the output above: TestClient buffers the response, so it hands you the full body at once no matter how the server chunked it. That’s convenient for assertions, but it hides the streaming. Even when you iterate the chunks deliberately, TestClient collapses our tiny, fast tokens into a single piece:

with client.stream("GET", "/generate?prompt=hi") as resp:
    chunks = [c for c in resp.iter_text()]
print("chunks:", chunks)

Output:

chunks: ['You said: hi. Here is a streamed answer. ']

So how do you see the stream? Use a client that renders bytes as they arrive. With curl, the -N (--no-buffer) flag disables output buffering, so each token prints the instant it lands:

curl -N "http://127.0.0.1:8000/generate-async?prompt=hi"

Against the delayed /generate-async endpoint, you’d watch the words appear one at a time, paced by the per-token delay — exactly the “typing” effect you see in a chat UI. A browser’s fetch with a ReadableStream reader does the same thing in the front end. The server’s job is identical in every case: yield a token, flush it, repeat.

This is exactly how LLM token streaming works

Swap our await asyncio.sleep(0.01) for await on a real language model’s streaming response, yield each token as it arrives, and you have a production streaming-chat endpoint — the pattern is unchanged. The Generative AI course streams real Claude tokens through this very shape. Keep in mind the testing caveat you saw here: TestClient (and many HTTP clients by default) buffer the body, so to observe the incremental stream you need an incremental consumer like curl -N or a browser stream reader. The server is streaming the whole time; only the client’s buffering decides whether you watch it arrive piece by piece.
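The swap described above might look roughly like the sketch below. It assumes the upstream source is an async iterator of text tokens; fake_model_stream is a hypothetical stand-in for a real model client's streaming call (no real SDK is used here), and token_stream_from_model and collect are illustrative names.

```python
import asyncio
from typing import AsyncIterator

async def fake_model_stream(prompt: str) -> AsyncIterator[str]:
    # Hypothetical stand-in for a real model client's streaming call.
    for word in f"You said: {prompt}.".split():
        await asyncio.sleep(0.01)   # simulated wait for the next token
        yield word + " "

async def token_stream_from_model(prompt: str) -> AsyncIterator[str]:
    # Await each token from the upstream source and forward it unchanged.
    async for token in fake_model_stream(prompt):
        yield token

async def collect(prompt: str) -> list:
    # Drain the stream into a list so the result can be inspected.
    return [token async for token in token_stream_from_model(prompt)]

tokens = asyncio.run(collect("hi"))
print(tokens)   # ['You ', 'said: ', 'hi. ']
```

In the endpoint, token_stream_from_model(prompt) would be passed to StreamingResponse exactly as token_stream_async(prompt) is in Stage 3.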


Extend the Project

Exercise 1: Stream Server-Sent Events (SSE)

Browsers have a built-in EventSource API for consuming streams formatted as Server-Sent Events. Change the endpoint to emit SSE: set media_type="text/event-stream" and yield each token as data: <token>\n\n.

Hint

SSE has a strict line format. Each message is data: followed by the payload and two newlines: yield f"data: {word}\n\n". Keep the generator otherwise the same and switch the media_type to text/event-stream. The browser’s EventSource will fire a message event for each complete data: message automatically.
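As a rough sketch of that hint, here is the generator alone, with the SSE framing applied to each word. The name sse_stream is illustrative, and the sketch assumes single-line payloads, which holds here because each token is one word.

```python
def sse_stream(prompt: str):
    reply = f"You said: {prompt}. Here is a streamed answer.".split()
    for word in reply:
        # One SSE message: a "data:" field, then a blank line that ends the message.
        yield f"data: {word}\n\n"

events = list(sse_stream("hi"))
print(repr(events[0]))   # 'data: You\n\n'
print(len(events))       # 8
```

To serve it, you would wrap sse_stream(prompt) in StreamingResponse with media_type="text/event-stream", as the exercise describes.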

Exercise 2: Add a [DONE] Sentinel

Clients often need to know when a stream is finished versus just paused between tokens. Add a final yield of [DONE] (or data: [DONE]\n\n if you did Exercise 1) so the client can stop listening cleanly.

Hint

After the for loop that yields tokens, add one more statement: yield "[DONE]". The generator finishes naturally after that, but the explicit sentinel gives the client an in-band signal it can check for — the same convention many streaming AI APIs use to mark the end of a response.
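Here is one possible shape for the sentinel, together with a consumer that stops when it sees it. Both function names and the DONE constant are illustrative, and read_until_done is a hypothetical client-side helper that assumes each chunk arrives exactly as it was yielded.

```python
DONE = "[DONE]"   # sentinel text; a convention chosen for this exercise, not a standard

def token_stream_with_done(prompt: str):
    reply = f"You said: {prompt}. Here is a streamed answer.".split()
    for word in reply:
        yield word + " "
    yield DONE   # in-band end-of-stream marker

def read_until_done(chunks):
    # Hypothetical client-side helper: collect text until the sentinel appears.
    parts = []
    for chunk in chunks:
        if chunk == DONE:
            return "".join(parts), True
        parts.append(chunk)
    return "".join(parts), False   # stream ended without the sentinel

text, finished = read_until_done(token_stream_with_done("hi"))
print(repr(text), finished)
```

Over a real network connection, chunk boundaries are not guaranteed to line up with the server's yields, so a real client would more likely look for the sentinel in the accumulated text (or in parsed SSE messages) than compare whole chunks.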

Exercise 3: Stream a File Line-by-Line

Streaming isn’t only for generated text. Build a /logs endpoint that streams a large text file one line at a time, so a huge file is never loaded fully into memory.

Hint

A generator can read and yield lazily: def line_stream(): then with open("server.log") as f: and for line in f: yield line. Iterating a file object yields one line at a time, so memory stays flat regardless of file size. Wrap it in StreamingResponse(line_stream(), media_type="text/plain"). For truly async file I/O you’d reach for a library like aiofiles and an async generator.
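Laid out as code, the hint might look like the sketch below. It takes the path as a parameter (an assumption made for the demo; the hint hard-codes "server.log") and uses a small temporary file so it runs on its own.

```python
import os
import tempfile

def line_stream(path: str):
    # Iterating a file object reads one line at a time, so memory use stays flat.
    with open(path) as f:
        for line in f:
            yield line

# Demo: a small temporary file stands in for a large server.log.
with tempfile.NamedTemporaryFile("w", suffix=".log", delete=False) as tmp:
    tmp.write("first\nsecond\nthird\n")
    log_path = tmp.name

lines = list(line_stream(log_path))
os.remove(log_path)   # clean up the demo file
print(lines)          # ['first\n', 'second\n', 'third\n']
```

In the endpoint you would pass the generator straight to StreamingResponse rather than collecting it with list(), since collecting it would load the whole file into memory.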


Summary

You built a streaming endpoint from the ground up. A generator yields a reply one token at a time; wrapping it in StreamingResponse with media_type="text/plain" tells FastAPI to flush each chunk to the client as it’s produced instead of buffering the whole body. To simulate real, time-based generation you switched to an async generator with await asyncio.sleep, the right shape whenever each token comes from something awaitable, because the await hands control back to the event loop during each pause. Finally you saw that TestClient buffers the body for convenient assertions, while an incremental client like curl -N reveals the tokens arriving one by one: the “typing” effect behind streaming chat.

Key Concepts

  • Generator: a function that yields values one at a time and pauses between them.
  • StreamingResponse: wraps a (sync or async) generator and flushes each chunk as it’s produced.
  • media_type: text/plain (or text/event-stream for SSE), so the client treats the body as an incremental stream.
  • Async generator: async def ... yield with await, the right fit when each token is awaitable.
  • Client buffering: TestClient collects the full body; curl -N or a browser stream reader renders chunks as they arrive.

Why This Matters

Token-by-token streaming is the dominant UX for AI features, and it’s increasingly expected anywhere a response is large or slow: live logs, progress feeds, big exports. The same three pieces — a generator, a StreamingResponse, and an awaitable source — power all of them. Get this pattern in your fingers and streaming a real model’s output is a near drop-in change: replace the simulated delay with an await on the model’s stream, yield each token, and you have a production streaming endpoint.


Next Steps

Continue to Module 8 - Testing, Settings, and Deployment

Test your endpoints, configure the app with settings, and deploy it to production — then bring it all together in the capstone.


Continue Building Your Skills

You’ve now built the core async patterns of a modern API: concurrency, background work, and streaming. With a streaming endpoint in your toolkit, you’re ready for the final stretch — making the whole application trustworthy and shippable. Next you’ll test your endpoints, manage configuration cleanly, and deploy, then pull everything together in the capstone project.