Skip to main content

Workflow Streams - TypeScript SDK

Use Workflow Streams when you want outside observers to follow the progress of a Workflow and its Activities: updating a UI as an AI agent works, surfacing status from a payment or order pipeline, or reporting intermediate results from a data job.

SUPPORT, STABILITY, and DEPENDENCY INFO

The @temporalio/workflow-streams module is currently in Public Preview. Refer to the Temporal product release stages guide for more information.

The API may change before general availability.

Enable streaming on a Workflow

The library ships as @temporalio/workflow-streams and you can import the Workflow-side interface from @temporalio/workflow-streams/workflow. Enable streaming by constructing a WorkflowStream at the top of your Workflow function, before any await. Construction must happen there because the stream's handlers have to be registered before the first publish Signal arrives. Doing it after an await would miss any publishes that arrived before the run body resumed.

import { WorkflowStream } from '@temporalio/workflow-streams/workflow';

export interface OrderInput {
orderId: string;
}

export async function orderWorkflow(input: OrderInput): Promise<void> {
const stream = new WorkflowStream();
// ... rest of the workflow
}

Construct exactly one WorkflowStream at the top of the Workflow function. Constructing WorkflowStream creates the in-memory event log and registers the publish Signal, subscribe Update, and offset Query handlers on the current Workflow. If you have more than one WorkflowStream on the same Workflow silently replaces the handlers. The TypeScript Workflow runtime doesn't expose an inspection API for existing handlers, so the library can't raise an exception on a duplicate the way the Python SDK does.

If your Workflow uses Continue-As-New, see Continue-As-New below for how to carry stream state across runs so subscribers don't have gaps.

Publish from a Workflow

Bind a topic name to its event type once via stream.topic<T>(name), then call publish() on the returned handle to append events. The handle carries the topic name and the type T so call sites don't have to repeat them on every publish, and so subscribers reading the same handle decode to the matching type. Repeated calls with the same name return the same handle instance.

import { WorkflowStream } from '@temporalio/workflow-streams/workflow';

export interface StatusEvent {
state: string;
progress?: number;
detail?: string;
}

export interface OrderInput {
orderId: string;
}

export async function orderWorkflow(input: OrderInput): Promise<void> {
const stream = new WorkflowStream();
const status = stream.topic<StatusEvent>('status');

status.publish({ state: 'validating', detail: 'checking inventory' });
await validateOrder(input.orderId);

status.publish({ state: 'charging', progress: 33, detail: 'authorizing payment' });
await chargePayment(input.orderId);

status.publish({ state: 'shipping', progress: 66, detail: 'dispatching to warehouse' });
await dispatchOrder(input.orderId);

status.publish({ state: 'completed', progress: 100 });
}

publish() runs the default payload converter to encode each value. The codec chain (encryption, compression, etc.) runs once on the Signal or Update envelope that carries the batch, never per item, so encryption and compression are applied exactly once each direction.

Unlike the Python SDK, T here is a compile-time annotation only: TypeScript has no runtime type representation, so the library can't enforce per-topic type uniformity at the publish site. If two publishers bind the same topic name to different types, the mismatch is not caught at publish. The subscriber gets a decode error when it processes events from the mismatched publisher. A pre-built Payload may be passed to publish() regardless of the handle's type T, taking the zero-copy fast path.

Publish from a client

Any process that has a Temporal Client and the target Workflow Id can publish to that Workflow's stream by constructing a WorkflowStreamClient. This is the general pattern and covers HTTP backends, starters, one-off scripts, Activities from other Workflows, and standalone Activities.

Construct one with:

WorkflowStreamClient.create(client, workflowId)

Then use it the same way you would the Workflow-side handle: bind a topic, publish through it, and let await using flush on scope exit.

When events originate in an Activity, publish from the Activity directly rather than returning them for the Workflow to forward. The Workflow hosts the stream but doesn't read its own stream; it processes the Activity's return value and emits its own lifecycle events. Keeping Workflow state independent of streamed output is what lets retried Activity attempts surface to subscribers without polluting the Workflow's durable state — see Delivery semantics.

