Event Processing
Lightweight, transactional event processing.
Process Kafka events with durable, transactional workflows.
Restate takes care of the event plumbing and pushes events to your handler.
Lightweight, resilient functions
From complex workflows to rapid ETL tasks
Zero Kafka consumer management
Transactional event processing with Restate
Durable business logic and side effects
Restate persists execution progress and automatically retries from the exact point of failure.
When you perform side effects in a run block, Restate persists the result and replays it after crashes, enabling resilient workflow-like code.
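The persist-and-replay behavior described above can be pictured with a small, self-contained model. This is only a conceptual sketch in plain Python, not Restate's implementation or SDK: `Journal`, `Context`, and the helper functions are hypothetical names invented for the illustration. Each `run` step executes once, its result is recorded, and a retry replays recorded results instead of repeating the side effect.

```python
from typing import Any, Callable


class Journal:
    """Hypothetical stand-in for the durable log of one invocation."""

    def __init__(self) -> None:
        self.entries: list[Any] = []


class Context:
    """Toy context: run() executes a step once and replays its stored result afterwards."""

    def __init__(self, journal: Journal) -> None:
        self._journal = journal
        self._position = 0  # index of the next step within this attempt

    def run(self, action: Callable[[], Any]) -> Any:
        if self._position < len(self._journal.entries):
            # Step completed in an earlier attempt: return the recorded result.
            result = self._journal.entries[self._position]
        else:
            # First time this step is reached: execute it and record the result.
            result = action()
            self._journal.entries.append(result)
        self._position += 1
        return result


calls = {"create_post": 0, "update_feed": 0}


def create_post() -> str:
    calls["create_post"] += 1
    return "post-1"


def update_feed(fail: bool) -> str:
    if fail:
        raise RuntimeError("simulated crash")
    calls["update_feed"] += 1
    return "ok"


def handler(ctx: Context, fail_on_update: bool) -> str:
    post_id = ctx.run(create_post)
    ctx.run(lambda: update_feed(fail_on_update))
    return post_id


journal = Journal()
try:
    handler(Context(journal), fail_on_update=True)  # first attempt fails in step 2
except RuntimeError:
    pass
result = handler(Context(journal), fail_on_update=False)  # retry replays step 1
```

In this model the retry never calls `create_post` again. It reads the recorded post ID from the journal and continues with the step that failed.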
Zero Kafka consumer management
Connect functions to Kafka topics and let Restate handle all consumer management — including polling for records, committing offsets, and recovery.
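As a rough illustration of what connecting a handler to a topic can look like, the sketch below builds the HTTP request for a Kafka subscription. It assumes the Restate admin API is reachable at `http://localhost:9070` and accepts a `POST /subscriptions` call with a `source` and a `sink`. The cluster name `my-cluster` and topic `social-media-posts` are made-up values, and the cluster is assumed to be configured on the Restate server already. Check the Restate documentation for the exact fields before relying on this.

```python
import json
import urllib.request


def build_subscription_request(admin_url: str, cluster: str, topic: str,
                               service: str, handler: str) -> urllib.request.Request:
    """Build (but do not send) the request that subscribes a handler to a Kafka topic."""
    payload = {
        "source": f"kafka://{cluster}/{topic}",    # assumed source URI format
        "sink": f"service://{service}/{handler}",  # assumed sink URI format
        "options": {"auto.offset.reset": "earliest"},
    }
    return urllib.request.Request(
        url=f"{admin_url}/subscriptions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_subscription_request(
    "http://localhost:9070", "my-cluster", "social-media-posts", "userFeed", "processPost"
)
# To create the subscription against a running server:
# urllib.request.urlopen(request)
```

The handler code itself contains no consumer loop, offset handling, or Kafka client. The subscription is the only place where the topic is mentioned.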
Queue per key
Events are routed to objects based on Kafka keys, with Restate ensuring sequential, in-order processing for each key. Processing delays for one key never block events for other keys. In the example, slow moderation checks do not block posts for other users.
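The queue-per-key idea can be sketched with plain `asyncio`. This is a conceptual model under simplifying assumptions, not how Restate is built: `process_all` and the event tuples are hypothetical, and the sleep stands in for a handler's work. Events with the same key are handled strictly one after another, while different keys progress concurrently.

```python
import asyncio
from collections import defaultdict


async def process_all(events: list[tuple[str, str, float]]) -> list[tuple[str, str]]:
    """Process (key, payload, duration) events: in order per key, concurrently across keys."""
    queues: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
    completed: list[tuple[str, str]] = []

    for key, payload, duration in events:
        queues[key].put_nowait((payload, duration))

    async def drain(key: str, queue: asyncio.Queue) -> None:
        # One worker per key: the next event starts only after the previous one finishes.
        while not queue.empty():
            payload, duration = queue.get_nowait()
            await asyncio.sleep(duration)  # stands in for the handler's work
            completed.append((key, payload))

    await asyncio.gather(*(drain(key, queue) for key, queue in queues.items()))
    return completed


events = [
    ("alice", "post-1", 0.2),  # slow moderation check
    ("alice", "post-2", 0.0),
    ("bob", "post-1", 0.0),
    ("bob", "post-2", 0.0),
]
completed = asyncio.run(process_all(events))
```

Both of bob's posts complete while alice's first post is still waiting, yet alice's posts still finish in the order they arrived.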
Schedule events or postpone processing
Flexibly postpone event processing or schedule asynchronous tasks for immediate or future execution. Restate tracks all timers and triggers execution when they are due. In the example, the handler sleeps between moderation checks, and because each key has its own queue, that waiting does not block posts for other users.
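One way to picture timer tracking is a store of absolute fire times that outlives the process that scheduled them. The sketch below is a toy model with hypothetical names (`Timer`, `TimerService`) and an explicit `now` argument in place of a real clock. It is not Restate's timer implementation.

```python
import heapq
from dataclasses import dataclass, field
from typing import Optional


@dataclass(order=True)
class Timer:
    fire_at: float                    # absolute time at which the timer is due
    name: str = field(compare=False)  # label only, ignored when ordering


class TimerService:
    """Toy timer store: keeps absolute fire times so pending timers survive a restart."""

    def __init__(self, pending: Optional[list[Timer]] = None) -> None:
        self._heap: list[Timer] = list(pending or [])
        heapq.heapify(self._heap)

    def schedule(self, name: str, now: float, delay: float) -> None:
        heapq.heappush(self._heap, Timer(now + delay, name))

    def due(self, now: float) -> list[str]:
        """Pop and return the names of all timers whose fire time has passed."""
        fired = []
        while self._heap and self._heap[0].fire_at <= now:
            fired.append(heapq.heappop(self._heap).name)
        return fired

    def snapshot(self) -> list[Timer]:
        """Return the pending timers, as they would be persisted."""
        return list(self._heap)


timers = TimerService()
timers.schedule("recheck post-1", now=0.0, delay=5.0)
timers.schedule("publish post-2", now=0.0, delay=60.0)
fired_early = timers.due(now=4.0)            # nothing is due yet
restarted = TimerService(timers.snapshot())  # simulate a restart from persisted timers
fired_after_restart = restarted.due(now=5.0)
fired_later = restarted.due(now=61.0)
```

Because the store keeps the fire time rather than the remaining delay, a restart does not reset the wait: the five-second timer still fires at time 5.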
Flexible control flow
Unlike traditional stream processing systems, Restate imposes no restrictions on control flow (e.g., DAGs). Each event creates its own execution path through your code and builds a dedicated recovery log.
Scale with serverless
Deploy your functions on serverless platforms. Restate pushes events to them, and they scale with demand.
- TypeScript
- Java
- Kotlin
- Go
- Python
// TypeScript
const userFeed = restate.object({
  name: "userFeed",
  handlers: {
    processPost: async (ctx: restate.ObjectContext, post: SocialMediaPost) => {
      const userId = ctx.key;

      // Persist the created post ID so that a retry does not create the post twice.
      const postId = await ctx.run(() => createPost(userId, post));

      // Poll the moderation status, sleeping durably between checks.
      while ((await ctx.run(() => getPostStatus(postId))) === PENDING) {
        await ctx.sleep(5_000);
      }

      await ctx.run(() => updateUserFeed(userId, postId));
    },
  },
});

export const awsLambdaHandler = restate.endpoint().bind(userFeed).handler();

// Java
@VirtualObject
public class UserFeed {

  public record SocialMediaPost(String content, String metadata) {}

  @Handler
  public void processPost(ObjectContext ctx, SocialMediaPost post) {
    String userId = ctx.key();

    // Persist the created post ID so that a retry does not create the post twice.
    String postId = ctx.run(STRING, () -> createPost(userId, post));

    // Poll the moderation status, sleeping durably between checks.
    while (ctx.run(STRING, () -> getPostStatus(postId)).equals("PENDING")) {
      ctx.sleep(Duration.ofSeconds(5));
    }

    ctx.run(() -> updateUserFeed(userId, postId));
  }
}

class MyLambdaHandler extends BaseRestateLambdaHandler {
  @Override
  public void register(RestateLambdaEndpointBuilder builder) {
    // Register the UserFeed object with the Lambda endpoint.
    builder.bind(new UserFeed());
  }
}

// Kotlin
@VirtualObject
class UserFeed {

  @Serializable
  data class SocialMediaPost(val content: String, val metadata: String)

  @Handler
  suspend fun processPost(ctx: ObjectContext, post: SocialMediaPost) {
    val userId = ctx.key()

    // Persist the created post ID so that a retry does not create the post twice.
    val postId = ctx.runBlock { createPost(userId, post) }

    // Poll the moderation status, sleeping durably between checks.
    while (ctx.runBlock { getPostStatus(postId) } == "PENDING") {
      ctx.sleep(5.seconds)
    }

    ctx.runBlock { updateUserFeed(userId, postId) }
  }
}

class MyLambdaHandler : BaseRestateLambdaHandler() {
  override fun register(builder: RestateLambdaEndpointBuilder) {
    // Register the UserFeed object with the Lambda endpoint.
    builder.bind(UserFeed())
  }
}

// Go
type UserFeed struct{}

func (UserFeed) ProcessPost(ctx restate.ObjectContext, post SocialMediaPost) error {
	userId := restate.Key(ctx)

	// Persist the created post ID so that a retry does not create the post twice.
	postId, err := restate.Run(ctx, func(ctx restate.RunContext) (string, error) {
		return CreatePost(userId, post)
	})
	if err != nil {
		return err
	}

	// Poll the moderation status, sleeping durably between checks.
	for {
		status, err := restate.Run(ctx, func(ctx restate.RunContext) (string, error) {
			return GetPostStatus(postId), nil
		})
		if err != nil {
			return err
		}
		if status != PENDING {
			break
		}
		if err = restate.Sleep(ctx, 5*time.Second); err != nil {
			return err
		}
	}

	if _, err := restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {
		return restate.Void{}, UpdateUserFeed(userId, postId)
	}); err != nil {
		return err
	}
	return nil
}

func main() {
	handler, err := server.NewRestate().
		Bind(restate.Reflect(UserFeed{})).
		Bidirectional(false).
		LambdaHandler()
	if err != nil {
		log.Fatal(err.Error())
	}
	lambda.Start(handler)
}

# Python
user_feed = VirtualObject("UserFeed")


@user_feed.handler()
async def process_post(ctx: ObjectContext, post: SocialMediaPost):
    user_id = ctx.key()

    # Persist the created post ID so that a retry does not create the post twice.
    post_id = await ctx.run("create post", lambda: create_post(user_id, post))

    # Re-check the status on every iteration, sleeping durably between checks.
    while await ctx.run("post status", lambda: get_post_status(post_id)) == Status.PENDING:
        await ctx.sleep(timedelta(seconds=5))

    await ctx.run("update feed", lambda: update_user_feed(user_id, post_id))


aws_lambda_handler = restate.app([user_feed])
Event handlers as regular, lightweight functions
Let Restate manage your Kafka subscriptions and deliver events directly to your functions.
Your handlers run as standard services in your existing infrastructure with no special requirements.
Stateful event processing with Restate
Implement stateful event handlers with Restate.
K/V State and Event Enrichment
Store persistent, consistent state directly in Restate and access it from any handler of the service. State is delivered with each request, allowing you to operate on local data without external database calls.
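The idea of state arriving with the request can be modeled in a few lines. In this sketch the runtime looks up the state for the object key, hands the handler a local copy, and stores the result afterwards. All names (`invoke`, `StateStore`, the handlers) are hypothetical, and committing only after the handler returns is a simplification chosen for the sketch, not a statement about how Restate applies state updates.

```python
import copy
from typing import Any, Callable

StateStore = dict[str, dict[str, Any]]  # object key -> K/V state


def invoke(store: StateStore, key: str,
           handler: Callable[[dict[str, Any], Any], Any], event: Any) -> Any:
    """Hand the handler a local copy of the key's state and store it after success."""
    local_state = copy.deepcopy(store.get(key, {}))
    result = handler(local_state, event)  # handler reads and writes local data only
    store[key] = local_state              # reached only if the handler did not raise
    return result


def register_package(state: dict[str, Any], info: dict[str, Any]) -> None:
    state["package-info"] = {**info, "locations": []}


def update_location(state: dict[str, Any], location: dict[str, Any]) -> None:
    if "package-info" not in state:
        raise LookupError("package not found")
    state["package-info"]["locations"].append(location)


store: StateStore = {}
invoke(store, "pkg-42", register_package, {"final_destination": "Berlin"})
invoke(store, "pkg-42", update_location, {"city": "Paris"})
try:
    invoke(store, "pkg-99", update_location, {"city": "Rome"})  # unknown package
except LookupError:
    pass
```

The handlers never query a database. They enrich the incoming event with whatever state the runtime attached for that key.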
Agents, actors and state machines
Build event-driven agents, actors, digital twins, and state machines with Kafka integration. Restate provides simple concurrency guarantees while ensuring full resilience and consistency without additional infrastructure.
Combine Kafka and RPC
Use the same functions for both Kafka events and RPC calls without code changes. Process events from multiple sources — registration requests via HTTP, location updates via Kafka, and queries from dashboards — all using the same handlers.
- TypeScript
- Java
- Kotlin
- Go
- Python
// TypeScript
const packageTracker = restate.object({
  name: "package-tracker",
  handlers: {
    // Store the package details in the object's K/V state.
    registerPackage: async (ctx: ObjectContext, packageInfo: PackageInfo) => {
      ctx.set("package-info", packageInfo);
    },

    // Append a location update to the stored package.
    updateLocation: async (ctx: ObjectContext, locationUpdate: LocationUpdate) => {
      const packageInfo = await ctx.get<PackageInfo>("package-info");
      if (!packageInfo) {
        throw new TerminalError("Package not found");
      }
      (packageInfo.locations ??= []).push(locationUpdate);
      ctx.set("package-info", packageInfo);
    },

    // Shared handler: reads state without taking the per-key write queue.
    getPackageInfo: shared((ctx: ObjectSharedContext) =>
      ctx.get<PackageInfo>("package-info")
    ),
  },
});

restate.endpoint().bind(packageTracker).listen();

// Java
@VirtualObject
public class PackageTracker {

  private static final StateKey<PackageInfo> PACKAGE_INFO =
      StateKey.of("package-info", JacksonSerdes.of(PackageInfo.class));

  @Handler
  public void registerPackage(ObjectContext ctx, PackageInfo packageInfo) {
    ctx.set(PACKAGE_INFO, packageInfo);
  }

  @Handler
  public void updateLocation(ObjectContext ctx, LocationUpdate locationUpdate) {
    var packageInfo =
        ctx.get(PACKAGE_INFO)
            .orElseThrow(() -> new TerminalException("Package not found"));
    packageInfo.addLocation(locationUpdate);
    ctx.set(PACKAGE_INFO, packageInfo);
  }

  @Shared
  public PackageInfo getPackageInfo(SharedObjectContext ctx) {
    return ctx.get(PACKAGE_INFO)
        .orElseThrow(() -> new TerminalException("Package not found"));
  }

  public static void main(String[] args) {
    RestateHttpEndpointBuilder.builder().bind(new PackageTracker()).buildAndListen();
  }
}

// Kotlin
@VirtualObject
class PackageTracker {

  companion object {
    private val PACKAGE_INFO = StateKey.of("package-info", KtSerdes.json<PackageInfo>())
  }

  @Handler
  suspend fun registerPackage(ctx: ObjectContext, packageInfo: PackageInfo) {
    ctx.set(PACKAGE_INFO, packageInfo)
  }

  @Handler
  suspend fun updateLocation(ctx: ObjectContext, locationUpdate: LocationUpdate) {
    val packageInfo =
        ctx.get(PACKAGE_INFO) ?: throw TerminalException("Package not found")
    packageInfo.locations.add(locationUpdate)
    ctx.set(PACKAGE_INFO, packageInfo)
  }

  @Shared
  suspend fun getPackageInfo(ctx: SharedObjectContext): PackageInfo {
    return ctx.get(PACKAGE_INFO) ?: throw TerminalException("Package not found")
  }
}

fun main() {
  RestateHttpEndpointBuilder.builder().bind(PackageTracker()).buildAndListen()
}

// Go
type PackageTracker struct{}

func (PackageTracker) RegisterPackage(ctx restate.ObjectContext, packageInfo PackageInfo) error {
	restate.Set[PackageInfo](ctx, "package-info", packageInfo)
	return nil
}

func (PackageTracker) UpdateLocation(ctx restate.ObjectContext, locationUpdate LocationUpdate) error {
	packageInfo, err := restate.Get[*PackageInfo](ctx, "package-info")
	if err != nil {
		return err
	}
	if packageInfo == nil {
		return restate.TerminalError(errors.New("package not found"))
	}
	if packageInfo.FinalDestination == "" {
		return restate.TerminalError(errors.New("package not found"))
	}

	packageInfo.Locations = append(packageInfo.Locations, locationUpdate)
	restate.Set[PackageInfo](ctx, "package-info", *packageInfo)
	return nil
}

func (PackageTracker) GetPackageInfo(ctx restate.ObjectSharedContext) (*PackageInfo, error) {
	return restate.Get[*PackageInfo](ctx, "package-info")
}

func main() {
	if err := server.NewRestate().
		Bind(restate.Reflect(PackageTracker{})).
		Start(context.Background(), ":9080"); err != nil {
		log.Fatal(err)
	}
}

# Python
package_tracker = VirtualObject("package-tracker")


@package_tracker.handler()
async def register_package(ctx: ObjectContext, package_info: PackageInfo):
    ctx.set("package-info", package_info.model_dump())


@package_tracker.handler()
async def update_location(ctx: ObjectContext, location_update: LocationUpdate):
    package_info = await ctx.get("package-info")
    if package_info is None:
        raise TerminalError(f"Package {ctx.key()} not found")

    locations = package_info.get("locations", [])
    locations.append(location_update.model_dump())
    package_info["locations"] = locations
    ctx.set("package-info", package_info)


@package_tracker.handler()
async def get_package_info(ctx: ObjectContext):
    return await ctx.get("package-info")


app = restate.app(services=[package_tracker])
LOW-LATENCY
Restate processes events at high speed using a dedicated queue per key, pushing events to your functions for maximum parallel throughput.
DURABLE EXECUTION
Restate handles all Kafka interaction complexities, guarantees exactly-once processing, and recovers event handlers to the precise point before any failure.
Detailed Observability
Understand what is happening in your event-driven apps by using the UI, the CLI, and the built-in tracing.
Debug failing executions, inspect the K/V state, and manage deployments.

What you can build with Event Processing and Restate
💡 You can connect any handler to a Kafka topic, so have a look at the other use case pages for more inspiration.