Skip to main content

Workflow Streams - Python SDK

Use Workflow Streams when you want outside observers to follow the progress of a Workflow and its Activities: updating a UI as an AI agent works, surfacing status from a payment or order pipeline, or reporting intermediate results from a data job.

SUPPORT, STABILITY, and DEPENDENCY INFO

The temporalio.contrib.workflow_streams module is currently in Public Preview. Refer to the Temporal product release stages guide for more information.

The API may change before general availability.

This page covers:

Enable streaming on a Workflow

You can import Workflow Streams from temporalio.contrib.workflow_streams. Enable streaming by constructing a WorkflowStream from your Workflow's @workflow.init method because the stream's handlers have to be registered before the first publish Signal arrives. Doing it from @workflow.run raises a RuntimeError and would miss any publishes that arrived before the run body started executing.

from dataclasses import dataclass

from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream


@dataclass
class OrderInput:
order_id: str


@workflow.defn
class OrderWorkflow:
@workflow.init
def __init__(self, input: OrderInput) -> None:
self.stream = WorkflowStream()

Constructing WorkflowStream creates the in-memory event log and dynamically registers the publish Signal, subscribe Update, and offset Query handlers on the current Workflow. Constructing more than one stream on the same Workflow also raises a RuntimeError.

If your Workflow uses Continue-As-New, see Continue-As-New below for how to carry stream state across runs so subscribers see no gap.

Publish from a Workflow

You can publish when you bind a topic name to its event type once via self.stream.topic("name", type=Type), then call publish() on the returned handle to append events. The handle records the per-stream binding from topic name to value type so call sites don't have to repeat the type on every publish and so subscribers reading the same handle decode to the matching type.

from dataclasses import dataclass


@dataclass
class StatusEvent:
state: str
progress: int = 0
detail: str = ""


@workflow.defn
class OrderWorkflow:
@workflow.init
def __init__(self, input: OrderInput) -> None:
self.stream = WorkflowStream()
self.status = self.stream.topic("status", type=StatusEvent)

@workflow.run
async def run(self, input: OrderInput) -> None:
self.status.publish(StatusEvent(state="validating", detail="checking inventory"))
await validate_order(input.order_id)

self.status.publish(StatusEvent(state="charging", progress=33, detail="authorizing payment"))
await charge_payment(input.order_id)

self.status.publish(StatusEvent(state="shipping", progress=66, detail="dispatching to warehouse"))
await dispatch_order(input.order_id)

self.status.publish(StatusEvent(state="completed", progress=100))

publish() runs the payload converter to encode each value. The codec chain (encryption, compression, etc.) runs once on the Signal or Update envelope that carries the batch, never per item, so encryption and compression are applied exactly once each direction.

The type= argument is optional and defaults to Any. Pass it when you want the binding recorded so re-binding the same name to an unequal type raises and exception or so subscribers can pick up the type from the same handle.

Publish from a client

Any process that has a Temporal Client and the target Workflow Id can publish to that Workflow's stream by constructing a WorkflowStreamClient. This is the general pattern and covers HTTP backends, starters, one-off scripts, other Workflows' Activities, and standalone Activities.

Construct one with:

WorkflowStreamClient.create(client, workflow_id)

Then use it the same way you would the Workflow-side handle: bind a topic, publish through it, and let the async context manager flush on exit.

When events originate in an Activity, publish from the Activity directly rather than returning them for the Workflow to forward. The Workflow hosts the stream but doesn't read its own stream; it processes the Activity's return value and emits its own lifecycle events. Keeping Workflow state independent of streamed output is what lets retried Activity attempts surface to subscribers without polluting the Workflow's durable state — see Delivery semantics.

from datetime import timedelta

from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient


async def publish_status(workflow_id: str) -> None:
temporal_client = await Client.connect("localhost:7233")
stream_client = WorkflowStreamClient.create(
temporal_client,
workflow_id=workflow_id,
batch_interval=timedelta(milliseconds=200),
)
async with stream_client:
status = stream_client.topic("status", type=StatusEvent)
status.publish(StatusEvent(state="started"))
...
# Buffer is flushed on context manager exit.