import { Client } from '@temporalio/client';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';

export async function publishStatus(workflowId: string): Promise<void> {
const temporalClient = new Client();
await using streamClient = WorkflowStreamClient.create(temporalClient, workflowId, {
batchInterval: '200 milliseconds',
});

const status = streamClient.topic<StatusEvent>('status');
status.publish({ state: 'started' });
// ...
// Buffer is flushed automatically on `await using` scope exit.
}

The await using declaration relies on TypeScript 5.2+ and Node 20.11+. On older runtimes, call await streamClient[Symbol.asyncDispose]() explicitly at the end of the publishing scope.

Inside an Activity scheduled by a Workflow, WorkflowStreamClient.fromWithinActivity() is used to infer the Temporal Client and the parent Workflow Id from the Activity context, so you don't have to thread them through the Activity's input:

import { Context } from '@temporalio/activity';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';

export interface Delta {
text: string;
}

export async function streamDeltas(orderId: string): Promise<void> {
await using client = WorkflowStreamClient.fromWithinActivity();
const deltas = client.topic<Delta>('delta');

for await (const delta of generateDeltas(orderId)) {
deltas.publish(delta);
Context.current().heartbeat();
}
// Buffer is flushed automatically on scope exit.
}

For a standalone Activity (one started directly via Client.activity.start rather than from a Workflow), there is no parent Workflow context to infer, so fromWithinActivity() throws. Fall back to the general pattern with Context.current().client and the target Workflow Id threaded through the Activity's input.

Two operations give the application explicit control over when batches ship: forceFlush: true on a publish for latency, and await client.flush() for confirmation that prior publications have landed.

Pass { forceFlush: true } on a publish to wake the background flusher so the current buffer ships without waiting for the next interval. The flusher only runs while the client is alive (between first publish() and Symbol.asyncDispose). Otherwise, forceFlush: true queues the wake event but nothing ships until a flush or dispose occurs. The call returns immediately after appending to the buffer and signaling the flusher. It doesn't wait for delivery to the Workflow or to subscribers:

deltas.publish(delta, { forceFlush: true });

Use it for latency-sensitive events: the first delta of a response so the user sees something fast, or punctuated events like RETRY and STATUS_CHANGE. See Tuning for the trade-off against history pressure.

Use await client.flush() when you need a mid-stream barrier. Successful completion of the flush is proof that the Temporal server has received all prior publications, so subsequent work that depends on those events being durable can proceed. The client stays open for further publishing afterward. Exiting await using already flushes on its way out, so the explicit call is only for barriers in the middle:

await using client = WorkflowStreamClient.fromWithinActivity();
const deltas = client.topic<Delta>('delta');

for (const delta of firstPhase()) {
deltas.publish(delta);
}

await client.flush();
const checkpointId = await recordPhaseOneComplete(); // only safe once phase-one events are durable

for (const delta of secondPhase(checkpointId)) {
deltas.publish(delta);
}

publish() is non-blocking and applies no backpressure. From an Activity or other client, it appends to the client's in-memory buffer and returns. From inside a Workflow, it appends synchronously to the in-memory log (no buffer, nothing to flush). Subscribers pull from the Workflow's log on their own schedule, so a slow subscriber doesn't slow down publishers. If a publisher emits faster than batches can ship to the server, the buffer grows: the process uses more memory, the stream falls further behind real time, and at the limit Signals can't keep up.

If your application needs to bound this (to cap memory, to keep the stream close to real time, or to apply a policy when the publisher overruns the network), apply that policy upstream of publish(). The choice (block, drop, error, sample) is application-specific, and Workflow Streams doesn't pick one for you.

Subscribe

Subscribing uses the same client construction as publishing: WorkflowStreamClient.create(client, workflowId) from any process that has a Temporal Client or fromWithinActivity() inside an Activity. Subscribing from an Activity is less common in practice, so the general client case is the primary example below.

