Streaming Fetch¶
If, when reading the server sent events page, you wondered if there was a more general way of streaming data from a server, you were right.
The source code can be found here for the HTML, and here for the Python (and here with typing).
The request handler¶
The request handler looks very similar to that of server sent events, except there is no need to implement the protocol; just streaming data is fine.
import asyncio
import json
from datetime import datetime

# bareasgi provides the response class and the body reader used below.
from bareasgi import HttpResponse, text_reader

async def test_events(request):
    # Read and decode the JSON request body.
    body = await text_reader(request.body)
    data = json.loads(body)

    async def send_events():
        is_cancelled = False
        while not is_cancelled:
            try:
                print('Sending event')
                message = {
                    'type': 'tick',
                    'data': {
                        'time': datetime.now().isoformat(),
                        'message': data['message']
                    }
                }
                # One JSON message per line, terminated by a newline.
                line = json.dumps(message) + '\n'
                yield line.encode('utf-8')
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                # The client has disconnected, so stop streaming.
                print('Cancelled')
                is_cancelled = True

    headers = [
        (b'cache-control', b'no-cache'),
        (b'content-type', b'application/json'),
        (b'connection', b'keep-alive')
    ]
    return HttpResponse(200, headers, send_events())
Rather than yielding the event source protocol, we simply stream JSON, with each message terminated by a newline (a format sometimes called newline-delimited JSON).
Note that, unlike the event stream, we can receive POST requests, which may have a body; an EventSource can only issue GET requests. This allows passing more complex requests than would be feasible with the event source.
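With the handler above, the bytes on the wire are just a sequence of JSON lines, one per tick (the timestamps here are illustrative):

{"type": "tick", "data": {"time": "2023-01-01T12:00:00.000000", "message": "hello"}}
{"type": "tick", "data": {"time": "2023-01-01T12:00:01.000000", "message": "hello"}}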
The client¶
The client code is significantly more complicated.
The core of the client side code is the streaming fetch request:
function streamingFetch(url, message) {
  // The event target wraps a fetch and re-dispatches each streamed
  // message as a DOM event.
  const eventTarget = new FetchEventTarget(url, {
    method: "POST",
    headers: new Headers({
      accept: "application/json",
      "content-type": "application/json",
    }),
    mode: "same-origin",
    body: JSON.stringify(message),
  });
  eventTarget.addEventListener("tick", (event) => {
    const data = JSON.stringify(event.data);
    console.log(data);
  });
}
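Using it is then a single call. The endpoint path below is illustrative and depends on where the server routes the handler; the message field matches what the handler reads from the request body:

streamingFetch("/events", { message: "hello" });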
This looks like a standard fetch, but the object created acts like an event target.
The code for FetchEventTarget looks as follows:
function FetchEventTarget(input, init) {
  const eventTarget = new EventTarget();
  const jsonDecoder = makeJsonDecoder();
  const eventStream = makeWriteableEventStream(eventTarget);
  fetch(input, init)
    .then((response) => {
      // bytes -> text -> JSON messages -> DOM events
      response.body
        .pipeThrough(new TextDecoderStream())
        .pipeThrough(jsonDecoder)
        .pipeTo(eventStream);
    })
    .catch((error) => {
      eventTarget.dispatchEvent(new CustomEvent("error", { detail: error }));
    });
  return eventTarget;
}
Now we can see the actual fetch call being used. However, rather than calling response.text, we use the body property. The body contains a ReadableStream, and the pipeThrough and pipeTo methods allow the stream to be processed in stages.
The TextDecoderStream is a built-in class that transforms the stream of bytes into text.
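For comparison, here is a minimal sketch of consuming such a response by hand, without the event-target machinery, just to show the reader side of a ReadableStream (the request body matches what the handler above expects):

async function logChunks(url) {
  const response = await fetch(url, {
    method: "POST",
    headers: new Headers({ "content-type": "application/json" }),
    body: JSON.stringify({ message: "hello" }),
  });
  const reader = response.body
    .pipeThrough(new TextDecoderStream())
    .getReader();
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    console.log(value); // raw text chunks, not yet split into JSON lines
  }
}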
The JSON decoder looks like this:
function makeJsonDecoder() {
  return new TransformStream({
    start(controller) {
      // Stash the parse state on the controller so it survives
      // between calls to transform.
      controller.buf = "";
      controller.pos = 0;
    },
    transform(chunk, controller) {
      // Accumulate text until a complete newline-terminated line
      // is available, then parse it as JSON.
      controller.buf += chunk;
      while (controller.pos < controller.buf.length) {
        if (controller.buf[controller.pos] === "\n") {
          const line = controller.buf.substring(0, controller.pos);
          controller.enqueue(JSON.parse(line));
          controller.buf = controller.buf.substring(controller.pos + 1);
          controller.pos = 0;
        } else {
          ++controller.pos;
        }
      }
    },
  });
}
It creates the built-in class TransformStream, splits the text supplied by the TextDecoderStream into lines, then parses each line as JSON.
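Since a message can be split across network chunks (or several messages can arrive in one chunk), the decoder buffers text until it sees a newline. A minimal sketch of that behaviour, driving the transform stream directly:

const decoder = makeJsonDecoder();
const writer = decoder.writable.getWriter();
const reader = decoder.readable.getReader();

// A message arriving in two pieces is reassembled before being parsed.
writer.write('{"type": "tick", "da');
writer.write('ta": {"message": "hi"}}\n');

reader.read().then(({ value }) => {
  console.log(value); // { type: "tick", data: { message: "hi" } }
});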
The last part of the chain is the writable event stream.
function makeWriteableEventStream(eventTarget) {
  return new WritableStream({
    start(controller) {
      eventTarget.dispatchEvent(new Event("start"));
    },
    write(message, controller) {
      // Re-dispatch each JSON message as a DOM event named after
      // its type, with the payload available as event.data.
      eventTarget.dispatchEvent(
        new MessageEvent(message.type, { data: message.data })
      );
    },
    close(controller) {
      eventTarget.dispatchEvent(new CloseEvent("close"));
    },
    abort(reason) {
      eventTarget.dispatchEvent(new CloseEvent("abort", { reason }));
    },
  });
}
This is the end of the chain: it uses the built-in WritableStream to turn the incoming JSON messages into events. Each event is named after the message's type field, which is why the streamingFetch example above can simply listen for "tick".
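Finally, the lifecycle events give the caller a way to observe and cancel the stream. Since the init object is passed straight through to fetch, an AbortSignal can be supplied; aborting should tear down the pipe on the client (surfacing as the abort or error event) and cancel the generator on the server via the asyncio.CancelledError branch in the handler. A sketch, with an illustrative /events path:

const controller = new AbortController();
const target = new FetchEventTarget("/events", {
  method: "POST",
  headers: new Headers({ "content-type": "application/json" }),
  body: JSON.stringify({ message: "hello" }),
  signal: controller.signal,
});
target.addEventListener("start", () => console.log("stream opened"));
target.addEventListener("close", () => console.log("stream closed"));
target.addEventListener("error", (event) => console.error(event.detail));
// Later, to stop streaming:
controller.abort();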
What next?¶
Either go back to the table of contents or go to the compression tutorial.