Inside an Activity scheduled by a Workflow, WorkflowStreamClient.from_within_activity() is used to infer the Temporal Client and the parent Workflow Id from the Activity context, so you don't have to thread them through the Activity's input:

from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient


@activity.defn
async def stream_deltas(order_id: str) -> None:
client = WorkflowStreamClient.from_within_activity()
async with client:
deltas = client.topic("delta", type=Delta)
for delta in generate_deltas(order_id):
deltas.publish(delta)
activity.heartbeat()
# Buffer is flushed on context manager exit.

For a standalone Activity (one started directly via Client.start_activity rather than from a Workflow), there is no parent Workflow context to infer, so from_within_activity() raises an exception. Fall back to the general pattern with activity.client() and the target Workflow Id threaded through the Activity's input.

Two operations give the application explicit control over when batches ship: force_flush=True on a publish for latency, and await client.flush() for confirmation that prior publications have landed.

Pass force_flush=True on a publish to wake the background flusher so the current buffer ships without waiting for the next interval. The flusher only runs while the Workflow Stream client is entered (async with client). Otherwise, force_flush=True queues the wake event, but nothing ships until you enter the context or call await client.flush(). The call returns immediately after appending to the buffer and signaling the flusher. It doesn't wait for delivery to the Workflow or to subscribers:

deltas.publish(delta, force_flush=True)

Use it for latency-sensitive events like, the first delta of a response so the user sees something fast, or punctuated events like RETRY and STATUS_CHANGE. See Tuning for the trade-off against history pressure.

Use await client.flush() when you need a mid-stream barrier. Successful completion of the flush is proof that the Temporal server has received all prior publications, so subsequent work that depends on those events being durable can proceed. The client stays open for further publishing afterward. Exiting async with client already flushes on its way out, so the explicit call is only for barriers in the middle:

async with client:
deltas = client.topic("delta", type=Delta)
for delta in first_phase():
deltas.publish(delta)

await client.flush()
checkpoint_id = await record_phase_one_complete() # only safe once phase-one events are durable in the Workflow log

for delta in second_phase(checkpoint_id):
deltas.publish(delta)

publish() is non-blocking and applies no backpressure. From an Activity or other client, it appends to the client's in-memory buffer and returns. From inside a Workflow, it appends synchronously to the in-memory log. Subscribers pull from the Workflow's log on their own schedule, so a slow subscriber doesn't slow down publishers. If a publisher emits faster than batches can ship to the server, the buffer grows: the process uses more memory, the stream falls further behind real time, and at the limit Signals can't keep up.

If your application needs to bound this (to cap memory, to keep the stream close to real time, or to apply a policy when the publisher overruns the network), apply that policy upstream of publish(). The choice (block, drop, error, sample) is application-specific, and Workflow Streams doesn't pick one for you.

Subscribe

Subscribing uses the same client construction as publishing: WorkflowStreamClient.create(client, workflow_id) from any process that has a Temporal Client or from_within_activity() inside an Activity. Subscribing from an Activity is less common in practice, so the general client case is the primary example below.

Once you have a client, iterate a topic handle's subscribe(), the counterpart to publish(). The handle's bound type drives decoding, so each item.data arrives as T via the client's payload converter. The codec chain is applied once at the Update envelope, not per item.

from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient


async def watch_order(order_id: str) -> None:
temporal_client = await Client.connect("localhost:7233")
stream = WorkflowStreamClient.create(temporal_client, workflow_id=order_id)

status = stream.topic("status", type=StatusEvent)
async for item in status.subscribe():
evt = item.data
print(f"[{evt.progress:3d}%] {evt.state}: {evt.detail}")
if evt.state == "completed":
break

The iterator handles re-polling, pagination when a poll response hits the ~1 MB cap, and Workflow-side log truncation transparently. Callers don't need to wrap the iterator for the common cases.

Two edge cases are worth knowing:

  • An RPC timeout where Continue-As-New cannot be followed ends the iterator silently (no exception raised)
  • A validator rejection during a Continue-As-New handoff can surface as a WorkflowUpdateFailedError. See the API reference for details.

Heterogeneous topics