Once you have a client, iterate a topic handle's subscribe(), the counterpart to publish(). The handle's bound type drives decoding, so each item.data arrives as T via the client's payload converter. The codec chain is applied once at the Update envelope, not per item.

import { Client } from '@temporalio/client';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';

export async function watchOrder(orderId: string): Promise<void> {
const temporalClient = new Client();
const stream = WorkflowStreamClient.create(temporalClient, orderId);

const status = stream.topic<StatusEvent>('status');
for await (const item of status.subscribe()) {
const evt = item.data;
console.log(`[${(evt.progress ?? 0).toString().padStart(3)}%] ${evt.state}: ${evt.detail ?? ''}`);
if (evt.state === 'completed') break;
}
}

The iterator handles re-polling, pagination when a poll response hits the ~1 MB cap, and Workflow-side log truncation transparently. Callers don't need to wrap the iterator for the common cases.

Two edge cases are worth knowing:

  • An RPC timeout where Continue-As-New can't be followed ends the iterator silently
  • A validator rejection during a Continue-As-New handoff can surface as a WorkflowUpdateFailedError.

A subscriber that doesn't need flushing can skip await using because the background flusher only runs for publishers.

Heterogeneous topics

A topic handle binds one name to one type, so it only fits a single-type subscription. To consume multiple topics whose payload types differ, call client.subscribe() directly with a list of names (or subscribe() with no arguments for every topic). The default overload yields WorkflowStreamItem<Payload>, so each item arrives as the raw Payload carrying encoding metadata. Dispatch on item.topic and decode the payload with defaultPayloadConverter.fromPayload<T>(item.data):

import { defaultPayloadConverter } from '@temporalio/common';

for await (const item of stream.subscribe(['status', 'progress'])) {
if (item.topic === 'status') {
const evt = defaultPayloadConverter.fromPayload<StatusEvent>(item.data);
console.log(`[status] ${evt.state}: ${evt.detail ?? ''}`);
} else if (item.topic === 'progress') {
const evt = defaultPayloadConverter.fromPayload<ProgressEvent>(item.data);
console.log(`[progress] ${evt.message}`);
}
}

A single iterator over multiple topics also avoids the cancellation race that two concurrent subscribers would create. The raw Payload is also the right shape when you want to forward the bytes through to another system without decoding them.

Closing the stream

A subscriber's for await doesn't know when the publisher is done. How you close a stream depends on what the application needs. As one example, a common pattern combines two pieces:

  1. An in-band terminator. The Workflow or its Activity publishes a sentinel event the subscriber recognizes and breaks on. In the watchOrder example above, { state: 'completed' } is the minimal form, and the consumer's if (evt.state === 'completed') break is the matching half. Each subscription decides what its own end-of-stream marker is.
  2. A brief overlap before the Workflow returns. A poll Update that is still in flight when the Workflow returns is surfaced to the iterator and consumed silently (the iterator either follows Continue-As-New or exits cleanly), and no new polls can complete after that. If the Workflow returns immediately after publishing the terminator, subscribers may miss it.

There are two ways to provide that overlap.

  • Fixed sleep. Sleep between the terminator and the return so any in-flight poll has time to fetch the terminator before the Workflow exits:

    import { sleep } from '@temporalio/workflow';

    // at the end of the workflow function
    status.publish({ state: 'completed', progress: 100 });
    await sleep('30 seconds');
    return result;
  • Acknowledgment handshake. The subscriber sends a Signal once it has the terminator; the Workflow waits up to a timeout, returning as soon as the ack arrives:

    import { condition, defineSignal, setHandler } from '@temporalio/workflow';
    import { WorkflowStream } from '@temporalio/workflow-streams/workflow';

    export const subscriberAcknowledgedTerminator = defineSignal('subscriberAcknowledgedTerminator');

    export async function chatWorkflow(input: ChatInput): Promise<string> {
    const stream = new WorkflowStream();
    let subscriberDone = false;
    setHandler(subscriberAcknowledgedTerminator, () => {
    subscriberDone = true;
    });

    // ... do work and publish events ...

    await condition(() => subscriberDone, '30 seconds');
    // Returns true if the ack arrived, false on timeout — either way, fall through.
    return result;
    }

