Example: research report generation
An orchestrator agent breaks a research topic into sub-tasks, dispatches them to worker agents, and combines the results into a report.
Vercel AI
OpenAI Agents
Google ADK
Pydantic AI
LangChain
Restate TS
Restate Py
workflow-orchestrator.ts
/**
 * Worker service: answers a single research question with one LLM call.
 *
 * The `research` handler returns the question together with the model's
 * answer so the orchestrator can pair each finding with its sub-task.
 */
export const researchWorker = restate.service({
  name: "ResearchWorker",
  handlers: {
    research: async (ctx: restate.Context, { question }: { question: string }) => {
      // Model calls go through the durableCalls middleware (at most 3 retry attempts).
      const durableModel = wrapLanguageModel({
        model: openai("gpt-5.4"),
        middleware: durableCalls(ctx, { maxRetryAttempts: 3 }),
      });

      const completion = await generateText({
        model: durableModel,
        system: "You are a research assistant. Provide a concise, factual answer.",
        prompt: question,
      });

      return { question, answer: completion.text };
    },
  },
});
/**
 * Orchestrator service: plans the research, fans the questions out to
 * `researchWorker`, and merges the answers into one report.
 *
 * `generate` accepts `{ topic }` (validated against `ResearchRequestSchema`)
 * and returns `{ report, taskCount }`.
 */
const orchestrator = restate.service({
  name: "ResearchReport",
  handlers: {
    generate: restate.createServiceHandler(
      { input: schema(ResearchRequestSchema) },
      async (ctx: restate.Context, { topic }: { topic: string }) => {
        // Model calls go through the durableCalls middleware (at most 3 retry attempts).
        const durableModel = wrapLanguageModel({
          model: openai("gpt-5.4"),
          middleware: durableCalls(ctx, { maxRetryAttempts: 3 }),
        });

        // Step 1: ask the planner for a list of research questions.
        // `Output.array` makes the SDK return the plan as an array of strings.
        const plan = await generateText({
          model: durableModel,
          system: `You are a research planner. Break the topic into 2-4 research
sub-tasks. Respond with a JSON array of strings, each a specific
research question. Example: ["question 1", "question 2"]`,
          prompt: topic,
          output: Output.array({ element: z.string() }),
        });
        const questions = plan.output;

        // Step 2: start one worker call per question, then wait for all of them.
        const pendingAnswers = questions.map((question) =>
          ctx.serviceClient(researchWorker).research({ question }),
        );
        const findings = await RestatePromise.all(pendingAnswers);

        // Step 3: hand the collected findings to the report writer.
        const findingsJson = JSON.stringify(findings);
        const written = await generateText({
          model: durableModel,
          system:
            "You are a report writer. Combine the research findings into a cohesive report.",
          prompt: `Topic: ${topic}\n\nResearch findings:\n${findingsJson}`,
        });

        return { report: written.text, taskCount: questions.length };
      },
    ),
  },
});
Run this example
Run this example
Install Restate and launch it:
Get the example:
Export your OpenAI API key and run the agent:
Register the agents with Restate:
Send a request to the agent:
npm install --global @restatedev/restate-server@latest @restatedev/restate@latest
restate-server
restate example typescript-vercel-ai-tour-of-agents && cd typescript-vercel-ai-tour-of-agents
npm install
export OPENAI_API_KEY=sk-...
npx tsx ./src/workflow-orchestrator.ts
restate deployments register http://localhost:9080 --force --yes # dev only: overrides previous registrations
curl localhost:8080/restate/call/ResearchReport/generate \
--json '{
"topic": "Benefits of durable execution in distributed systems"
}'
workflow_orchestrator.py
# Planner agent: turns a topic into a structured TaskList of sub-tasks.
planner = Agent(
    name="ResearchPlanner",
    output_type=TaskList,
    instructions="You are a research planner. Break the topic into 2-4 research sub-tasks.",
)

# Researcher agent: answers one research question.
researcher = Agent(
    instructions="You are a research assistant. Provide a concise, factual answer.",
    name="Researcher",
)

# Writer agent: merges the findings into the final report.
writer = Agent(
    instructions="You are a report writer. Combine the research findings into a cohesive report.",
    name="ReportWriter",
)
report_service = restate.Service("ResearchReport")