A topic handle binds one name to one type, so it only fits a single-type subscription. To consume multiple topics whose payload types differ, call client.subscribe() directly with a list of names (or subscribe([]) for every topic) and pass result_type=temporalio.common.RawValue so each item arrives as the underlying Payload wrapped in a RawValue. Dispatch on item.topic and decode the wrapped payload with the client's payload converter:

from temporalio.common import RawValue

converter = temporal_client.data_converter.payload_converter

async for item in stream.subscribe(["status", "progress"], result_type=RawValue):
if item.topic == "status":
evt = converter.from_payload(item.data.payload, StatusEvent)
print(f"[status] {evt.state}: {evt.detail}")
elif item.topic == "progress":
evt = converter.from_payload(item.data.payload, ProgressEvent)
print(f"[progress] {evt.message}")

A single iterator over multiple topics also avoids the cancellation race that two concurrent subscribers would create. RawValue is also the right shape when you want to forward the bytes through to another system without decoding them.

Omitting result_type entirely or passing result_type=None decodes each item with the converter's default rules. For the stock JSON converter, that means a Python primitive, dict, or list. This works for fully homogeneous streams, but not for the dispatch-by-topic pattern above, where each topic has its own concrete dataclass.

Closing the stream

A subscriber's async for does not know when the publisher is done. How you close a stream depends on what the application needs. As one example, a common pattern combines two pieces:

  1. An in-band terminator. The Workflow or its Activity publishes a sentinel event the subscriber recognizes and breaks on. In the watch_order example above, StatusEvent(state="completed") is the minimal form, and the consumer's if evt.state == "completed": break is the matching half. Each subscription decides what its own end-of-stream marker is.
  2. A brief overlap before the Workflow returns. A poll Update that is still in flight when the Workflow returns surfaces to the client as AcceptedUpdateCompletedWorkflow, and no new polls can complete after that. If the Workflow returns immediately after publishing the terminator, subscribers may miss it.

There are two ways to provide that overlap.

  • Fixed sleep. Sleep between the terminator and the return so any in-flight poll has time to fetch the terminator before the Workflow exits:

    # at the end of @workflow.run
    self.status.publish(StatusEvent(state="completed", progress=100))
    await workflow.sleep(timedelta(seconds=30))
    return result
  • Acknowledgment handshake. The subscriber sends a Signal once it has the terminator; the Workflow waits up to a timeout, returning as soon as the ack arrives:

    @workflow.signal
    async def subscriber_acknowledged_terminator(self) -> None:
    self.subscriber_done = True

    @workflow.run
    async def run(self, input: ChatInput) -> str:
    ...
    try:
    await workflow.wait_condition(
    lambda: self.subscriber_done,
    timeout=timedelta(seconds=30),
    )
    except TimeoutError:
    pass # No subscriber attached; the run still completes cleanly.
    return result

The full pattern is wired into the Stream LLM output example below.

You can inspect the terminal status. subscribe() exits cleanly when the Workflow reaches COMPLETED, FAILED, CANCELED, TERMINATED, or TIMED_OUT, but does not distinguish among them. If your application needs to know which (to display success or failure to the user, log the outcome, or decide whether to retry), call await temporal_client.get_workflow_handle(workflow_id).describe() after the loop returns to inspect the Workflow's status.

Continue-As-New

Continue-As-New following requires the client retained from WorkflowStreamClient.create() or from_within_activity(). Clients constructed directly with a single handle cannot re-target the new run.

To roll a long-running streaming Workflow over without subscribers seeing a gap, carry both your application state and the stream state across the boundary. Add a WorkflowStreamState | None field to your Workflow input, pass it to the constructor, and call WorkflowStream.continue_as_new(build_args) to invoke the rollover. The helper drains waiting subscribers, waits for in-flight handlers to finish, then calls workflow.continue_as_new with the args produced by build_args(post_drain_state):

from dataclasses import dataclass, field

from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamState


@dataclass
class AppState:
items_processed: int = 0


@dataclass
class WorkflowInput:
app_state: AppState = field(default_factory=AppState)
stream_state: WorkflowStreamState | None = None


