Stream

Milkio supports providing responses in the form of streams. Data is continuously returned until either the server or the client chooses to stop.

One of the most common use cases is generating replies from large language models (LLMs). Producing the whole reply before responding would keep users waiting for a long time, whereas a stream sends the reply incrementally as it is generated.

Streams are also often used to let the server push messages to clients. In a chat feature, for example, the page can call a stream API when it loads and keep that call open. Whenever someone sends the user a message, the server pushes it to the client through the stream, so the feature works without introducing WebSockets.

Usage

To respond with a stream, change the API’s action from a regular asynchronous function to an asynchronous generator function. An asynchronous generator function has a * before its name and sends values with yield, which can run many times, whereas a regular asynchronous function produces a single value with return.
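The difference can be seen in plain TypeScript, independent of Milkio. The following is a minimal sketch; greetOnce, greetMany, and collect are hypothetical names used only for illustration.

```typescript
// A regular async function resolves once with a single value.
async function greetOnce(name: string): Promise<string> {
  return `Hello, ${name}!`;
}

// An async generator (note the `*`) can yield any number of values over time.
async function* greetMany(name: string, times: number): AsyncGenerator<string> {
  for (let i = 1; i <= times; i++) {
    yield `Hello, ${name}! (#${i})`;
  }
}

// Helper that drains any async iterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) items.push(item);
  return items;
}
```

Calling collect(greetMany("furina", 3)) gathers three separate values, while greetOnce("furina") can only ever produce one.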

Each yield sends one piece of data to the client, and you can yield as many times as you need. The following example pushes a greeting and the current time to the client every second.

/src/app/your-stream-api
export const api = defineApi({
  meta: {},
  async *action(params: { name: string }, context) {
    while (true) {
      yield `Hello, ${params.name}! The current time is:`;
      yield new Date().toLocaleString();
      await Bun.sleep(1000);
    }
  },
});

Milkio Client

With the Milkio Client, call a stream API using the executeStream method. Its result includes a stream object, which is an AsyncGenerator that you can consume by iterating over it.

const { stream } = client.executeStream("your-stream-api", { params: { name: 'furina' } });
for await (const chunk of stream) {
  console.log(chunk);
}

Automatic Retries

When subscribing to messages, you may want the stream to stay open indefinitely. Network instability can still close it, so you may want to reconnect automatically whenever the stream stops.

Some serverless providers, CDNs, or gateways may also limit the maximum runtime of our API, which would force our connection to be terminated when this limit is reached.

You can simply add a loop around the execution to retry continuously.

while (true) {
  const { getError, stream } = client.executeStream("your-stream-api", { params: { name: 'furina' } });
  for await (const chunk of stream) {
    console.log(chunk);
  }
  // Add a 3-second delay before retrying
  await new Promise((resolve) => setTimeout(resolve, 3000));
}

Unlike EventSource, you control every detail of the retry. For example, you can stop reconnecting when the user has been inactive for a long time or when the page is in the background, which saves server resources.
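One way to take that control is to wrap the loop in a helper that backs off between attempts and checks a stop condition. This is only a sketch under stated assumptions: consumeWithRetry, RetryOptions, and the openStream callback are hypothetical and not part of Milkio. In practice openStream would wrap a call to executeStream and return its stream.

```typescript
// Hypothetical options for this sketch; none of these names come from Milkio.
interface RetryOptions {
  initialDelayMs: number;
  maxDelayMs: number;
  shouldContinue: () => boolean; // e.g. return false while the page is hidden
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function consumeWithRetry<T>(
  openStream: () => AsyncIterable<T>, // assumed to open a fresh stream on every call
  onChunk: (chunk: T) => void,
  options: RetryOptions,
): Promise<void> {
  let delay = options.initialDelayMs;
  while (options.shouldContinue()) {
    try {
      for await (const chunk of openStream()) {
        onChunk(chunk);
        delay = options.initialDelayMs; // data arrived, so reset the backoff
        if (!options.shouldContinue()) return; // leaving the loop also closes the stream
      }
    } catch {
      // Assumption: a dropped connection may surface as a thrown error.
    }
    if (!options.shouldContinue()) return;
    await sleep(delay);
    delay = Math.min(delay * 2, options.maxDelayMs); // exponential backoff with a cap
  }
}
```

In a browser, shouldContinue could return document.visibilityState === "visible", so that reconnection stops while the tab is in the background.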

Result

You can still obtain results like a regular API. We can determine whether the stream ended due to an error encountered during runtime by checking the result.

const { getResult, stream } = client.executeStream("foo", { params: { name: 'furina' } });
for await (const chunk of stream) {
  console.log(chunk);
}
const result = getResult();
if (result.success) console.log("The stream ended normally");
else console.log(`The stream ended due to an error:`, result.fail);

Call getResult only after the stream has ended. If you call it earlier, you might first get a successful result and later a failed one, because the stream had not yet failed at the time of the first call.

Stopping

You can stop the stream at any time by calling its return method. This does more than ignore later data and leave the loop: it actually halts the stream, which stops the action’s code from running on the server.

await stream.return(undefined);

Alternatively, using a break statement to break out of the loop will also stop the stream automatically.
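The reason break is enough is a language-level rule: leaving a for await loop early implicitly calls the iterator’s return method. The sketch below shows this with a local generator only; ticker, takeThree, and log are hypothetical names, and the code does not show how Milkio forwards the stop to the server.

```typescript
// Records what happens inside the generator so the effect of `break` is visible.
const log: string[] = [];

async function* ticker(): AsyncGenerator<number> {
  try {
    let n = 0;
    while (true) {
      yield n++;
    }
  } finally {
    // Runs when the consumer calls return(), including the implicit call made by `break`.
    log.push("cleanup");
  }
}

async function takeThree(): Promise<number[]> {
  const seen: number[] = [];
  for await (const n of ticker()) {
    seen.push(n);
    if (seen.length === 3) break; // implicitly calls the iterator's return()
  }
  return seen;
}
```

After takeThree() resolves, log contains "cleanup", which shows that the generator was shut down and not merely abandoned.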

Server-Sent Events

Milkio’s streams are compatible with the Server-Sent Events response format.

However, you should use Milkio’s client instead of the native EventSource object in the browser. By doing so, you gain the following benefits:

  • You can pass parameters and headers to Milkio, whereas EventSource can only send GET requests, meaning you cannot carry a body or change headers.

  • You have control over retry logic and intervals, whereas EventSource reconnects automatically at an interval your client code cannot set (a browser default of a few seconds, unless the server overrides it with a retry field).

  • You can use it outside the browser, such as in another TypeScript server.

If you are forced to communicate with a Milkio application in a way other than using the Milkio Client, you can use any library that supports the Server-Sent Events specification to communicate with Milkio’s Stream API. However, there will be some subtle differences between Milkio and the complete Server-Sent Events specification:

  • There is no Event ID.

  • The server cannot control the client’s retry interval.

  • The request header must include Accept: text/event-stream.

  • If you need to pass parameters, send the request using the POST method and include the content in the request body in JSON (or TSON) format.
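For illustration, a client without the Milkio Client might look like the sketch below, which sends a POST request with the required Accept header and reads data fields from the response. Several things here are assumptions, not Milkio’s documented behavior: the names parseSseBuffer and readSse, the Content-Type header, sending the parameters directly as the JSON body, and returning each payload as an unparsed string.

```typescript
// Splits a buffer of Server-Sent Events text into complete events and returns
// the `data` payload of each one, plus any trailing text that is still incomplete.
function parseSseBuffer(buffer: string): { payloads: string[]; rest: string } {
  const normalized = buffer.replace(/\r\n/g, "\n");
  const blocks = normalized.split("\n\n");
  const rest = blocks.pop() ?? ""; // the last piece is incomplete until a blank line arrives
  const payloads: string[] = [];
  for (const block of blocks) {
    const dataLines = block
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice(5).replace(/^ /, "")); // drop "data:" and one optional space
    if (dataLines.length > 0) payloads.push(dataLines.join("\n"));
  }
  return { payloads, rest };
}

// Hypothetical helper: the URL and the body shape are placeholders for this sketch.
async function* readSse(url: string, params: unknown): AsyncGenerator<string> {
  const response = await fetch(url, {
    method: "POST",
    headers: { Accept: "text/event-stream", "Content-Type": "application/json" },
    body: JSON.stringify(params),
  });
  if (!response.body) throw new Error("response has no body");
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const parsed = parseSseBuffer(buffer);
      buffer = parsed.rest; // keep the incomplete tail for the next read
      yield* parsed.payloads;
    }
  } finally {
    await reader.cancel(); // also runs when the consumer breaks out early
  }
}
```

The parser keeps incomplete text in rest because a network read can end in the middle of an event, so an event is only emitted once its terminating blank line has arrived.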