@report_service.handler()
async def generate(ctx: restate.Context, req: ReportRequest) -> dict:
    """Plan the research, fan out to the researcher service, and write the report.

    Returns a dict with the final ``report`` and the number of sub-tasks
    (``task_count``) the planner produced.
    """
    # Step 1: the planner agent produces the list of sub-tasks.
    plan = await DurableRunner.run(planner, req.topic)
    sub_tasks = plan.final_output.tasks

    # Step 2: start one researcher call per sub-task, then wait for all of them.
    pending = [ctx.service_call(run_researcher, sub_task) for sub_task in sub_tasks]
    await restate.gather(*pending)
    findings = [await call for call in pending]

    # Step 3: the writer agent merges the findings into a single report.
    writer_prompt = (
        f"Topic: {req.topic}\n\nResearch findings:\n{json.dumps(findings, indent=2)}"
    )
    written = await DurableRunner.run(writer, writer_prompt)
    return {"report": written.final_output, "task_count": len(sub_tasks)}
researcher_service = restate.Service("Researcher")


@researcher_service.handler()
async def run_researcher(ctx: restate.Context, task: ResearchTask) -> str:
    """Run the researcher agent on one task's question and return its final output."""
    outcome = await DurableRunner.run(researcher, task.question)
    answer = outcome.final_output
    return answer
Run this example
Run this example
Install Restate and launch it:
Get the example:
Export your OpenAI API key and run the agent:
Register the agents with Restate:
Send a request:
restate-server
restate example python-openai-agents-tour-of-agents && cd python-openai-agents-tour-of-agents
export OPENAI_API_KEY=sk-...
uv run app/workflow_orchestrator.py
restate deployments register http://localhost:9080 --force --yes # dev only: overrides previous registrations
curl localhost:8080/restate/call/ResearchReport/generate \
--json '{"topic": "The impact of renewable energy on global economies"}'
workflow_orchestrator.py
report_service = restate.VirtualObject("ResearchReport")


@report_service.handler()
async def generate(ctx: restate.ObjectContext, req: ReportRequest) -> dict:
    """Plan the research, fan out to researcher objects, and write the report.

    The object key (``ctx.key()``) is used as the ADK user id, and one freshly
    generated session id is shared by the planner and writer runs. Returns a
    dict with the final ``report`` and the number of sub-tasks (``task_count``).
    """
    session_id = str(ctx.uuid())

    # Step 1: the planner runner produces a JSON plan, parsed into a TaskList.
    topic_message = Content(role="user", parts=[Part.from_text(text=req.topic)])
    plan_events = plan_runner.run_async(
        user_id=ctx.key(),
        session_id=session_id,
        new_message=topic_message,
    )
    plan_json = await parse_agent_response(plan_events)
    sub_tasks = TaskList.model_validate_json(plan_json).tasks

    # Step 2: one researcher call per sub-task, each addressed to a fresh key,
    # then wait for all of them.
    pending = [
        ctx.object_call(run_researcher, key=str(ctx.uuid()), arg=sub_task)
        for sub_task in sub_tasks
    ]
    await restate.gather(*pending)
    findings = [await call for call in pending]

    # Step 3: the writer runner merges the findings into a single report.
    writer_prompt = f"Topic: {req.topic}\n\nResearch findings:\n{json.dumps(findings)}"
    writer_message = Content(role="user", parts=[Part.from_text(text=writer_prompt)])
    writer_events = writer_runner.run_async(
        user_id=ctx.key(),
        session_id=session_id,
        new_message=writer_message,
    )
    final_report = await parse_agent_response(writer_events)
    return {"report": final_report, "task_count": len(sub_tasks)}
researcher_service = restate.VirtualObject("Researcher")


@researcher_service.handler()
async def run_researcher(ctx: restate.ObjectContext, task: ResearchTask) -> str:
    """Run the research runner on one task's question and return the parsed response.

    Each invocation uses the object key as user id and a new session id.
    """
    question_message = Content(role="user", parts=[Part.from_text(text=task.question)])
    research_events = research_runner.run_async(
        user_id=ctx.key(),
        session_id=str(ctx.uuid()),
        new_message=question_message,
    )
    answer = await parse_agent_response(research_events)
    return answer