@workflow.defn
class LongRunningWorkflow:
@workflow.init
def __init__(self, input: WorkflowInput) -> None:
self.app_state = input.app_state
self.stream = WorkflowStream(prior_state=input.stream_state)

@workflow.run
async def run(self, input: WorkflowInput) -> None:
while True:
await do_one_iteration(self)
if workflow.info().is_continue_as_new_suggested():
await self.stream.continue_as_new(
lambda stream_state: [
WorkflowInput(
app_state=self.app_state,
stream_state=stream_state,
)
]
)

The | None on the stream_state field type is required: prior_state is None on a fresh start and a WorkflowStreamState instance after a rollover.

Always use the concrete type, not Any. With Any, the data converter rebuilds the field as a plain dict and WorkflowStream(prior_state=...) raises an AttributeError accessing .log / .base_offset / .publishers on the dict.

To pass other Continue-As-New parameters such as task_queue, retry_policy, run_timeout, use the explicit recipe instead:

self.stream.detach_pollers()
await workflow.wait_condition(workflow.all_handlers_finished)
workflow.continue_as_new(
args=[WorkflowInput(app_state=self.app_state, stream_state=self.stream.get_state())],
task_queue="other-tq",
)

The carried WorkflowStreamState includes the entire in-memory log of the previous run, so streams that carry large items can hit Temporal's per-payload size limit at the rollover. Offload the bytes via External Storage so each item is a small reference rather than the full payload, and combine that with truncate() to keep the carried log itself small.

Deduplication window

See Delivery Semantics for more details on subscriber and publisher behavior. See Tuning for more details on how to change your settings to meet the requirements for your Workflow Streams.

There are two limits on the deduplication window worth highlighting:

  • publisher_ttl: At each Continue-As-New, deduplicate entries whose last_seen is older than this are dropped. last_seen is updated on each successful publish, so a publisher that retries through a long partition without success can still time out. Tune upward if your publishers can be silent for extended windows:

    WorkflowStream.continue_as_new(publisher_ttl=...)
  • max_retry_duration: A WorkflowStreamClient retries a failed batch for up to this long. If the duration elapses with the batch still pending, the client gives up, the pending batch is dropped, and a TimeoutError is raised.

    On timeout, the dropped batch is at-most-once: it may or may not have reached the Workflow. One operational caveat: the TimeoutError raises from inside the background flusher task and terminates it. Until you call await client.flush() or exit the async with block, subsequent publishes accumulate in the buffer with no flusher to ship them.

Best practices

There are a few details to note if you're writing custom message handlers or testing the library's capabilities:

  • WorkflowStreamClient is asyncio-only. The client buffer is mutated on the publish path and read from the flusher inside a single event loop. Don't call publish() from a Worker thread.

  • Custom handlers read stream state on the first activation. WorkflowStream registers its publish-Signal handler dynamically from __init__, so on the first activation a publish Signal can be queued before class-level @workflow.signal or @workflow.update handlers have run.

    A handler that observes state set by stream initialization in that same activation can see pre-publish state. The fix is to make the handler async def and await once before reading state. asyncio.sleep(0) is a no-op yield that suffices and adds no history events. Don't substitute workflow.sleep(0), which records a timer event. Once the first activation completes, the handler is permanent and the race doesn't recur.

  • Type bindings aren't shared across publishers. Each WorkflowStream and each WorkflowStreamClient records topic types only for its own instance. If two publishers bind the same topic name to different types, the mismatch is not caught at publish, and the subscriber gets a decode error when it processes events from the mismatched publisher.

Example: Stream LLM output

The headline use case fits the publish/subscribe shapes documented above. An Activity calls the model and publishes deltas as they arrive. The Workflow starts the Activity and waits for the consumer to acknowledge end-of-stream. The consumer subscribes, accumulates the deltas, and clears its accumulated state on RETRY before continuing. The shape works for a terminal client, a desktop UI, or a Server-Sent Events (SSE) endpoint forwarding to a browser. Anything that holds the displayed state calls render() to display it.