The full pattern is wired into the Stream LLM output example below.

You can inspect the terminal status. subscribe() exits cleanly when the Workflow reaches COMPLETED, FAILED, CANCELLED, TERMINATED, or TIMED_OUT, but doesn't distinguish among them. If your application needs to know which (to display success or failure to the user, log the outcome, or decide whether to retry), call await temporalClient.workflow.getHandle(workflowId).describe() after the loop returns to inspect the Workflow's status.

Continue-As-New

Continue-As-New following requires the client retained from WorkflowStreamClient.create() or fromWithinActivity(). Clients constructed directly from a single WorkflowHandle can't re-target the new run.

To roll a long-running streaming Workflow over without subscribers seeing a gap, carry both your application state and the stream state across the boundary. Add an optional streamState?: WorkflowStreamState field to your Workflow input, pass it to the constructor, and call stream.continueAsNew(buildArgs) to invoke the rollover. The helper drains waiting subscribers, waits for in-flight handlers to finish, then calls continueAsNew with the args produced by buildArgs(postDrainState):

import { workflowInfo } from '@temporalio/workflow';
import { WorkflowStream, type WorkflowStreamState } from '@temporalio/workflow-streams/workflow';

export interface WorkflowInput {
itemsProcessed: number;
streamState?: WorkflowStreamState;
}

export async function longRunningWorkflow(input: WorkflowInput): Promise<void> {
const stream = new WorkflowStream(input.streamState);
let itemsProcessed = input.itemsProcessed;

while (true) {
await doOneIteration(stream);
itemsProcessed++;

if (workflowInfo().continueAsNewSuggested) {
await stream.continueAsNew<typeof longRunningWorkflow>((state) => [
{
itemsProcessed,
streamState: state,
},
]);
}
}
}

The optional streamState? on the input field is required: priorState is undefined on a fresh start and a WorkflowStreamState after a rollover. The buildArgs lambda receives the post-detach WorkflowStreamState as its only argument so the snapshot is guaranteed to happen after pollers detach.

To pass other Continue-As-New parameters such as taskQueue, searchAttributes, or workflowRunTimeout, use the explicit recipe with makeContinueAsNewFunc instead:

import { allHandlersFinished, condition, makeContinueAsNewFunc } from '@temporalio/workflow';

stream.detachPollers();
await condition(allHandlersFinished);
const continueWithOptions = makeContinueAsNewFunc<typeof longRunningWorkflow>({
taskQueue: 'other-tq',
});
await continueWithOptions({
itemsProcessed,
streamState: stream.getState(),
});

The carried WorkflowStreamState includes the entire in-memory log of the previous run, so streams that carry large items can hit Temporal's per-payload size limit at the rollover. Offload the bytes via External Storage so each item is a small reference rather than the full payload, and combine that with truncate() to keep the carried log itself small.

Deduplication window

See Delivery Semantics for more details on subscriber and publisher behavior. See Tuning for more details on how to change your settings to meet the requirements for your Workflow Streams.

There are two limits on the deduplication window worth highlighting:

  • publisherTtl: At each Continue-As-New, deduplicate entries whose lastSeen is older than this are dropped. lastSeen is updated on each successful publish (not on each retry attempt), so a publisher that retries through a long partition without success can still age out. A publisher that returns after a longer pause may produce a duplicate. Tune upward if your publishers can be silent for extended windows:

    stream.continueAsNew(buildArgs, { publisherTtl: '...' })
  • maxRetryDuration: A WorkflowStreamClient retries a failed batch for up to this long. If the duration elapses with the batch still pending, the client gives up, the pending batch is dropped, and a FlushTimeoutError is raised.

    On timeout, the dropped batch is at-most-once: it may or may not have reached the Workflow. Subsequent publishes resume cleanly with the next sequence. One operational caveat: the FlushTimeoutError is raised from inside the background flusher task and terminates it. Until you call await client.flush() or exit the await using scope, subsequent publishes accumulate in the buffer with no flusher to ship them.

