# Event Processing

> Build lightweight, transactional event handlers with built-in resiliency.

Processing Kafka events typically means managing consumers, handling retries, maintaining state stores, and coordinating complex workflows. Restate removes this overhead by providing **lightweight, transactional event processing** with zero consumer management and built-in state.

## Workflows from Kafka

Build event handlers with complex control flow, loops, timers, and transactional guarantees:

<CodeGroup>
  ```typescript TypeScript
  export default restate.object({
    name: "userFeed",
    handlers: {
      processPost: async (ctx: restate.ObjectContext, post: SocialMediaPost) => {
        const userId = ctx.key;

        // Durable side effect: persisted and replayed on retries
        const postId = await ctx.run(() => createPost(userId, post));

        // Wait for processing to complete with durable timers
        while ((await ctx.run(() => getPostStatus(postId))) === PENDING) {
          await ctx.sleep({ seconds: 5 });
        }

        await ctx.run(() => updateUserFeed(userId, postId));
      },
    },
  });
  ```

  ```java Java
  @VirtualObject
  public class UserFeed {
    @Handler
    public void processPost(ObjectContext ctx, SocialMediaPost post) {
      String userId = ctx.key();

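      // Durable side effect: persisted and replayed on retries
      String postId = ctx.run(String.class, () -> createPost(userId, post));

      // Wait for processing to complete with durable timers
      while (ctx.run(String.class, () -> getPostStatus(postId)).equals("PENDING")) {
        ctx.sleep(Duration.ofSeconds(5));
      }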

      ctx.run(() -> updateUserFeed(userId, postId));
    }
  }
  ```

  ```python Python
  user_feed = restate.VirtualObject("UserFeed")


  @user_feed.handler()
  async def process_post(ctx: restate.ObjectContext, post: SocialMediaPost):
      user_id = ctx.key()

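      # Durable side effect: persisted and replayed on retries
      post_id = await ctx.run_typed(
          "create post", create_post, user_id=user_id, post=post
      )

      # Wait for processing to complete with durable timers
      while (
          await ctx.run_typed("get status", get_post_status, post_id=post_id)
          == Status.PENDING
      ):
          await ctx.sleep(timedelta(seconds=5))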

      await ctx.run_typed("update feed", update_user_feed, user=user_id, post_id=post_id)
  ```

  ```go Go
  type UserFeed struct{}

  func (UserFeed) ProcessPost(ctx restate.ObjectContext, post SocialMediaPost) error {
    var userId = restate.Key(ctx)

    // Durable side effect: persisted and replayed on retries
    postId, err := restate.Run(ctx, func(ctx restate.RunContext) (string, error) {
      return CreatePost(userId, post)
    })
    if err != nil {
      return err
    }

    // Wait for processing to complete with durable timers
    for {
      status, err := restate.Run(ctx, func(ctx restate.RunContext) (string, error) {
        return GetPostStatus(postId), nil
      })
      if err != nil {
        return err
      }
      if status != PENDING {
        break
      }
      if err = restate.Sleep(ctx, 5*time.Second); err != nil {
        return err
      }
    }

    if _, err := restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {
      return UpdateUserFeed(userId, postId)
    }); err != nil {
      return err
    }

    return nil
  }
  ```
</CodeGroup>

**Key Benefits:**

* **Push-based delivery**: Events delivered directly to handlers with zero consumer management
* **Durable execution**: Failed handlers are retried with exponential backoff until they succeed. Handlers replay previously completed steps and resume exactly where they left off.
* **Parallel processing**: Events for different keys are processed concurrently, while events for the same key are processed in order, like a queue per object key
* **Timers and scheduling**: Durable timers and delays survive crashes and restarts, enabling **long-running workflows**
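
To make the durable execution point concrete, here is a toy model in plain Python. It is not Restate's implementation and does not use the Restate SDK. It assumes a hypothetical `ToyContext` whose `run` method records each step result in an in-memory journal; on a retry, completed steps are answered from the journal instead of being executed again. The names `ToyContext`, `invoke_with_retries`, `create_post`, and `update_feed` are illustrative only.

```python
from typing import Any, Callable


class ToyContext:
    """Toy stand-in for a durable context: journals the result of every step."""

    def __init__(self, journal: list[Any]):
        self.journal = journal  # shared across retries of the same invocation
        self.position = 0       # index of the next step within this attempt

    def run(self, action: Callable[[], Any]) -> Any:
        if self.position < len(self.journal):
            result = self.journal[self.position]  # replay: skip the side effect
        else:
            result = action()                     # first execution: do the work
            self.journal.append(result)           # record the result before moving on
        self.position += 1
        return result


calls = {"create_post": 0, "update_feed": 0}
fail_once = {"armed": True}


def create_post() -> str:
    calls["create_post"] += 1
    return "post-1"


def update_feed() -> str:
    calls["update_feed"] += 1
    if fail_once["armed"]:
        fail_once["armed"] = False
        raise RuntimeError("simulated crash")
    return "feed-updated"


def handler(ctx: ToyContext) -> str:
    post_id = ctx.run(create_post)
    ctx.run(update_feed)
    return post_id


def invoke_with_retries(journal: list[Any]) -> str:
    while True:
        try:
            return handler(ToyContext(journal))
        except RuntimeError:
            continue  # a real runtime would back off before retrying


journal: list[Any] = []
result = invoke_with_retries(journal)
```

After the simulated crash, `create_post` has run once and `update_feed` twice: the retry replayed the first step from the journal and resumed at the step that failed.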

## Stateful Event Handlers

Maintain key-value (K/V) state across events, scoped to each object key:

<CodeGroup>
  ```typescript TypeScript
  export default restate.object({
    name: "delivery-tracker",
    handlers: {
      register: async (ctx: restate.ObjectContext, delivery: Delivery) =>
        ctx.set("delivery", delivery),

      setLocation: async (ctx: restate.ObjectContext, location: Location) => {
        const delivery = await ctx.get<Delivery>("delivery");
        if (!delivery) {
          throw new restate.TerminalError("Delivery not found");
        }

        delivery.locations.push(location);
        ctx.set("delivery", delivery);
      },

      getDelivery: restate.handlers.object.shared(async (ctx: restate.ObjectSharedContext) =>
        ctx.get<Delivery>("delivery")
      ),
    },
  });
  ```

  ```java Java
  @VirtualObject
  public class DeliveryTracker {
    private static final StateKey<Delivery> DELIVERY = StateKey.of("delivery", Delivery.class);

    @Handler
    public void register(ObjectContext ctx, Delivery packageInfo) {
      ctx.set(DELIVERY, packageInfo);
    }

    @Handler
    public void setLocation(ObjectContext ctx, Location location) {
      var delivery = ctx.get(DELIVERY).orElseThrow(() -> new TerminalException("Delivery not found"));

      delivery.addLocation(location);
      ctx.set(DELIVERY, delivery);
    }

    @Shared
    public Delivery getDelivery(SharedObjectContext ctx) {
      return ctx.get(DELIVERY).orElseThrow(() -> new TerminalException("Delivery not found"));
    }
  }
  ```

  ```python Python
  delivery_tracker = restate.VirtualObject("DeliveryTracker")


  @delivery_tracker.handler()
  async def register(ctx: restate.ObjectContext, delivery: Delivery):
      ctx.set("delivery", delivery)


  @delivery_tracker.handler()
  async def set_location(ctx: restate.ObjectContext, location: Location):
      delivery = await ctx.get("delivery", type_hint=Delivery)
      if delivery is None:
          raise TerminalError(f"Delivery {ctx.key()} not found")

      delivery.locations.append(location)
      ctx.set("delivery", delivery)


  @delivery_tracker.handler(kind="shared")
  async def get_delivery(ctx: restate.ObjectSharedContext) -> Delivery:
      delivery = await ctx.get("delivery", type_hint=Delivery)
      if delivery is None:
          raise TerminalError(f"Delivery {ctx.key()} not found")
      return delivery
  ```

  ```go Go
  type DeliveryTracker struct{}

  func (DeliveryTracker) Register(ctx restate.ObjectContext, delivery Delivery) error {
    restate.Set[Delivery](ctx, "delivery", delivery)
    return nil
  }

  func (DeliveryTracker) SetLocation(ctx restate.ObjectContext, location Location) error {
    packageInfo, err := restate.Get[*Delivery](ctx, "delivery")
    if err != nil {
      return err
    }
    if packageInfo == nil {
      return restate.TerminalError(errors.New("delivery not found"))
    }

    packageInfo.Locations = append(packageInfo.Locations, location)
    restate.Set[Delivery](ctx, "delivery", *packageInfo)
    return nil
  }

  func (DeliveryTracker) GetDelivery(ctx restate.ObjectSharedContext) (*Delivery, error) {
    return restate.Get[*Delivery](ctx, "delivery")
  }
  ```
</CodeGroup>

**Key Benefits:**

* **Persistent state**: Store and retrieve state directly in handlers without external stores
* **Built-in consistency**: State updates are persisted together with execution progress, so state stays consistent with what the handler has actually done
* **Agents, actors, digital twins**: Model stateful entities that react to events
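
The combination of per-key state and per-key ordering can be sketched with a toy model in plain Python. This is an illustration under stated assumptions, not how Restate is implemented: the hypothetical `ToyObjectRuntime` keeps one `asyncio.Lock` and one state dictionary per object key, so events for the same key run one at a time while events for different keys overlap.

```python
import asyncio
from collections import defaultdict


class ToyObjectRuntime:
    """Toy model: one lock and one state dict per object key."""

    def __init__(self) -> None:
        self.locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self.state: dict[str, dict] = defaultdict(dict)

    async def deliver(self, key: str, location: str) -> None:
        async with self.locks[key]:      # events for one key run one at a time
            locations = self.state[key].get("locations", [])
            await asyncio.sleep(0.01)    # simulated work; other keys proceed meanwhile
            self.state[key]["locations"] = locations + [location]


async def main() -> dict:
    runtime = ToyObjectRuntime()
    events = [
        ("pkg-1", "Berlin"),
        ("pkg-2", "Paris"),
        ("pkg-1", "Vienna"),
        ("pkg-2", "Lyon"),
    ]
    # All events are delivered concurrently; ordering is enforced per key only.
    await asyncio.gather(*(runtime.deliver(key, loc) for key, loc in events))
    return {key: value["locations"] for key, value in runtime.state.items()}


final_state = asyncio.run(main())
```

Without the per-key lock, both `pkg-1` events would read an empty list before either wrote, and one location would be lost. With it, each key ends up with both locations in arrival order.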

## When to Choose Restate

**✅ Choose Restate when you need:**

* **Kafka integration**: Process Kafka events with zero consumer management
* **Reliable processing**: Automatic retry and recovery for failed event handlers
* **Transactional processing**: Execute side effects with durable execution guarantees
* **Stateful event processing**: Maintain state across events without external stores
* **Event-driven workflows**: Build complex flows with loops, timers, and conditions
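
In place of consumer code, a Kafka topic is connected to a handler by registering a subscription with the Restate admin API. The sketch below only builds that request in Python and leaves sending it commented out. Several values are assumptions to adapt: the admin API address `localhost:9070`, a Kafka cluster configured in Restate under the name `my-cluster`, a topic named `social-media-posts`, and the `auto.offset.reset` option. Check the Kafka Quickstart for the exact fields supported by your Restate version.

```python
import json
import urllib.request

# Assumed values: adjust to your own deployment.
ADMIN_URL = "http://localhost:9070"

subscription = {
    # Kafka cluster name (as configured in Restate) and topic to read from
    "source": "kafka://my-cluster/social-media-posts",
    # Target handler: the userFeed object's processPost handler
    "sink": "service://userFeed/processPost",
    "options": {"auto.offset.reset": "earliest"},
}

request = urllib.request.Request(
    url=f"{ADMIN_URL}/subscriptions",
    data=json.dumps(subscription).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

# Uncomment to send the request to a running Restate server:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

For a Virtual Object sink such as `userFeed`, the Kafka record key is used as the object key, which is what gives each key its own ordered queue.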

<Info>
  Processing events with Restate? Join our community on [Discord](https://discord.restate.dev) or [Slack](https://slack.restate.dev) to discuss your use case.
</Info>

## Comparison with Other Solutions

| Feature                 | Restate                                                                  | Traditional Kafka Processing                                        | Stream Processing Frameworks                                       |
| ----------------------- | ------------------------------------------------------------------------ | ------------------------------------------------------------------- | ------------------------------------------------------------------ |
| **Event Delivery**      | Push-based to durable handlers                                           | Polling-based consumer groups                                       | Built-in sources and sinks                                         |
| **Consumer Management** | Zero configuration                                                       | Manual offset and consumer group management                         | Zero configuration                                                 |
| **Failure Recovery**    | Fine-grained progress persistence with durable side effects              | Coarse-grained offset commits with potential reprocessing           | Coarse-grained checkpointing with potential duplicate side effects |
| **State Management**    | Built-in durable state                                                   | External state store required                                       | Built-in state store                                               |
| **Queuing Semantics**   | Queue-per-key with ordering guarantees                                   | Queue-per-partition with ordering guarantees                        | Queue-per-partition (inherited from Kafka)                         |
| **Complex Workflows**   | Unlimited control flow: loops, timers, conditions                        | Long-running logic blocks consumer loop; requires external services | DAG-based processing with limited control flow                     |
| **Best For**            | Event-driven state machines, transactional processing, complex workflows | Simple ETL and message processing                                   | High-throughput analytics, aggregations, joins                     |

## Getting Started

Ready to build event processing systems with Restate? Here are your next steps:

<CardGroup cols={3}>
  <Card title="Quickstart" icon="rocket" href="/quickstart">
    Set up Restate and process your first events
  </Card>

  <Card title="Kafka Quickstart" icon="plug" href="/guides/kafka-quickstart">
    Follow the quickstart to implement your first durable event handler
  </Card>

  <Card title="Examples" icon="code" href="https://github.com/restatedev/examples">
    Explore templates, patterns, and end-to-end applications
  </Card>
</CardGroup>

<Info>
  Evaluating Restate and missing a feature? Contact us on [Discord](https://discord.restate.dev) or [Slack](https://slack.restate.dev).
</Info>