If your Activity can retry, the consumer side has to account for it. A retried attempt is a fresh publisher, so its output appears in the stream alongside the output from the previous attempt. In the LLM streaming pattern below, that means the failed attempt's partial deltas and the retried attempt's full output both reach a subscribed UI unless the UI resets on a RETRY event. The example wires up that pattern. See Delivery semantics for the precise guarantees.

# activity.py
from openai import AsyncOpenAI


@dataclass
class TextDelta:
text: str


@activity.defn
async def stream_completion(prompt: str) -> str:
stream_client = WorkflowStreamClient.from_within_activity(
batch_interval=timedelta(milliseconds=200),
)
# Disable provider-side retries; let Temporal own retry policy at the Activity layer.
openai_client = AsyncOpenAI(max_retries=0)

async with stream_client:
deltas = stream_client.topic("delta", type=TextDelta)
retry = stream_client.topic("retry", type=dict)
close = stream_client.topic("close")

# Tell consumers an earlier attempt's deltas are stale.
if activity.info().attempt > 1:
retry.publish({"attempt": activity.info().attempt}, force_flush=True)

full: list[str] = []
first = True
oai_stream = await openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
stream=True,
)
async for chunk in oai_stream:
if not chunk.choices:
continue
text = chunk.choices[0].delta.content
if not text:
continue
# force_flush only on the first delta so the user sees something
# immediately; subsequent deltas batch at the 200 ms interval.
deltas.publish(TextDelta(text=text), force_flush=first)
first = False
full.append(text)
close.publish({})
return "".join(full)
# workflow.py
@workflow.defn
class ChatWorkflow:
@workflow.init
def __init__(self, input: ChatInput) -> None:
self.stream = WorkflowStream()
self.subscriber_done: bool = False

@workflow.signal
async def subscriber_acknowledged_terminator(self) -> None:
self.subscriber_done = True

@workflow.run
async def run(self, input: ChatInput) -> str:
result = await workflow.execute_activity(
stream_completion,
input.prompt,
start_to_close_timeout=timedelta(minutes=5),
)
# Wait for the subscriber to ack the terminal `close` event.
# The timeout is a fallback for when no subscriber is attached;
# with the ack, the typical case exits as soon as the subscriber confirms.
try:
await workflow.wait_condition(
lambda: self.subscriber_done,
timeout=timedelta(seconds=30),
)
except TimeoutError:
pass # No subscriber; the run still completes cleanly.
return result
# consumer.py: accumulates the model's output and resets on retry
async def stream_chat(chat_id: str) -> str:
# Subscribe-only; no `async with` needed because the flusher only runs for publishers.
stream = WorkflowStreamClient.create(temporal_client, workflow_id=chat_id)
converter = temporal_client.data_converter.payload_converter
output: list[str] = []

def render() -> None:
... # display the accumulated output (terminal redraw, UI update, etc.)

async for item in stream.subscribe(
["delta", "retry", "close"], result_type=RawValue
):
if item.topic == "retry":
# Earlier attempt's deltas are stale; drop what we've shown.
output.clear()
render()
elif item.topic == "delta":
delta = converter.from_payload(item.data.payload, TextDelta)
output.append(delta.text)
render()
elif item.topic == "close":
# Acknowledge so the Workflow can return without a sleep.
await temporal_client.get_workflow_handle(chat_id).signal(
ChatWorkflow.subscriber_acknowledged_terminator
)
break

return "".join(output)

A few choices in this shape are deliberate:

  • The Activity is the publisher because it owns the non-deterministic LLM call. The Workflow processes the Activity's return value, never reading its own stream — see Publish from a client for why.
  • The Activity publishes a RETRY event when activity.info().attempt > 1. This lets the UI respond appropriately to the failure, typically by clearing accumulated deltas before the next attempt's deltas arrive (see Delivery semantics).
  • Termination uses an ack handshake: the consumer signals the Workflow once it has received the close event, so the Workflow can return as soon as the subscriber confirms. The wait_condition timeout is the fallback when no subscriber is attached (see Closing the stream for the simpler fixed-sleep alternative).
  • force_flush=True is used on the first delta and on the RETRY sentinel, where latency matters. Subsequent deltas batch at the 200 ms batch_interval; per-delta force_flush=True would generate one Signal per token (see Tuning for the trade-off).

See also