Best practices

There are a few details to note if you're writing custom message handlers or testing the library's capabilities:

  • The package has no root entrypoint. You can import from one of two subpaths:
    • @temporalio/workflow-streams/workflow: The Workflow-safe interface (WorkflowStream, WorkflowStreamState, etc). Bundles cleanly into Workflow code.
    • @temporalio/workflow-streams/client: the client interface (WorkflowStreamClient, etc). Pulls in crypto, @temporalio/activity, and @temporalio/client, none of which resolve inside the Workflow sandbox. Don't import from a Workflow file.
  • WorkflowStreamClient is single-event-loop. The client buffer is mutated on the publish path and read from the background flusher inside one Node event loop. Don't call publish() from a Worker thread. Route events back to the loop that owns the client.
  • Constructing two WorkflowStreams silently replaces handlers. The TypeScript Workflow runtime doesn't expose an inspection API for existing handlers, so the library can't raise on a duplicate the way the Python SDK does. Construct exactly one WorkflowStream per Workflow at the top of the function.
  • Type bindings aren't shared across publishers. Each WorkflowStream and each WorkflowStreamClient records topic types only for its own instance, and the type parameter T is erased at compile time, so no runtime check enforces uniformity. If two publishers bind the same topic name to different types, the mismatch is not caught at publish, and the subscriber gets a decode error when it processes events from the mismatched publisher.
  • Custom payload converters. A WorkflowStreamClient created via WorkflowStreamClient.create(client, ...) picks up the client's configured payload converter. Subscribers decode through the same converter. The Workflow-side always uses defaultPayloadConverter. If you ship a custom converter, make sure both sides agree or use types the default converter handles.
  • Cross-realm Uint8Array for binary publishes. Hand-publishing a Uint8Array from Workflow code uses a dedicated code path that constructs a binary/plain Payload directly, because the sandbox's TextEncoder returns a host-realm Uint8Array that fails instanceof checks against the sandbox's own globals. You generally don't need to think about this, but if you bypass the workflow-side handle and construct payloads manually, use the Workflow-side WorkflowStream API rather than building payloads by hand.

Example: Stream LLM output

The headline use case fits the publish/subscribe shapes documented above. An Activity calls the model and publishes deltas as they arrive. The Workflow kicks off the Activity and waits for the consumer to acknowledge end-of-stream. The consumer subscribes, accumulates the deltas, and clears its accumulated state on RETRY before continuing. The shape works for a terminal client, a desktop UI, or a Server-Sent Events (SSE) endpoint forwarding to a browser. Anything that holds the displayed state calls render() to display it.

If your Activity can retry, the consumer side has to account for it. A retried attempt is a fresh publisher, so its output appears in the stream alongside the output from the previous attempt. In the LLM streaming pattern below, that means the failed attempt's partial deltas and the retried attempt's full output both reach a subscribed UI unless the UI resets on a RETRY event. The example wires up that pattern. See Delivery semantics for the precise guarantees.

// activities.ts
import { Context } from '@temporalio/activity';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import OpenAI from 'openai';

export interface TextDelta {
text: string;
}

export interface RetryEvent {
attempt: number;
}