Run this example
Run this example
Install Restate and launch it:
Get the example:
Export your Google API key and run the agent:
Register the agents with Restate:
Send a request:
restate-server
restate example python-google-adk-tour-of-agents && cd python-google-adk-tour-of-agents
export GOOGLE_API_KEY=your-api-key
uv run app/workflow_orchestrator.py
restate deployments register http://localhost:9080 --force --yes # dev only: overrides previous registrations
curl localhost:8080/restate/call/ResearchReport/user123/generate \
--json '{
"sessionId": "session-123",
"topic": "The impact of renewable energy on global economies"
}'
workflow_orchestrator.py
# Planner: turns a topic into a structured TaskList; wrapped for use inside Restate handlers.
planner = Agent(
    "openai:gpt-5.4",
    output_type=TaskList,
    system_prompt="You are a research planner. Break the topic into 2-4 research sub-tasks.",
)
restate_planner = RestateAgent(planner)

# Researcher: answers one research question.
researcher = Agent(
    "openai:gpt-5.4",
    system_prompt="You are a research assistant. Provide a concise, factual answer.",
)
restate_researcher = RestateAgent(researcher)

# Writer: merges the findings into the final report.
writer = Agent(
    "openai:gpt-5.4",
    system_prompt="You are a report writer. Combine the research findings into a cohesive report.",
)
restate_writer = RestateAgent(writer)
report_service = restate.Service("ResearchReport")


@report_service.handler()
async def generate(ctx: restate.Context, req: ReportRequest) -> dict:
    """Plan the research, fan out to the researcher service, and write the report.

    Returns a dict with the final ``report`` and the number of sub-tasks
    (``task_count``) the planner produced.
    """
    # Step 1: the planner agent produces the list of sub-tasks.
    plan = await restate_planner.run(req.topic)
    sub_tasks = plan.output.tasks

    # Step 2: start one researcher call per sub-task, then wait for all of them.
    pending = [ctx.service_call(run_researcher, sub_task) for sub_task in sub_tasks]
    await restate.gather(*pending)
    findings = [await call for call in pending]

    # Step 3: the writer agent merges the findings into a single report.
    writer_prompt = f"Topic: {req.topic}\n\nResearch findings:\n{json.dumps(findings)}"
    written = await restate_writer.run(writer_prompt)
    return {"report": written.output, "task_count": len(sub_tasks)}
researcher_service = restate.Service("Researcher")


@researcher_service.handler()
async def run_researcher(_ctx: restate.Context, task: ResearchTask) -> str:
    """Run the Restate-wrapped researcher agent on one question and return its output."""
    outcome = await restate_researcher.run(task.question)
    answer = outcome.output
    return answer
Run this example
Run this example
Install Restate and launch it:
Get the example:
Export your OpenAI API key and run the agent:
Register the agents with Restate:
Send a request:
restate-server
restate example python-pydantic-ai-tour-of-agents && cd python-pydantic-ai-tour-of-agents
export OPENAI_API_KEY=sk-...
uv run app/workflow_orchestrator.py
restate deployments register http://localhost:9080 --force --yes # dev only: overrides previous registrations
curl localhost:8080/restate/call/ResearchReport/generate \
--json '{"topic": "The impact of renewable energy on global economies"}'
workflow_orchestrator.py
# Planner: turns a topic into a structured TaskList of sub-tasks.
planner = create_agent(
    model=init_chat_model("openai:gpt-5.4"),
    middleware=[RestateMiddleware()],
    response_format=TaskList,
    system_prompt="You are a research planner. Break the topic into 2-4 research sub-tasks.",
)

# Researcher: answers one research question.
researcher = create_agent(
    model=init_chat_model("openai:gpt-5.4"),
    middleware=[RestateMiddleware()],
    system_prompt="You are a research assistant. Provide a concise, factual answer.",
)

# Writer: merges the findings into the final report.
writer = create_agent(
    model=init_chat_model("openai:gpt-5.4"),
    middleware=[RestateMiddleware()],
    system_prompt="You are a report writer. Combine the research findings into a cohesive report.",
)
report_service = restate.Service("ResearchReport")


