Streaming

agentabi supports real-time event streaming from any agent. The Session.stream() method yields IR events as they are produced.

Basic Streaming

import asyncio
from agentabi import Session

async def main():
    session = Session(agent="claude_code")

    async for event in session.stream(prompt="Explain asyncio briefly"):
        etype = event["type"]

        if etype == "session_start":
            print(f"Session: {event.get('session_id')}")

        elif etype == "message_delta":
            print(event["text"], end="", flush=True)

        elif etype == "message_end":
            print()  # newline after message

        elif etype == "tool_use":
            print(f"\n[Tool] {event['tool_name']}({event['tool_input']})")

        elif etype == "tool_result":
            status = "ERR" if event.get("is_error") else "OK"
            print(f"  [{status}] {event['content'][:100]}")

        elif etype == "usage":
            u = event["usage"]
            print(f"\nTokens: {u.get('input_tokens', 0)} in / {u.get('output_tokens', 0)} out")

        elif etype == "error":
            print(f"\nERROR: {event['error']}")

        elif etype == "session_end":
            print("--- session end ---")

asyncio.run(main())

Event Types

All agents produce a common set of IR events:

| Event Type | Description | Key Fields |
|---|---|---|
| session_start | Session initialized | session_id, agent, model |
| message_start | Assistant turn begins | role |
| message_delta | Streamed text chunk | text |
| message_end | Assistant turn ends | text (optional full text), stop_reason |
| tool_use | Tool invocation | tool_use_id, tool_name, tool_input |
| tool_result | Tool output | tool_use_id, content, is_error |
| usage | Token usage stats | usage (dict), cost_usd |
| error | Error occurred | error, is_fatal |
| file_diff | File modification | file_path, action |
| session_end | Session completed | session_id |
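
Because every event carries a type field, a dispatch table can stand in for a long if/elif chain. The sketch below is only an illustration that works on plain dicts shaped like the events listed above; `render_event`, `HANDLERS`, and the handler functions are hypothetical helpers, not part of agentabi.

```python
from typing import Any, Callable

# Assumption: events are plain dicts with a "type" key, as in the basic example.
Event = dict[str, Any]

def _on_delta(event: Event) -> str:
    return event["text"]

def _on_tool_use(event: Event) -> str:
    return f"\n[Tool] {event['tool_name']}({event['tool_input']})\n"

def _on_error(event: Event) -> str:
    kind = "FATAL" if event.get("is_fatal") else "ERROR"
    return f"\n{kind}: {event['error']}\n"

# Hypothetical mapping from event type to a formatter; extend as needed.
HANDLERS: dict[str, Callable[[Event], str]] = {
    "message_delta": _on_delta,
    "tool_use": _on_tool_use,
    "error": _on_error,
}

def render_event(event: Event) -> str:
    """Return printable text for an event, or "" for types with no handler."""
    handler = HANDLERS.get(event["type"])
    return handler(event) if handler else ""
```

Unknown or unhandled event types fall through to an empty string, so a consumer written this way keeps working if an agent emits a type it does not care about.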

Event Flow

A typical event sequence looks like:

session_start
  message_start (role=assistant)
    message_delta (text chunk)
    message_delta (text chunk)
    ...
  message_end
  usage
session_end
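
When debugging an agent integration, it can help to compare a recorded stream against the sequence above. The following is a rough sketch of such a check, assuming events are plain dicts with a type key; `check_flow` is a hypothetical helper, and since the sequence is described only as typical, a reported problem is a hint rather than proof of a bug.

```python
from typing import Any, Iterable

def check_flow(events: Iterable[dict[str, Any]]) -> list[str]:
    """Return a list of ordering problems; an empty list means the flow looks typical."""
    problems: list[str] = []
    types = [event["type"] for event in events]

    # The typical flow is bracketed by session_start and session_end.
    if not types or types[0] != "session_start":
        problems.append("stream does not begin with session_start")
    if not types or types[-1] != "session_end":
        problems.append("stream does not end with session_end")

    # Text chunks are expected between message_start and message_end.
    in_message = False
    for event_type in types:
        if event_type == "message_start":
            in_message = True
        elif event_type == "message_end":
            in_message = False
        elif event_type == "message_delta" and not in_message:
            problems.append("message_delta outside a message")
    return problems
```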

With tool use:

session_start
  message_start
    tool_use (name=read_file, input={path: "main.py"})
    tool_result (content="file contents...")
    message_delta (text)
  message_end
  usage
session_end
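
Both tool_use and tool_result list a tool_use_id field, which suggests that a result can be matched to the call that produced it. The sketch below assumes that correspondence holds and that events are plain dicts; `pair_tool_calls` is a hypothetical helper, not an agentabi function.

```python
from typing import Any, Iterable

def pair_tool_calls(events: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Match each tool_use with its tool_result via tool_use_id (assumed to link them)."""
    calls: dict[str, dict[str, Any]] = {}
    for event in events:
        if event["type"] == "tool_use":
            calls[event["tool_use_id"]] = {
                "tool_name": event["tool_name"],
                "tool_input": event["tool_input"],
                "result": None,    # stays None if no tool_result arrives
                "is_error": None,
            }
        elif event["type"] == "tool_result":
            call = calls.get(event["tool_use_id"])
            if call is not None:  # ignore results with no matching call
                call["result"] = event["content"]
                call["is_error"] = bool(event.get("is_error"))
    # Dicts preserve insertion order, so calls come back in invocation order.
    return list(calls.values())

# Sample events mirroring the tool-use flow above.
events = [
    {"type": "session_start", "session_id": "s1"},
    {"type": "tool_use", "tool_use_id": "t1", "tool_name": "read_file",
     "tool_input": {"path": "main.py"}},
    {"type": "tool_result", "tool_use_id": "t1", "content": "file contents..."},
    {"type": "session_end", "session_id": "s1"},
]
paired = pair_tool_calls(events)
```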

Collecting Full Text

To accumulate the complete response text from a stream:

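text_parts = []
saw_delta = False  # did the current message stream any chunks?
async for event in session.stream(prompt="..."):
    if event["type"] == "message_delta":
        text_parts.append(event["text"])
        saw_delta = True
    elif event["type"] == "message_end":
        # message_end may carry the full text. Use it only when no deltas
        # arrived, so the same text is not appended twice.
        end_text = event.get("text", "")
        if end_text and not saw_delta:
            text_parts.append(end_text)
        saw_delta = False  # reset for the next message

full_text = "".join(text_parts)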
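
The accumulation logic can also be wrapped in a helper that accepts any async iterator of events, which makes it possible to exercise without a live agent. This is a sketch under two assumptions: events are plain dicts, and the optional text on message_end repeats what the deltas already delivered, so it is used only as a fallback. `collect_text` and `fake_stream` are hypothetical names.

```python
import asyncio
from typing import Any, AsyncIterator

async def collect_text(events: AsyncIterator[dict[str, Any]]) -> str:
    """Join streamed chunks, falling back to message_end text when no chunks arrived."""
    parts: list[str] = []
    saw_delta = False
    async for event in events:
        if event["type"] == "message_delta":
            parts.append(event["text"])
            saw_delta = True
        elif event["type"] == "message_end":
            end_text = event.get("text", "")
            if end_text and not saw_delta:
                parts.append(end_text)
            saw_delta = False  # reset for the next message
    return "".join(parts)

async def fake_stream() -> AsyncIterator[dict[str, Any]]:
    """Stand-in for a real stream, used only to demonstrate the helper."""
    for event in [
        {"type": "message_start", "role": "assistant"},
        {"type": "message_delta", "text": "Hello, "},
        {"type": "message_delta", "text": "world"},
        {"type": "message_end", "text": "Hello, world"},
    ]:
        yield event

result = asyncio.run(collect_text(fake_stream()))
```

With a real session, the equivalent call inside a coroutine would presumably be `await collect_text(session.stream(prompt="..."))`, since the stream is consumed with `async for` in the examples above.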

Full Example

See examples/streaming.py for a complete streaming example.