export async function streamCompletion(prompt: string): Promise<string> {
const attempt = Context.current().info.attempt;
await using streamClient = WorkflowStreamClient.fromWithinActivity({
batchInterval: '200 milliseconds',
});
// Disable provider-side retries; let Temporal own retry policy at the Activity layer.
const openai = new OpenAI({ maxRetries: 0 });

const deltas = streamClient.topic<TextDelta>('delta');
const retry = streamClient.topic<RetryEvent>('retry');
const close = streamClient.topic<Record<string, never>>('close');

// Tell consumers an earlier attempt's deltas are stale.
if (attempt > 1) {
retry.publish({ attempt }, { forceFlush: true });
}

const oaiStream = await openai.chat.completions.create({
model: 'gpt-4o-mini',
messages: [{ role: 'user', content: prompt }],
stream: true,
});

const full: string[] = [];
let first = true;
for await (const chunk of oaiStream) {
const text = chunk.choices[0]?.delta?.content;
if (!text) continue;
// forceFlush only on the first delta so the user sees something
// immediately; subsequent deltas batch at the 200 ms interval.
deltas.publish({ text }, first ? { forceFlush: true } : undefined);
first = false;
full.push(text);
}
close.publish({});
return full.join('');
}
// workflows.ts
import { condition, defineSignal, executeActivity, setHandler } from '@temporalio/workflow';
import { WorkflowStream } from '@temporalio/workflow-streams/workflow';
import type * as activities from './activities';

export const subscriberAcknowledgedTerminator = defineSignal('subscriberAcknowledgedTerminator');

export interface ChatInput {
prompt: string;
}

export async function chatWorkflow(input: ChatInput): Promise<string> {
const stream = new WorkflowStream();
let subscriberDone = false;
setHandler(subscriberAcknowledgedTerminator, () => {
subscriberDone = true;
});

const result = await executeActivity<typeof activities.streamCompletion>('streamCompletion', input.prompt, {
startToCloseTimeout: '5 minutes',
});

// Wait for the subscriber to ack the terminal `close` event. The timeout
// is a fallback for when no subscriber is attached; with the ack, the
// typical case exits as soon as the subscriber confirms.
await condition(() => subscriberDone, '30 seconds');
return result;
}
// consumer.ts: accumulates the model's output and resets on retry
import { Client } from '@temporalio/client';
import { defaultPayloadConverter } from '@temporalio/common';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import { subscriberAcknowledgedTerminator } from './workflows';

export async function streamChat(chatId: string): Promise<string> {
const temporalClient = new Client();
// Subscribe-only; no `await using` needed because the flusher only runs for publishers.
const stream = WorkflowStreamClient.create(temporalClient, chatId);
const output: string[] = [];

function render(): void {
// ... display the accumulated output (terminal redraw, UI update, etc.)
}

for await (const item of stream.subscribe(['delta', 'retry', 'close'])) {
if (item.topic === 'retry') {
// Earlier attempt's deltas are stale; drop what we've shown.
output.length = 0;
render();
} else if (item.topic === 'delta') {
const delta = defaultPayloadConverter.fromPayload<TextDelta>(item.data);
output.push(delta.text);
render();
} else if (item.topic === 'close') {
// Acknowledge so the Workflow can return without waiting on the fallback timeout.
await temporalClient.workflow.getHandle(chatId).signal(subscriberAcknowledgedTerminator);
break;
}
}

return output.join('');
}

A few choices in this shape are deliberate:

  • The Activity is the publisher because it owns the non-deterministic LLM call. The Workflow processes only the Activity's return value, never reading its own stream — see Publish from a client for why.
  • The Activity publishes a RETRY event when Context.current().info.attempt > 1. This lets the UI respond appropriately to the failure, typically by clearing accumulated deltas before the next attempt's deltas arrive (see Delivery semantics).
  • Termination uses an ack handshake: the consumer signals the Workflow once it has received the close event, so the Workflow can return as soon as the subscriber confirms. The condition timeout is the fallback when no subscriber is attached (see Closing the stream for the simpler fixed-sleep alternative).
  • { forceFlush: true } is used only on the first delta and on the RETRY sentinel, where latency matters. Subsequent deltas batch at the 200 ms batchInterval; per-delta forceFlush: true would generate one Signal per token (see Tuning for the trade-off).

See also