@report_service.handler()
async def generate(ctx: restate.Context, req: ReportRequest) -> dict:
    """Plan the research, fan out to the researcher service, and write the report.

    Returns a dict with the final ``report`` (content of the writer's last
    message) and the number of sub-tasks (``task_count``).
    """
    # Step 1: the planner agent returns the plan as a structured response.
    plan = await planner.ainvoke({"messages": req.topic})
    sub_tasks: list[ResearchTask] = plan["structured_response"].tasks

    # Step 2: start one researcher call per sub-task, then wait for all of them.
    pending = []
    for sub_task in sub_tasks:
        pending.append(ctx.service_call(run_researcher, sub_task))
    await restate.gather(*pending)
    findings = [await call for call in pending]

    # Step 3: the writer agent merges the findings into a single report.
    writer_prompt = f"Topic: {req.topic}\n\nResearch findings:\n{json.dumps(findings)}"
    written = await writer.ainvoke({"messages": writer_prompt})
    last_message = written["messages"][-1]
    return {"report": last_message.content, "task_count": len(sub_tasks)}
researcher_service = restate.Service("Researcher")


@researcher_service.handler()
async def run_researcher(_ctx: restate.Context, task: ResearchTask) -> str:
    """Run the researcher agent on one question and return its last message's content."""
    outcome = await researcher.ainvoke({"messages": task.question})
    last_message = outcome["messages"][-1]
    return last_message.content
Run this example
Run this example
Install Restate and launch it:
Get the example:
Export your OpenAI API key and run the agent:
Register the agents with Restate:
Send a request:
restate-server
restate example python-langchain-tour-of-agents && cd python-langchain-tour-of-agents
export OPENAI_API_KEY=sk-...
uv run app/workflow_orchestrator.py
restate deployments register http://localhost:9080 --force --yes # dev only: overrides previous registrations
curl localhost:8080/restate/call/ResearchReport/generate \
--json '{"topic": "The impact of renewable energy on global economies"}'
workflow-orchestrator.ts
/**
 * Worker service: answers a single research question with one LLM call.
 *
 * The call runs inside a `ctx.run` step named "Research" with at most
 * 3 retry attempts; the handler returns the question with the answer text.
 */
export const researchWorker = restate.service({
  name: "ResearchWorker",
  handlers: {
    research: async (ctx: restate.Context, { question }: { question: string }) => {
      const prompt = `You are a research assistant. Provide a concise, factual answer.\n\n${question}`;

      const completion = await ctx.run("Research", async () => llmCall(prompt), {
        maxRetryAttempts: 3,
      });

      return { question, answer: completion.text };
    },
  },
});
/**
 * Parses the planner's raw LLM output into a list of research questions.
 *
 * The planner is only *asked* to reply with a JSON array of strings, so the
 * reply is validated before use. A malformed plan raises a TerminalError: the
 * LLM reply comes from a completed `ctx.run` step, so retrying the handler
 * would replay the same text and fail the same way every time.
 *
 * @param raw - Text returned by the planning LLM call.
 * @returns The research questions contained in the plan.
 * @throws restate.TerminalError if `raw` is not a JSON array of strings.
 */
function parseResearchPlan(raw: string): string[] {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    throw new restate.TerminalError(`Research plan is not valid JSON: ${raw}`);
  }
  if (
    !Array.isArray(parsed) ||
    !parsed.every((item): item is string => typeof item === "string")
  ) {
    throw new restate.TerminalError(
      `Research plan must be a JSON array of strings: ${raw}`,
    );
  }
  return parsed;
}

/**
 * Orchestrator service: plans the research, fans the questions out to
 * `researchWorker`, and merges the answers into one report.
 *
 * `generate` accepts `{ topic }` (validated against `ResearchRequestSchema`)
 * and returns `{ report, taskCount }`. Each LLM call runs in its own named
 * `ctx.run` step with at most 3 retry attempts.
 */
const orchestrator = restate.service({
  name: "ResearchReport",
  handlers: {
    generate: restate.createServiceHandler(
      { input: schema(ResearchRequestSchema) },
      async (ctx: restate.Context, { topic }: { topic: string }) => {
        // Step 1: Orchestrator creates a research plan
        const planJson = await ctx.run(
          "Create research plan",
          async () =>
            llmCall(
              `You are a research planner. Break the topic into 2-4 research
sub-tasks. Respond with a JSON array of strings, each a specific
research question. Example: ["question 1", "question 2"]\n\nTopic: ${topic}`,
            ),
          { maxRetryAttempts: 3 },
        );
        // Validate instead of trusting the model to have followed the format.
        const tasks = parseResearchPlan(planJson.text);

        // Step 2: Dispatch workers in parallel
        const workerResults = await RestatePromise.all(
          tasks.map((question) =>
            ctx.serviceClient(researchWorker).research({ question }),
          ),
        );

        // Step 3: Combine results into a report
        const report = await ctx.run(
          "Write report",
          async () =>
            llmCall(
              `You are a report writer. Combine the research findings into a cohesive report.\n\n
Topic: ${topic}\n\nResearch findings:\n${JSON.stringify(workerResults)}`,
            ),
          { maxRetryAttempts: 3 },
        );

        return { report: report.text, taskCount: tasks.length };
      },
    ),
  },
});
Run this example
Run this example
Install Restate and launch it:
Get the example:
Export your API key:
Register the services with Restate:
Send a request:
restate-server
restate example typescript-restate-tour-of-agents && cd typescript-restate-tour-of-agents
npm install
export OPENAI_API_KEY=sk-...
npx tsx ./src/workflow-orchestrator.ts
restate deployments register http://localhost:9080 --force --yes # dev only: overrides previous registrations
curl localhost:8080/restate/call/ResearchReport/generate \
--json '{"topic": "The impact of renewable energy on global economies"}'
workflow_orchestrator.py
researcher_service = restate.Service("ResearchWorker")


@researcher_service.handler()
async def research(ctx: restate.Context, req: ResearchTask) -> dict:
    """Answer one research question with a single LLM call.

    The call runs as a ``ctx.run_typed`` step named "Research" with at most
    3 attempts. Returns the question together with the answer content.
    """
    prompt = f"You are a research assistant. Provide a concise, factual answer. {req.question}"
    completion = await ctx.run_typed(
        "Research",
        llm_call,
        RunOptions(max_attempts=3),
        messages=prompt,
    )
    return {"question": req.question, "answer": completion.content}
report_service = restate.Service("ResearchReport")


@report_service.handler()
async def generate(ctx: restate.Context, req: ReportRequest) -> dict:
    """Plan the research, fan out to the research handler, and write the report.

    Each LLM call runs as a named ``ctx.run_typed`` step with at most 3 attempts.

    Returns:
        A dict with the final ``report`` and the number of sub-tasks
        (``task_count``) in the plan.

    Raises:
        restate.TerminalError: If the planner returned no content.
    """
    # Step 1: Orchestrator creates a research plan
    plan_result = await ctx.run_typed(
        "Create research plan",
        llm_call,
        RunOptions(max_attempts=3),
        messages=f"You are a research planner. Break the topic into 2-4 research sub-tasks. {req.topic}",
        response_format=TaskList,
    )
    if not plan_result.content:
        raise restate.TerminalError("No research plan created")
    tasks = TaskList.model_validate_json(plan_result.content).tasks

    # Step 2: Dispatch workers in parallel
    worker_promises = [ctx.service_call(research, task) for task in tasks]
    await restate.gather(*worker_promises)
    findings = [await p for p in worker_promises]

    # Step 3: Combine results into a report
    # The blank line keeps the instruction separate from the topic; without it
    # the prompt read "...cohesive report.Topic: ...".
    report = await ctx.run_typed(
        "Write report",
        llm_call,
        RunOptions(max_attempts=3),
        messages=(
            "You are a report writer. Combine the research findings into a cohesive report.\n\n"
            f"Topic: {req.topic}\n\nResearch findings:\n{json.dumps(findings)}"
        ),
    )
    return {"report": report.content, "task_count": len(tasks)}
Run this example
Run this example
Install Restate and launch it:
Get the example:
Export your API key:
Register the services with Restate:
Send a request:
restate-server
restate example python-restate-tour-of-agents && cd python-restate-tour-of-agents
export OPENAI_API_KEY=sk-...
uv run app/workflow_orchestrator.py
restate deployments register http://localhost:9080 --force --yes # dev only: overrides previous registrations
curl localhost:8080/restate/call/ResearchReport/generate \
--json '{"topic": "The impact of renewable energy on global economies"}'
