Workflows
Workflows are a sequence of steps that gets executed durably. A workflow can be seen as a special type of Virtual Object with some special characteristics:
- Each workflow definition has a
run
handler that implements the workflow logic. - The
run
handler executes exactly one time for each workflow instance (object / key). - A workflow definition can implement other handlers that can be called multiple times, and can interact with the workflow.
- Workflows have access to the
WorkflowContext
andSharedWorkflowContext
, giving them some extra functionality, for example Durable Promises to signal workflows.
The retention time of a workflow execution is 24 hours after the finishing of the run
handler.
After this timeout any K/V state is cleared, the workflow's shared handlers cannot be called anymore, and the Durable Promises are discarded.
The retention time can be configured via the Admin API per Workflow definition by setting workflow_completion_retention
.
Implementing workflows
Have a look at the code example to get a better understanding of how workflows are implemented:
- Java
- Kotlin
The run
handler
Every workflow needs a run
handler.
This handler has access to the same SDK features as Service and Virtual Object handlers.
For example, use ctx.run
to log intermediate results in Restate and avoid re-execution on replay.
@Workflowpublic class SignupWorkflow { private static final DurablePromiseKey<String> EMAIL_CLICKED = DurablePromiseKey.of("email_clicked", JsonSerdes.STRING); private static final StateKey<String> STATUS = StateKey.of("status", JsonSerdes.STRING); @Workflow public boolean run(WorkflowContext ctx, Email email) { String secret = ctx.random().nextUUID().toString(); ctx.set(STATUS, "Generated secret"); ctx.run("send email", () -> sendEmailWithLink(email, secret)); ctx.set(STATUS, "Sent email"); String clickSecret = ctx.promise(EMAIL_CLICKED) .awaitable() .await(); ctx.set(STATUS, "Clicked email"); return clickSecret.equals(secret); } @Shared public void click(SharedWorkflowContext ctx, String secret) { ctx.promiseHandle(EMAIL_CLICKED).resolve(secret); } @Shared public String getStatus(SharedWorkflowContext ctx) { return ctx.get(STATUS).orElse("Unknown"); } public static void main(String[] args) { RestateHttpEndpointBuilder.builder() .bind(new SignupWorkflow()) .buildAndListen(); }}
Querying workflows
Similar to Virtual Objects, you can retrieve the K/V state of workflows via the other handlers defined in the workflow definition,
For example, here we expose the status of the workflow to external clients.
Every workflow execution can be seen as a new object, so the state is isolated to a single workflow execution.
The state can only be mutated by the run
handler of the workflow. The other handlers can only read the state.
@Workflowpublic class SignupWorkflow { private static final DurablePromiseKey<String> EMAIL_CLICKED = DurablePromiseKey.of("email_clicked", JsonSerdes.STRING); private static final StateKey<String> STATUS = StateKey.of("status", JsonSerdes.STRING); @Workflow public boolean run(WorkflowContext ctx, Email email) { String secret = ctx.random().nextUUID().toString(); ctx.set(STATUS, "Generated secret"); ctx.run("send email", () -> sendEmailWithLink(email, secret)); ctx.set(STATUS, "Sent email"); String clickSecret = ctx.promise(EMAIL_CLICKED) .awaitable() .await(); ctx.set(STATUS, "Clicked email"); return clickSecret.equals(secret); } @Shared public void click(SharedWorkflowContext ctx, String secret) { ctx.promiseHandle(EMAIL_CLICKED).resolve(secret); } @Shared public String getStatus(SharedWorkflowContext ctx) { return ctx.get(STATUS).orElse("Unknown"); } public static void main(String[] args) { RestateHttpEndpointBuilder.builder() .bind(new SignupWorkflow()) .buildAndListen(); }}
Signaling workflows
You can use Durable Promises to interact with your running workflows: to let the workflow block until an event occurs, or to send a signal / information into or out of a running workflow. These promises are durable and distributed, meaning they survive crashes and can be resolved or rejected by any handler in the workflow.
Do the following:
- Create a promise in your
run
handler that is durable and distributed - Resolve or reject the promise in any other handler in the workflow. This can be done at most one time.
@Workflowpublic class SignupWorkflow { private static final DurablePromiseKey<String> EMAIL_CLICKED = DurablePromiseKey.of("email_clicked", JsonSerdes.STRING); private static final StateKey<String> STATUS = StateKey.of("status", JsonSerdes.STRING); @Workflow public boolean run(WorkflowContext ctx, Email email) { String secret = ctx.random().nextUUID().toString(); ctx.set(STATUS, "Generated secret"); ctx.run("send email", () -> sendEmailWithLink(email, secret)); ctx.set(STATUS, "Sent email"); String clickSecret = ctx.promise(EMAIL_CLICKED) .awaitable() .await(); ctx.set(STATUS, "Clicked email"); return clickSecret.equals(secret); } @Shared public void click(SharedWorkflowContext ctx, String secret) { ctx.promiseHandle(EMAIL_CLICKED).resolve(secret); } @Shared public String getStatus(SharedWorkflowContext ctx) { return ctx.get(STATUS).orElse("Unknown"); } public static void main(String[] args) { RestateHttpEndpointBuilder.builder() .bind(new SignupWorkflow()) .buildAndListen(); }}
Serving and registering workflows
You serve workflows in the same way as Services and Virtual Objects: by binding them to an HTTP endpoint or AWS Lambda handler. Make sure you register the endpoint or Lambda handler in Restate before invoking it.
@Workflowpublic class SignupWorkflow { private static final DurablePromiseKey<String> EMAIL_CLICKED = DurablePromiseKey.of("email_clicked", JsonSerdes.STRING); private static final StateKey<String> STATUS = StateKey.of("status", JsonSerdes.STRING); @Workflow public boolean run(WorkflowContext ctx, Email email) { String secret = ctx.random().nextUUID().toString(); ctx.set(STATUS, "Generated secret"); ctx.run("send email", () -> sendEmailWithLink(email, secret)); ctx.set(STATUS, "Sent email"); String clickSecret = ctx.promise(EMAIL_CLICKED) .awaitable() .await(); ctx.set(STATUS, "Clicked email"); return clickSecret.equals(secret); } @Shared public void click(SharedWorkflowContext ctx, String secret) { ctx.promiseHandle(EMAIL_CLICKED).resolve(secret); } @Shared public String getStatus(SharedWorkflowContext ctx) { return ctx.get(STATUS).orElse("Unknown"); } public static void main(String[] args) { RestateHttpEndpointBuilder.builder() .bind(new SignupWorkflow()) .buildAndListen(); }}
The run
handler
Every workflow needs a run
handler.
This handler has access to the same SDK features as Service and Virtual Object handlers.
For example, use ctx.run
to log intermediate results in Restate and avoid re-execution on replay.
Querying workflows
Similar to Virtual Objects, you can retrieve the K/V state of workflows via the other handlers defined in the workflow definition,
For example, here we expose the status of the workflow to external clients.
Every workflow execution can be seen as a new object, so the state is isolated to a single workflow execution.
The state can only be mutated by the run
handler of the workflow. The other handlers can only read the state.
Signaling workflows
You can use Durable Promises to interact with your running workflows: to let the workflow block until an event occurs, or to send a signal / information into or out of a running workflow. These promises are durable and distributed, meaning they survive crashes and can be resolved or rejected by any handler in the workflow.
Do the following:
- Create a promise in your
run
handler that is durable and distributed - Resolve or reject the promise in any other handler in the workflow. This can be done at most one time.
Serving and registering workflows
You serve workflows in the same way as Services and Virtual Objects: by binding them to an HTTP endpoint or AWS Lambda handler. Make sure you register the endpoint or Lambda handler in Restate before invoking it.
@Workflowpublic class SignupWorkflow { private static final DurablePromiseKey<String> EMAIL_CLICKED = DurablePromiseKey.of("email_clicked", JsonSerdes.STRING); private static final StateKey<String> STATUS = StateKey.of("status", JsonSerdes.STRING); @Workflow public boolean run(WorkflowContext ctx, Email email) { String secret = ctx.random().nextUUID().toString(); ctx.set(STATUS, "Generated secret"); ctx.run("send email", () -> sendEmailWithLink(email, secret)); ctx.set(STATUS, "Sent email"); String clickSecret = ctx.promise(EMAIL_CLICKED) .awaitable() .await(); ctx.set(STATUS, "Clicked email"); return clickSecret.equals(secret); } @Shared public void click(SharedWorkflowContext ctx, String secret) { ctx.promiseHandle(EMAIL_CLICKED).resolve(secret); } @Shared public String getStatus(SharedWorkflowContext ctx) { return ctx.get(STATUS).orElse("Unknown"); } public static void main(String[] args) { RestateHttpEndpointBuilder.builder() .bind(new SignupWorkflow()) .buildAndListen(); }}
The run
handler
Every workflow needs a run
handler.
This handler has access to the same SDK features as Service and Virtual Object handlers.
For example, use ctx.runBlock
to log intermediate results in Restate and avoid re-execution on replay.
@Workflowclass SignupWorkflow { companion object { private val EMAIL_CLICKED = KtDurablePromiseKey.json<String>("email_clicked") private val STATUS = KtStateKey.json<String>("status") } @Workflow suspend fun run(ctx: WorkflowContext, email: Email): Boolean { val secret = ctx.random().nextUUID().toString() ctx.set(STATUS, "Generated secret") ctx.runBlock("send email") { sendEmailWithLink(email, secret) } val clickSecret = ctx.promise(EMAIL_CLICKED).awaitable().await() ctx.set(STATUS, "Clicked email") return clickSecret == secret } @Shared suspend fun click(ctx: SharedWorkflowContext, secret: String) { ctx.promiseHandle(EMAIL_CLICKED).resolve(secret) } @Shared suspend fun getStatus(ctx: SharedWorkflowContext): String? { return ctx.get(STATUS) }}fun main() { RestateHttpEndpointBuilder.builder().bind(SignupWorkflow()).buildAndListen()}
Querying workflows
Similar to Virtual Objects, you can retrieve the K/V state of workflows via the other handlers defined in the workflow definition,
For example, here we expose the status of the workflow to external clients.
Every workflow execution can be seen as a new object, so the state is isolated to a single workflow execution.
The state can only be mutated by the run
handler of the workflow. The other handlers can only read the state.
@Workflowclass SignupWorkflow { companion object { private val EMAIL_CLICKED = KtDurablePromiseKey.json<String>("email_clicked") private val STATUS = KtStateKey.json<String>("status") } @Workflow suspend fun run(ctx: WorkflowContext, email: Email): Boolean { val secret = ctx.random().nextUUID().toString() ctx.set(STATUS, "Generated secret") ctx.runBlock("send email") { sendEmailWithLink(email, secret) } val clickSecret = ctx.promise(EMAIL_CLICKED).awaitable().await() ctx.set(STATUS, "Clicked email") return clickSecret == secret } @Shared suspend fun click(ctx: SharedWorkflowContext, secret: String) { ctx.promiseHandle(EMAIL_CLICKED).resolve(secret) } @Shared suspend fun getStatus(ctx: SharedWorkflowContext): String? { return ctx.get(STATUS) }}fun main() { RestateHttpEndpointBuilder.builder().bind(SignupWorkflow()).buildAndListen()}
Signaling workflows
You can use Durable Promises to interact with your running workflows: to let the workflow block until an event occurs, or to send a signal / information into or out of a running workflow. These promises are durable and distributed, meaning they survive crashes and can be resolved or rejected by any handler in the workflow.
Do the following:
- Create a promise in your
run
handler that is durable and distributed - Resolve or reject the promise in any other handler in the workflow. This can be done at most one time.
@Workflowclass SignupWorkflow { companion object { private val EMAIL_CLICKED = KtDurablePromiseKey.json<String>("email_clicked") private val STATUS = KtStateKey.json<String>("status") } @Workflow suspend fun run(ctx: WorkflowContext, email: Email): Boolean { val secret = ctx.random().nextUUID().toString() ctx.set(STATUS, "Generated secret") ctx.runBlock("send email") { sendEmailWithLink(email, secret) } val clickSecret = ctx.promise(EMAIL_CLICKED).awaitable().await() ctx.set(STATUS, "Clicked email") return clickSecret == secret } @Shared suspend fun click(ctx: SharedWorkflowContext, secret: String) { ctx.promiseHandle(EMAIL_CLICKED).resolve(secret) } @Shared suspend fun getStatus(ctx: SharedWorkflowContext): String? { return ctx.get(STATUS) }}fun main() { RestateHttpEndpointBuilder.builder().bind(SignupWorkflow()).buildAndListen()}
Serving and registering workflows
You serve workflows in the same way as Services and Virtual Objects: by binding them to an HTTP endpoint or AWS Lambda handler. Make sure you register the endpoint or Lambda handler in Restate before invoking it.
@Workflowclass SignupWorkflow { companion object { private val EMAIL_CLICKED = KtDurablePromiseKey.json<String>("email_clicked") private val STATUS = KtStateKey.json<String>("status") } @Workflow suspend fun run(ctx: WorkflowContext, email: Email): Boolean { val secret = ctx.random().nextUUID().toString() ctx.set(STATUS, "Generated secret") ctx.runBlock("send email") { sendEmailWithLink(email, secret) } val clickSecret = ctx.promise(EMAIL_CLICKED).awaitable().await() ctx.set(STATUS, "Clicked email") return clickSecret == secret } @Shared suspend fun click(ctx: SharedWorkflowContext, secret: String) { ctx.promiseHandle(EMAIL_CLICKED).resolve(secret) } @Shared suspend fun getStatus(ctx: SharedWorkflowContext): String? { return ctx.get(STATUS) }}fun main() { RestateHttpEndpointBuilder.builder().bind(SignupWorkflow()).buildAndListen()}
The run
handler
Every workflow needs a run
handler.
This handler has access to the same SDK features as Service and Virtual Object handlers.
For example, use ctx.runBlock
to log intermediate results in Restate and avoid re-execution on replay.
Querying workflows
Similar to Virtual Objects, you can retrieve the K/V state of workflows via the other handlers defined in the workflow definition,
For example, here we expose the status of the workflow to external clients.
Every workflow execution can be seen as a new object, so the state is isolated to a single workflow execution.
The state can only be mutated by the run
handler of the workflow. The other handlers can only read the state.
Signaling workflows
You can use Durable Promises to interact with your running workflows: to let the workflow block until an event occurs, or to send a signal / information into or out of a running workflow. These promises are durable and distributed, meaning they survive crashes and can be resolved or rejected by any handler in the workflow.
Do the following:
- Create a promise in your
run
handler that is durable and distributed - Resolve or reject the promise in any other handler in the workflow. This can be done at most one time.
Serving and registering workflows
You serve workflows in the same way as Services and Virtual Objects: by binding them to an HTTP endpoint or AWS Lambda handler. Make sure you register the endpoint or Lambda handler in Restate before invoking it.
@Workflowclass SignupWorkflow { companion object { private val EMAIL_CLICKED = KtDurablePromiseKey.json<String>("email_clicked") private val STATUS = KtStateKey.json<String>("status") } @Workflow suspend fun run(ctx: WorkflowContext, email: Email): Boolean { val secret = ctx.random().nextUUID().toString() ctx.set(STATUS, "Generated secret") ctx.runBlock("send email") { sendEmailWithLink(email, secret) } val clickSecret = ctx.promise(EMAIL_CLICKED).awaitable().await() ctx.set(STATUS, "Clicked email") return clickSecret == secret } @Shared suspend fun click(ctx: SharedWorkflowContext, secret: String) { ctx.promiseHandle(EMAIL_CLICKED).resolve(secret) } @Shared suspend fun getStatus(ctx: SharedWorkflowContext): String? { return ctx.get(STATUS) }}fun main() { RestateHttpEndpointBuilder.builder().bind(SignupWorkflow()).buildAndListen()}
Submitting workflows with SDK clients
- Java
- Kotlin
Submit:
This returns a handle to the workflow once it has been registered in Restate.
You can only submit once per workflow ID (here "someone"
).
Client restate = Client.connect("http://localhost:8080");SendResponse handle = SignupWorkflowClient.fromClient(restate, "someone") .submit(email);
Submit:
This returns a handle to the workflow once it has been registered in Restate.
You can only submit once per workflow ID (here "someone"
).
Client restate = Client.connect("http://localhost:8080");SendResponse handle = SignupWorkflowClient.fromClient(restate, "someone") .submit(email);
Query/signal:
Call the other handlers of the workflow in the same way as for Virtual Object handlers.
Use send()
for one-way calls.
String status = SignupWorkflowClient.fromClient(restate, "someone") .getStatus();
Query/signal:
Call the other handlers of the workflow in the same way as for Virtual Object handlers.
Use send()
for one-way calls.
String status = SignupWorkflowClient.fromClient(restate, "someone") .getStatus();
Attach/peek: This lets you attach to a workflow and wait for it to finish, or to peek whether the result is ready.
// Option 1: attach and wait for resultboolean result = SignupWorkflowClient.fromClient(restate, "someone") .workflowHandle() .attach();// Option 2: peek to check if readyOutput<Boolean> peekOutput = SignupWorkflowClient.fromClient(restate, "someone") .workflowHandle() .getOutput();if (peekOutput.isReady()) { boolean result2 = peekOutput.getValue();}
Attach/peek: This lets you attach to a workflow and wait for it to finish, or to peek whether the result is ready.
// Option 1: attach and wait for resultboolean result = SignupWorkflowClient.fromClient(restate, "someone") .workflowHandle() .attach();// Option 2: peek to check if readyOutput<Boolean> peekOutput = SignupWorkflowClient.fromClient(restate, "someone") .workflowHandle() .getOutput();if (peekOutput.isReady()) { boolean result2 = peekOutput.getValue();}
Submit:
This returns a handle to the workflow once it has been registered in Restate.
You can only submit once per workflow ID (here "someone"
).
val restate = Client.connect("http://localhost:8080")val handle: SendResponse = SignupWorkflowClient.fromClient(restate, "someone").submit(email)
Submit:
This returns a handle to the workflow once it has been registered in Restate.
You can only submit once per workflow ID (here "someone"
).
val restate = Client.connect("http://localhost:8080")val handle: SendResponse = SignupWorkflowClient.fromClient(restate, "someone").submit(email)
Query/signal:
Call the other handlers of the workflow in the same way as for Virtual Object handlers.
Use send()
for one-way calls.
val status = SignupWorkflowClient.fromClient(restate, "someone").getStatus()
Query/signal:
Call the other handlers of the workflow in the same way as for Virtual Object handlers.
Use send()
for one-way calls.
val status = SignupWorkflowClient.fromClient(restate, "someone").getStatus()
Attach/peek: This lets you attach to a workflow and wait for it to finish, or to peek whether the result is ready.
// Option 1: attach and wait for resultval result = SignupWorkflowClient.fromClient(restate, "someone").workflowHandle().attach()// Option 2: peek to check if readyval peekOutput = SignupWorkflowClient.fromClient(restate, "someone").workflowHandle().outputif (peekOutput.isReady) { val result2 = peekOutput.value}
Attach/peek: This lets you attach to a workflow and wait for it to finish, or to peek whether the result is ready.
// Option 1: attach and wait for resultval result = SignupWorkflowClient.fromClient(restate, "someone").workflowHandle().attach()// Option 2: peek to check if readyval peekOutput = SignupWorkflowClient.fromClient(restate, "someone").workflowHandle().outputif (peekOutput.isReady) { val result2 = peekOutput.value}
Submitting workflows from a Restate service
- Java
- Kotlin
Submit/query/signal:
Use the generated client to call any workflow handler in the same way as for Services and Virtual Objects.
This returns the result of the workflow/handler once it has finished.
Use .send()
for to call the handler without waiting for the result.
You can only call the run
handler (submit) once per workflow ID (here "someone"
).
@Handlerpublic void setup(ObjectContext ctx, Email email) { boolean result = SignupWorkflowClient.fromContext(ctx, "someone") .run(email) .await();}@Handlerpublic void queryStatus(ObjectContext ctx) { String status = SignupWorkflowClient.fromContext(ctx, "someone") .getStatus() .await();}
Submit/query/signal:
Use the generated client to call any workflow handler in the same way as for Services and Virtual Objects.
This returns the result of the workflow/handler once it has finished.
Use .send()
for to call the handler without waiting for the result.
You can only call the run
handler (submit) once per workflow ID (here "someone"
).
@Handlerpublic void setup(ObjectContext ctx, Email email) { boolean result = SignupWorkflowClient.fromContext(ctx, "someone") .run(email) .await();}@Handlerpublic void queryStatus(ObjectContext ctx) { String status = SignupWorkflowClient.fromContext(ctx, "someone") .getStatus() .await();}
Submit/query/signal:
Use the generated client to call any workflow handler in the same way as for Services and Virtual Objects.
This returns the result of the workflow/handler once it has finished.
Use .send()
for to call the handler without waiting for the result.
You can only call the run
handler (submit) once per workflow ID (here "someone"
).
@Handlersuspend fun setup(ctx: ObjectContext, email: Email) { val result = SignupWorkflowClient.fromContext(ctx, "someone").run(email).await()}@Handlersuspend fun queryStatus(ctx: ObjectContext) { val status = SignupWorkflowClient.fromContext(ctx, "someone").getStatus().await()}
Submit/query/signal:
Use the generated client to call any workflow handler in the same way as for Services and Virtual Objects.
This returns the result of the workflow/handler once it has finished.
Use .send()
for to call the handler without waiting for the result.
You can only call the run
handler (submit) once per workflow ID (here "someone"
).
@Handlersuspend fun setup(ctx: ObjectContext, email: Email) { val result = SignupWorkflowClient.fromContext(ctx, "someone").run(email).await()}@Handlersuspend fun queryStatus(ctx: ObjectContext) { val status = SignupWorkflowClient.fromContext(ctx, "someone").getStatus().await()}
Submitting workflows over HTTP
Submit/query/signal:
Call any handler of the workflow in the same way as for Services and Virtual Objects.
This returns the result of the handler once it has finished.
Add /send
to the path for one-way calls.
You can only call the run
handler once per workflow ID (here "someone"
).
curl localhost:8080/SignupWorkflow/someone/run \ -H 'content-type: application/json' \ -d '{"email": "[email protected]"}'
Submit/query/signal:
Call any handler of the workflow in the same way as for Services and Virtual Objects.
This returns the result of the handler once it has finished.
Add /send
to the path for one-way calls.
You can only call the run
handler once per workflow ID (here "someone"
).
curl localhost:8080/SignupWorkflow/someone/run \ -H 'content-type: application/json' \ -d '{"email": "[email protected]"}'
Attach/peek: This lets you retrieve the result of a workflow or check if it's finished.
curl localhost:8080/restate/workflow/SignupWorkflow/someone/attachcurl localhost:8080/restate/workflow/SignupWorkflow/someone/output
Attach/peek: This lets you retrieve the result of a workflow or check if it's finished.
curl localhost:8080/restate/workflow/SignupWorkflow/someone/attachcurl localhost:8080/restate/workflow/SignupWorkflow/someone/output
Inspecting workflows
Have a look at the introspection docs on how to inspect workflows. You can use this to for example: