Skip to main content

Event Processing

Lightweight, transactional event processing.

Process Kafka events with flexible flows of transactional steps.
Restate takes care of the event plumbing and pushes events to your handler.

Lightweight Durable Functions
Functions are executed durably with fine-grained retries and workflow-like semantics.
Queue per key
Restate splits partitions into a queue per key. If it takes long to process an event, the events for other keys aren’t blocked.
Push events to functions
Restate pushes events to your handlers. No need to implement consumers, manage subscriptions/offsets, etc.

Event processing with Restate

Connect functions to Kafka topics. Restate pushes the events to your function.

Lightweight Durable Functions

Write functions that get a the Kafka event as an input. Functions execute with Durable Execution: their progress is tracked and they can be retried from the exact point before the crash, as if you are taking micro-checkpoints throughout the function execution.

user_updates.ts

const userUpdates = restate.object({
name: "userUpdates",
handlers: {
updateUserEvent: async (ctx: restate.ObjectContext, event: UserUpdate) => {
const { profile, permissions, resources } = verifyEvent(event);
let userId = await ctx.run(() => updateProfile(profile));
while (userId === NOT_READY) {
await ctx.sleep(5_000);
userId = await ctx.run(() => updateProfile(profile));
}
const roleId = await ctx.run(() => setPermissions(userId, permissions));
await ctx.run(() => provisionResources(userId, roleId, resources));
},
},
});

Push events to functions

Let Restate subscribe to a Kafka topic and specify to which function to push the events. Restate will take care of the event plumbing: polling for records, committing offsets, recovering...

user_updates.ts

const userUpdates = restate.object({
name: "userUpdates",
handlers: {
updateUserEvent: async (ctx: restate.ObjectContext, event: UserUpdate) => {
const { profile, permissions, resources } = verifyEvent(event);
let userId = await ctx.run(() => updateProfile(profile));
while (userId === NOT_READY) {
await ctx.sleep(5_000);
userId = await ctx.run(() => updateProfile(profile));
}
const roleId = await ctx.run(() => setPermissions(userId, permissions));
await ctx.run(() => provisionResources(userId, roleId, resources));
},
},
});

subscribe.sh

curl restate:9070/subscriptions --json '{
"source": "kafka://my-cluster/user-events",
"sink": "service://userUpdates/updateUserEvent"
}'

Queue per key

Events get send to objects based on the Kafka key. For each key, Restate ensures that events are processed sequentially and in order. Slow events on other keys do not block processing.

In the example, we process user updates in a queue per user. Slow updates for one user do not block updates for other users.

user_updates.ts

const userUpdates = restate.object({
name: "userUpdates",
handlers: {
updateUserEvent: async (ctx: restate.ObjectContext, event: UserUpdate) => {
const { profile, permissions, resources } = verifyEvent(event);
let userId = await ctx.run(() => updateProfile(profile));
while (userId === NOT_READY) {
await ctx.sleep(5_000);
userId = await ctx.run(() => updateProfile(profile));
}
const roleId = await ctx.run(() => setPermissions(userId, permissions));
await ctx.run(() => provisionResources(userId, roleId, resources));
},
},
});

Postpone processing

Flexibly postpone processing of events until later. Restate tracks the timers and re-invokes. When sleeping, other events for that key are enqueued.

Here, we postpone processing for 5 seconds if the user profile is not ready yet.

user_updates.ts

const userUpdates = restate.object({
name: "userUpdates",
handlers: {
updateUserEvent: async (ctx: restate.ObjectContext, event: UserUpdate) => {
const { profile, permissions, resources } = verifyEvent(event);
let userId = await ctx.run(() => updateProfile(profile));
while (userId === NOT_READY) {
await ctx.sleep(5_000);
userId = await ctx.run(() => updateProfile(profile));
}
const roleId = await ctx.run(() => setPermissions(userId, permissions));
await ctx.run(() => provisionResources(userId, roleId, resources));
},
},
});

Durable side effects

The results of interactions with external systems are tracked and recovered after failures. This simplifies writing flows that keep multiple systems in sync.

user_updates.ts

const userUpdates = restate.object({
name: "userUpdates",
handlers: {
updateUserEvent: async (ctx: restate.ObjectContext, event: UserUpdate) => {
const { profile, permissions, resources } = verifyEvent(event);
let userId = await ctx.run(() => updateProfile(profile));
while (userId === NOT_READY) {
await ctx.sleep(5_000);
userId = await ctx.run(() => updateProfile(profile));
}
const roleId = await ctx.run(() => setPermissions(userId, permissions));
await ctx.run(() => provisionResources(userId, roleId, resources));
},
},
});

Flexible control flow

As opposed to many stream processing systems, Restate does not put any restrictions on the control flow (e.g. DAG). Each event craft its own path through the code and builds up its own recovery log.

user_updates.ts

const userUpdates = restate.object({
name: "userUpdates",
handlers: {
updateUserEvent: async (ctx: restate.ObjectContext, event: UserUpdate) => {
const { profile, permissions, resources } = verifyEvent(event);
let userId = await ctx.run(() => updateProfile(profile));
while (userId === NOT_READY) {
await ctx.sleep(5_000);
userId = await ctx.run(() => updateProfile(profile));
}
const roleId = await ctx.run(() => setPermissions(userId, permissions));
await ctx.run(() => provisionResources(userId, roleId, resources));
},
},
});

Lightweight Durable Functions

Write functions that get a the Kafka event as an input. Functions execute with Durable Execution: their progress is tracked and they can be retried from the exact point before the crash, as if you are taking micro-checkpoints throughout the function execution.

Push events to functions

Let Restate subscribe to a Kafka topic and specify to which function to push the events. Restate will take care of the event plumbing: polling for records, committing offsets, recovering...

Queue per key

Events get send to objects based on the Kafka key. For each key, Restate ensures that events are processed sequentially and in order. Slow events on other keys do not block processing.

In the example, we process user updates in a queue per user. Slow updates for one user do not block updates for other users.

Postpone processing

Flexibly postpone processing of events until later. Restate tracks the timers and re-invokes. When sleeping, other events for that key are enqueued.

Here, we postpone processing for 5 seconds if the user profile is not ready yet.

Durable side effects

The results of interactions with external systems are tracked and recovered after failures. This simplifies writing flows that keep multiple systems in sync.

Flexible control flow

As opposed to many stream processing systems, Restate does not put any restrictions on the control flow (e.g. DAG). Each event craft its own path through the code and builds up its own recovery log.

user_updates.ts

const userUpdates = restate.object({
name: "userUpdates",
handlers: {
updateUserEvent: async (ctx: restate.ObjectContext, event: UserUpdate) => {
const { profile, permissions, resources } = verifyEvent(event);
let userId = await ctx.run(() => updateProfile(profile));
while (userId === NOT_READY) {
await ctx.sleep(5_000);
userId = await ctx.run(() => updateProfile(profile));
}
const roleId = await ctx.run(() => setPermissions(userId, permissions));
await ctx.run(() => provisionResources(userId, roleId, resources));
},
},
});

LOW-LATENCY

Restate’s event-driven foundation built in Rust lets you process events at high speed. Restate keeps a queue per key and pushes events to your functions to process them in parallel as fast as possible.

DURABLE EXECUTION

Restate manages the complexities of reading from Kafka to make sure each event gets processed exactly once. Restate handles retries and recovery of your event handlers to the exact point before the crash.

Stateful event processing with Restate

Implement stateful event handlers with Restate.

K/V State

Store state in Restate and access it from other handlers. Restate guarantees that it is consistent and persistent. The state gets delivered together with the request, so you operate on local state.

profile_service.ts

const eventEnricher = restate.object({
name: "profile",
handlers: {
userEvent: async (ctx: ObjectContext, event: UserProfile) => {
ctx.set("user", event);
ctx.objectSendClient(EventEnricher, ctx.key,{ delay: 1000 }).emit();
},
featureEvent: async (ctx: ObjectContext, featureEvent: string) => {
const userEvent = await ctx.get<UserProfile>("user");
(userEvent!.features ??= []).push(featureEvent);
ctx.set("user", userEvent)
},
emit: async (ctx: ObjectContext) => {
send(ctx.key, await ctx.get("user"));
ctx.clearAll();
}
}
})
type EventEnricherType = typeof eventEnricher;
const EventEnricher: EventEnricherType = {name:"profile"}

Event Enrichment

Enrich events with data from multiple sources by storing it in state and eventually exposing it via other functions.

profile_service.ts

const eventEnricher = restate.object({
name: "profile",
handlers: {
userEvent: async (ctx: ObjectContext, event: UserProfile) => {
ctx.set("user", event);
ctx.objectSendClient(EventEnricher, ctx.key,{ delay: 1000 }).emit();
},
featureEvent: async (ctx: ObjectContext, featureEvent: string) => {
const userEvent = await ctx.get<UserProfile>("user");
(userEvent!.features ??= []).push(featureEvent);
ctx.set("user", userEvent)
},
emit: async (ctx: ObjectContext) => {
send(ctx.key, await ctx.get("user"));
ctx.clearAll();
}
}
})
type EventEnricherType = typeof eventEnricher;
const EventEnricher: EventEnricherType = {name:"profile"}

Delayed actions

Schedule async follow-up tasks for now or for later. Restate tracks the timers and triggers them when the time comes.

Here, we wait one second for other user features to arrive before sending the event to downstream processing.

profile_service.ts

const eventEnricher = restate.object({
name: "profile",
handlers: {
userEvent: async (ctx: ObjectContext, event: UserProfile) => {
ctx.set("user", event);
ctx.objectSendClient(EventEnricher, ctx.key,{ delay: 1000 }).emit();
},
featureEvent: async (ctx: ObjectContext, featureEvent: string) => {
const userEvent = await ctx.get<UserProfile>("user");
(userEvent!.features ??= []).push(featureEvent);
ctx.set("user", userEvent)
},
emit: async (ctx: ObjectContext) => {
send(ctx.key, await ctx.get("user"));
ctx.clearAll();
}
}
})
type EventEnricherType = typeof eventEnricher;
const EventEnricher: EventEnricherType = {name:"profile"}

Combine Kafka and RPC

Functions can be called over RPC or Kafka without changing the code. In the example, the registration can come over Kafka, while the email gets called via HTTP.

profile_service.ts

const eventEnricher = restate.object({
name: "profile",
handlers: {
userEvent: async (ctx: ObjectContext, event: UserProfile) => {
ctx.set("user", event);
ctx.objectSendClient(EventEnricher, ctx.key,{ delay: 1000 }).emit();
},
featureEvent: async (ctx: ObjectContext, featureEvent: string) => {
const userEvent = await ctx.get<UserProfile>("user");
(userEvent!.features ??= []).push(featureEvent);
ctx.set("user", userEvent)
},
emit: async (ctx: ObjectContext) => {
send(ctx.key, await ctx.get("user"));
ctx.clearAll();
}
}
})
type EventEnricherType = typeof eventEnricher;
const EventEnricher: EventEnricherType = {name:"profile"}

K/V State

Store state in Restate and access it from other handlers. Restate guarantees that it is consistent and persistent. The state gets delivered together with the request, so you operate on local state.

Event Enrichment

Enrich events with data from multiple sources by storing it in state and eventually exposing it via other functions.

Delayed actions

Schedule async follow-up tasks for now or for later. Restate tracks the timers and triggers them when the time comes.

Here, we wait one second for other user features to arrive before sending the event to downstream processing.

Combine Kafka and RPC

Functions can be called over RPC or Kafka without changing the code. In the example, the registration can come over Kafka, while the email gets called via HTTP.

profile_service.ts

const eventEnricher = restate.object({
name: "profile",
handlers: {
userEvent: async (ctx: ObjectContext, event: UserProfile) => {
ctx.set("user", event);
ctx.objectSendClient(EventEnricher, ctx.key,{ delay: 1000 }).emit();
},
featureEvent: async (ctx: ObjectContext, featureEvent: string) => {
const userEvent = await ctx.get<UserProfile>("user");
(userEvent!.features ??= []).push(featureEvent);
ctx.set("user", userEvent)
},
emit: async (ctx: ObjectContext) => {
send(ctx.key, await ctx.get("user"));
ctx.clearAll();
}
}
})
type EventEnricherType = typeof eventEnricher;
const EventEnricher: EventEnricherType = {name:"profile"}

Event handlers as regular, lightweight functions

Let Restate subscribe to a Kafka topic and push events to your functions.
Your functions run as normal Java or NodeJS services in your existing infra.

Event handlers as regular, lightweight functions

What you can build with Event Processing and Restate

Kafka-triggered workflows

Handle the payment, request the order preparation, wait for driver acceptance callback, etc.

Digital twin pattern

A delivery service that responds to location updates arriving over Kafka. The order gets updated accordingly.

Event-based task scheduling

For each event that comes in, schedule some tasks for in the future.

Wondering about a specific use case?

Let’s brainstorm together on Discord

Developer Resources

Blog post

Event-driven apps where event-driven is an implementation detail.

Docs

Read the docs to learn more.

Need help?

Join the Restate Discord channel