Async tasks
Flexible, durable scheduling across processes and time.
Restate lets you write resilient scheduling logic with the flexibility of code: delay execution, re-attach to ongoing tasks, fan-out/in patterns, and more. Restate guarantees that your tasks run to completion exactly once.
Durable timers
(Delayed) task queue
Switch between async and sync
Scheduling async tasks with Restate
Restate as message queue
Any function becomes a durable async task by using Restate as your message queue. Restate persists all requests and ensures they run to completion with automatic retries and recovery upon failures.
Restate as message queue
Any function becomes a durable async task by using Restate as your message queue. Restate persists all requests and ensures they run to completion with automatic retries and recovery upon failures.
Schedule async tasks
You can schedule tasks with the SDK clients or via HTTP, and you can postpone execution via the optional delay parameter.
Schedule async tasks
You can schedule tasks with the SDK clients or via HTTP, and you can postpone execution via the optional delay parameter.
Deduplicate requests
Use an idempotency key to ensure that the task is only scheduled once. For duplicate requests, Restate returns the previous response.
Deduplicate requests
Use an idempotency key to ensure that the task is only scheduled once. For duplicate requests, Restate returns the previous response.
Latch on to the task
Use the idempotency key to latch onto the task later and retrieve the result. You can also let another process latch on.
Latch on to the task
Use the idempotency key to latch onto the task later and retrieve the result. You can also let another process latch on.
- TypeScript
- Java
- Kotlin
- Go
- Python
const asyncTaskService = restate.service({name: "taskWorker",handlers: {runTask: async (ctx: Context, params: TaskOpts) => {return someHeavyWork(params);},},});export type AsyncTaskService = typeof asyncTaskService;
// The TypeScript SDK includes a client to send requests to servicesconst restateClient = restate.connect({ url: RESTATE_URL });const taskHandle = await restateClient.serviceSendClient<AsyncTaskService>({ name: "taskWorker" }).runTask(task,SendOpts.from({ idempotencyKey: "dQw4w9WgXcQ", delay: 1000 }));// Attach to the async task to get the resultconst result = await restateClient.result(taskHandle);
@Servicepublic class AsyncTaskService {@Handlerpublic String runTask(Context ctx, TaskOpts params) {return someHeavyWork(params);}}
// The Java SDK generates clients for each serviceClient restateClient = Client.connect(RESTATE_URL);SendResponse handle =AsyncTaskServiceClient.fromClient(restateClient).send(Duration.ofDays(5)).runTask(taskOpts,CallRequestOptions.DEFAULT.withIdempotency("dQw4w9WgXcQ"));// Attach to the async task to get the resultString result =restateClient.invocationHandle(handle.getInvocationId(), JsonSerdes.STRING).attach();
@Serviceclass AsyncTaskService {@Handlersuspend fun runTask(ctx: Context, params: TaskOpts): String {return params.someHeavyWork()}}
// The Kotlin SDK generates clients for each serviceval restateClient: Client = Client.connect(RESTATE_URL)val handle =AsyncTaskServiceClient.fromClient(restateClient).send(5.days).runTask(taskOpts,CallRequestOptions.DEFAULT.withIdempotency("dQw4w9WgXcQ"),)// Attach to the async task to get the resultval result =restateClient.invocationHandle(handle.invocationId,KtSerdes.json<String>(),).attach()
type AsyncTaskWorker struct{}func (AsyncTaskWorker) RunTask(ctx restate.Context, task TaskOpts) (Result, error) {return someHeavyWork(task)}
client := &http.Client{}url := fmt.Sprintf("%s/AsyncTaskWorker/RunTask/Send", RESTATE_URL)taskData, _ := json.Marshal(task)req, err := http.NewRequest("POST", url, bytes.NewBuffer(taskData))if err != nil {return err}req.Header.Set("Content-Type", "application/json")req.Header.Set("idempotency-key", "dQw4w9WgXcQ")resp, err := client.Do(req)if err != nil {return err}defer resp.Body.Close()// ... do other things while the task is being processed ...attachUrl := fmt.Sprintf("%s/restate/invocation/AsyncTaskWorker/RunTask/%s/attach",RESTATE_URL,"dQw4w9WgXcQ")resp, err = http.DefaultClient.Get(attachUrl)if err != nil {return err}defer resp.Body.Close()
async_task_service = Service("taskWorker")@async_task_service.handler("runTask")async def run_task(ctx: Context, params: TaskOpts):return some_heavy_work(params)app = restate.app([async_task_service])
requests.post(f"{RESTATE_URL}/taskWorker/runTask/send",json=json.dumps(task),headers={"idempotency-key": "dQw4w9WgXcQ","Content-Type": "application/json"})# Attach to the async task to retrieve the resultattach_url = f"{RESTATE_URL}/restate/invocation/taskWorker/runTask/dQw4w9WgXcQ/attach"response = requests.get(attach_url)
Restate as sophisticated task queue
Restate is built as an event-driven foundation, and therefore supports task queues by design.
Async tasks run like any other function in your infrastructure: on K8S, FaaS, or mix-and-match.
No need to spin up extra infrastructure or message queues.
Parallelizing work with Restate
Fan out
Write flexible scheduling logic with Restate's durable building blocks. Fan out tasks with resilient RPC calls. Restate makes sure all tasks run to completion, with retries and recovery upon failures.
Fan out
Write flexible scheduling logic with Restate's durable building blocks. Fan out tasks with resilient RPC calls. Restate makes sure all tasks run to completion, with retries and recovery upon failures.
Fan in
Invocations produce durable promises that can be awaited and combined. These durable promises can be recovered on other processes after a failure.
Fan in
Invocations produce durable promises that can be awaited and combined. These durable promises can be recovered on other processes after a failure.
Server(less)
Deploy this service or its subtask processors on a platform like Kubernetes or AWS Lambda to automatically get parallel scale out.
Server(less)
Deploy this service or its subtask processors on a platform like Kubernetes or AWS Lambda to automatically get parallel scale out.
- TypeScript
- Java
- Kotlin
- Go
- Python
const workerService = restate.service({name: "worker",handlers: {run: async (ctx: Context, task: Task) => {// Split the task in subtasksconst subtasks: SubTask[] = await ctx.run("split task", () =>split(task));const resultPromises = [];for (const subtask of subtasks) {const subResultPromise = ctx.serviceClient(workerService).runSubtask(subtask);resultPromises.push(subResultPromise);}const results = await CombineablePromise.all(resultPromises);return aggregate(results);},runSubtask: async (ctx: Context, subtask: SubTask) => {// Processing logic goes here ...// Can be moved to a separate service to scale independently},},});export const handler = restate.endpoint().bind(workerService).handler();
@Servicepublic class FanOutWorker {@Handlerpublic Result run(Context ctx, Task task) {// Split the task in subtasksList<SubTask> subTasks =ctx.run(JacksonSerdes.of(new TypeReference<>() {}),() -> split(task));List<Awaitable<?>> resultFutures = new ArrayList<>();for (SubTask subTask : subTasks) {resultFutures.add(FanOutWorkerClient.fromContext(ctx).runSubtask(subTask));}Awaitable.all(resultFutures).await();var results = resultFutures.stream().map(future -> (SubTaskResult) future.await()).toList();return aggregate(results);}@Handlerpublic SubTaskResult runSubtask(Context ctx, SubTask subTask) {// Processing logic goes here ...// Can be moved to a separate service to scale independentlyreturn executeSubtask(ctx, subTask);}public static void main(String[] args) {RestateHttpEndpointBuilder.builder().bind(new FanOutWorker()).buildAndListen();}}
@Serviceclass FanOutWorker {@Handlersuspend fun run(ctx: Context, task: Task): TaskResult {val subTasks = ctx.runBlock { task.split() }val results =subTasks.map {FanOutWorkerClient.fromContext(ctx).runSubtask(it)}.awaitAll()return results.aggregate()}@Handlersuspend fun runSubtask(ctx: Context, subTask: SubTask): SubTaskResult {// Processing logic goes here ...// Can be moved to a separate service to scale independentlyreturn subTask.execute(ctx)}}
type FanOutWorker struct{}func (FanOutWorker) Run(ctx restate.Context, task Task) (Result, error) {// Split the task in subtassksubtasks, err := split(task)if err != nil {return Result{}, err}subtaskFutures := make([]restate.Selectable, 0, len(subtasks))for _, subtask := range subtasks {subtaskFutures = append(subtaskFutures,restate.Service[SubTaskResult](ctx, "FanOutWorker", "RunSubtask").RequestFuture(subtask))}selector := restate.Select(ctx, subtaskFutures...)subResults := make([]SubTaskResult, 0, len(subtasks))for selector.Remaining() {response, err := selector.Select().(restate.ResponseFuture[SubTaskResult]).Response()if err != nil {return Result{}, err}subResults = append(subResults, response)}return aggregate(subResults)}func (FanOutWorker) RunSubtask(ctx restate.Context, subtask SubTask) (SubTaskResult, error) {// Processing logic goes here ...// Can be moved to a separate service or FaaS to scale independentlyreturn executeSubtask(ctx, subtask)}
worker_service = Service("worker")@worker_service.handler()async def run(ctx: Context, task: Task):subtasks = await ctx.run("split task", lambda: split(task))result_promises = []for subtask in subtasks:sub_result_promise = ctx.service_call(run_subtask, arg=subtask)result_promises.append(sub_result_promise)results = [await promise for promise in result_promises]return aggregate(results)@worker_service.handler()async def run_subtask(ctx: Context, subtask: Subtask):# Processing logic goes here...# Can be moved to a separate service to scale independentlypassapp = restate.app([worker_service])
LOW-LATENCY
Restate’s event-driven foundation built in Rust lets you queue events. Restate pushes them to your functions at high speed.
Learn more
DURABLE EXECUTION
Restate guarantees all tasks run to completion. It keeps track of timers, handles retries and recovery upon failures, and ensures that tasks are executed exactly once.
Learn more
Durable webhook processing with Restate
Restate handlers as durable event processors
Point your webhook endpoint to any Restate handler. Restate makes sure all events are persisted and run to completion.
Restate handlers as durable event processors
Point your webhook endpoint to any Restate handler. Restate makes sure all events are persisted and run to completion.
Schedule follow-up tasks for webhook events, like reminders or escalations.
Schedule follow-up tasks for webhook events, like reminders or escalations.
Stateful handlers and event joins
Correlate or join asynchronous events by routing them to the same object.
Restate ensures sequential processing of events for the same key while giving access to durable key-value state.
Stateful handlers and event joins
Correlate or join asynchronous events by routing them to the same object.
Restate ensures sequential processing of events for the same key while giving access to durable key-value state.
- TypeScript
- Java
- Kotlin
- Go
- Python
const paymentTracker = restate.object({ // one instance per invoice idname: "PaymentTracker",handlers: {// Stripe sends us webhook events for invoice payment attemptsonPaymentSuccess: async (ctx: restate.ObjectContext, event: StripeEvent) => {ctx.set("paid", true);},onPaymentFailed: async (ctx: restate.ObjectContext, event: StripeEvent) => {if (await ctx.get<boolean>("paid")) {return;}const remindersCount = await ctx.get<number>("reminders_count") ?? 0;if (remindersCount < 3) {ctx.set("reminders_count", remindersCount + 1);await ctx.run(() => sendReminderEmail(event));// Schedule next reminder via a delayed self callawait ctx.objectSendClient(PaymentTracker,ctx.key, // this object's invoice id{delay: 24 * 60 * 60 * 1000}).onPaymentFailed(event);} else {await ctx.run(() => escalateToHuman(event));}},}})
@VirtualObjectpublic class PaymentTracker {StateKey<Boolean> PAID = StateKey.of("paid", BOOLEAN);StateKey<Integer> REMINDER_COUNT = StateKey.of("reminder_count", INT);// Stripe sends us webhook events for invoice payment attempts@Handlerpublic void onPaymentSuccess(ObjectContext ctx, StripeEvent event) {ctx.set(PAID, true);}@Handlerpublic void onPaymentFailure(ObjectContext ctx, StripeEvent event) {if (ctx.get(PAID).orElse(false)) {return;}int remindersCount = ctx.get(REMINDER_COUNT).orElse(0);if (remindersCount < 3) {ctx.set(REMINDER_COUNT, remindersCount + 1);ctx.run(() -> sendReminderEmail(event));// Schedule next reminder via a delayed self callPaymentTrackerClient.fromContext(ctx, ctx.key()).send(Duration.ofDays(1)).onPaymentFailure(event);} else {ctx.run(() -> escalateToHuman(event));}}}
@VirtualObjectclass PaymentTracker {companion object {private val PAID = KtStateKey.json<Boolean>("paid")private val REMINDER_COUNT = KtStateKey.json<Int>("reminder_count")}// Stripe sends us webhook events for invoice payment attempts@Handlersuspend fun onPaymentSuccess(ctx: ObjectContext, event: StripeEvent) {ctx.set(PAID, true)}@Handlersuspend fun onPaymentFailure(ctx: ObjectContext, event: StripeEvent) {if (ctx.get(PAID) == true) {return}val remindersCount = ctx.get(REMINDER_COUNT) ?: 0if (remindersCount < 3) {ctx.set(REMINDER_COUNT, remindersCount + 1)ctx.runBlock { sendReminderEmail(event) }// Schedule next reminder via a delayed self callPaymentTrackerClient.fromContext(ctx, ctx.key()).send(1.days).onPaymentFailure(event)} else {ctx.runBlock { escalateToHuman(event) }}}}
type PaymentTracker struct{} // one instance per invoice id// OnPaymentSuccess Stripe sends us webhook events for invoice payment attemptsfunc (PaymentTracker) OnPaymentSuccess(ctx restate.ObjectContext, event StripeEvent) restate.Void {restate.Set[bool](ctx, "paid", true)return restate.Void{}}func (PaymentTracker) OnPaymentFailure(ctx restate.ObjectContext, event StripeEvent) error {paid, err := restate.Get[bool](ctx, "paid")if err != nil {return err}if paid {return nil}remindersCount, err := restate.Get[int8](ctx, "reminders_count")if err != nil {return err}if remindersCount < 3 {restate.Set(ctx, "reminders_count", remindersCount+1)if _, err := restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {return restate.Void{}, SendReminderEmail(event)}); err != nil {return err}// Schedule next reminder via a delayed self callrestate.ObjectSend(ctx,"PaymentTracker",restate.Key(ctx), // this object's invoice id"OnPaymentFailure").Send(event, restate.WithDelay(5*time.Second))} else {if _, err := restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {return restate.Void{}, EscalateToHuman(event)}); err != nil {return err}}return nil}
payment_tracker = VirtualObject("PaymentTracker") # one instance per invoice ID# Stripe sends us webhook events for invoice payment attempts@payment_tracker.handler()async def on_payment_success(ctx: ObjectContext, event: StripeEvent):ctx.set("paid", True)@payment_tracker.handler()async def on_payment_failed(ctx: ObjectContext, event: StripeEvent):if await ctx.get("paid"):returnreminder_count = await ctx.get("reminder_count") or 0if reminder_count < 3:ctx.set("reminder_count", reminder_count + 1)await ctx.run("send_reminder", lambda: send_reminder_email(event))# Schedule next reminder via a delayed self callctx.object_send(on_payment_failed, # this handlerctx.key(), # this object invoice idevent,send_delay=timedelta(days=1))else:await ctx.run("escalate", lambda: escalate_to_human(event))
What you can build with Async Tasks and Restate
💡 You can invoke any handler asynchronously, so have a look at the other use case pages for more inspiration.