# Restate-Kafka Quickstart

> Learn how to connect your Restate service to a Kafka topic.

In this guide, you will learn how to connect your Restate service handler to a Kafka topic.

Restate lets you connect any handler to a Kafka topic, and invoke the handler for each event that arrives on the topic.
This way, you can use Restate to process events in a lightweight, flexible, transactional way.

<Steps>
  <Step title={<span><a href={"/get_started/quickstart"}>Get the Greeter service template and run the service</a></span>}>
    You can choose any of the SDK languages for this quickstart.
    You do not need to change the handler code to read from Kafka: any handler can be connected to a topic.
  </Step>

  <Step title="Start the Kafka cluster">
    Let's start a Kafka cluster with a single broker, and create a topic named `greetings`.

    You can run your Kafka cluster in your preferred way. Here, we will use Docker Compose to start a Kafka cluster.

    Create a `docker-compose.yaml` file with the following content:

    ```yaml docker-compose.yaml expandable theme={null}
    version: '3'
    services:
        broker:
            image: confluentinc/cp-kafka:7.5.0
            container_name: broker
            ports:
                - "9092:9092"
                - "9101:9101"
            environment:
                KAFKA_BROKER_ID: 1
                KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
                KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
                KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
                KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
                KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
                KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
                KAFKA_PROCESS_ROLES: broker,controller
                KAFKA_NODE_ID: 1
                KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
                KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
                KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
                KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
                KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
                CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

        init-kafka:
            image: confluentinc/cp-kafka:7.5.0
            depends_on:
                - broker
            entrypoint: [ '/bin/sh', '-c' ]
            command: |
                "# blocks until kafka is reachable
                kafka-topics --bootstrap-server broker:29092 --list
                echo -e 'Creating kafka topics'
                kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic greetings --replication-factor 1 --partitions 1

                echo -e 'Successfully created the following topics:'
                kafka-topics --bootstrap-server broker:29092 --list"
    ```

    Start the Kafka broker:

    ```shell theme={null}
    docker compose up
    ```
  </Step>

  <Step title="Running Restate Server">
    Now, let's start the Restate Server and let it know about the Kafka cluster via the configuration file.

    Store the following configuration in a file named `restate.toml`:

    ```toml restate.toml theme={null}
    [[ingress.kafka-clusters]]
    name = "my-cluster"
    brokers = ["PLAINTEXT://localhost:9092"]
    ```

    Start the Restate Server from the same location as the configuration file:

    ```shell theme={null}
    restate-server --config-file restate.toml
    ```
  </Step>

  <Step title="Register the service">
    Let the Restate Server know about the Greeter service by registering it:

    ```shell theme={null}
    restate deployments register localhost:9080
    ```
  </Step>

  <Step title="Connect the handler to the topic">
    Now, we need to make Restate subscribe to the Kafka topic and tell it where to push the events that arrive on it.

    Execute the following curl command to create a subscription, and invoke the handler for each event:

    ```shell theme={null}
    curl localhost:9070/subscriptions --json '{
        "source": "kafka://my-cluster/greetings",
        "sink": "service://Greeter/greet",
        "options": {"auto.offset.reset": "earliest"}
    }'
    ```

    For Go, you need to capitalize the handler name: `service://Greeter/Greet`.

    This curl command calls the Admin API of the Restate Server and tells it to invoke the `greet` handler of the `Greeter` service for each event that arrives on the `greetings` topic in the `my-cluster` Kafka cluster.
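
    If you prefer to create the subscription from a script, the same request can be sketched in Python with only the standard library. The helper names `build_subscription` and `create_subscription` are illustrative and not part of any Restate SDK, and the sketch assumes the Admin API is reachable at `localhost:9070` as in the curl command above.

    ```python
    import json
    import urllib.request


    def build_subscription(cluster, topic, service, handler, options=None):
        """Build the JSON body for the Admin API's /subscriptions endpoint."""
        return {
            "source": f"kafka://{cluster}/{topic}",
            "sink": f"service://{service}/{handler}",
            "options": options or {},
        }


    def create_subscription(body, admin_url="http://localhost:9070"):
        """POST the body to /subscriptions and return the raw reply text.

        Assumes the Restate Server from the previous steps is running locally.
        """
        request = urllib.request.Request(
            f"{admin_url}/subscriptions",
            data=json.dumps(body).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request) as response:
            return response.read().decode("utf-8")


    # Same values as the curl command; for Go, pass "Greet" as the handler name.
    body = build_subscription(
        "my-cluster", "greetings", "Greeter", "greet",
        options={"auto.offset.reset": "earliest"},
    )
    print(json.dumps(body, indent=2))
    ```

    Running the script only prints the request body. Calling `create_subscription(body)` sends it to the Admin API.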
  </Step>

  <Step title="Invoke the handler by publishing an event">
    Create a Kafka producer and publish an event to the `greetings` topic:

    ```shell theme={null}
    docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic greetings
    ```

    Then type a message and press Enter. The Greeter handler takes a name as input:

    ```
    {"name":"Sarah"}
    ```

    For some greeter templates, you might need to send just the name as `"Sarah"`.
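
    The two message shapes differ only in how the name is serialized. As a small illustration, the following Python sketch produces both as single-line JSON, which is what the console producer expects (one message per line). The function name `encode_greeting` is hypothetical and only serves this example.

    ```python
    import json


    def encode_greeting(name, as_object=True):
        """Return the one-line JSON text to type into the console producer."""
        payload = {"name": name} if as_object else name
        # Compact separators drop the spaces, matching the message shown above.
        return json.dumps(payload, separators=(",", ":"))


    print(encode_greeting("Sarah"))                   # {"name":"Sarah"}
    print(encode_greeting("Sarah", as_object=False))  # "Sarah"
    ```

    Note that the bare-name variant is still valid JSON, so the surrounding double quotes are part of the message.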
  </Step>

  <Step stepLabel="🥳" title="We invoked a Restate service over Kafka">
    You now see your handler getting invoked.

    The way this worked is that Restate read the message off the Kafka topic and durably persisted it, similar to what it does for HTTP invocations.

    It then **pushed** the message to the handler, as opposed to the pull mechanism that you would have with a Kafka consumer.

    If the handler fails, Restate will retry the request until it eventually succeeds.

    When invoking Basic Services, Restate ignores the key of the message.

    When invoking Virtual Objects, Restate uses the key of the Kafka message as the Virtual Object key.
    While a single Kafka partition holds events for many keys, Restate effectively keeps track of a separate queue per key.
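
    To picture the per-key queues, here is a purely conceptual Python sketch. It is not how Restate is implemented; it only shows how interleaved events from one partition map onto one FIFO queue per key. The keys and event names are made up for the illustration.

    ```python
    from collections import deque


    def queues_per_key(partition_events):
        """Group (key, value) events from one partition into one FIFO queue per key."""
        queues = {}
        for key, value in partition_events:
            queues.setdefault(key, deque()).append(value)
        return queues


    # One partition holding interleaved events for two keys.
    partition = [("alice", "e1"), ("bob", "e2"), ("alice", "e3"), ("bob", "e4")]
    for key, queue in queues_per_key(partition).items():
        print(key, list(queue))
    # alice ['e1', 'e3']
    # bob ['e2', 'e4']
    ```

    In this model, the arrival order of events is preserved within each key, and each key would correspond to one Virtual Object.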

    Have a look at the [Event Processing use case page](/use-cases/event-processing) to learn about what you can do with Restate and event processing.
  </Step>

  <Step title="Cleanup: removing the subscription">
    You can see the subscriptions that are active via the Admin API:

    ```shell theme={null}
    curl localhost:9070/subscriptions
    ```

    Example output:

    ```json theme={null}
    {
        "subscriptions": [
            {
                "id": "sub_11XHoawrCiWtv8kzhEyGtsR",
                "source": "kafka://my-cluster/greetings",
                "sink": "service://Greeter/greet",
                "options": {
                    "auto.offset.reset": "earliest",
                    "client.id": "restate",
                    "group.id": "sub_11XHoawrCiWtv8kzhEyGtsR"
                }
            }
        ]
    }
    ```

    As you can see, subscriptions have an ID that starts with `sub_`.

    Now you can use the subscription ID to delete the subscription:

    ```shell theme={null}
    curl -X DELETE localhost:9070/subscriptions/sub_11XHoawrCiWtv8kzhEyGtsR
    ```
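
    If you have several subscriptions, picking the ID by hand gets tedious. The following Python sketch, using only the standard library, extracts the IDs from a listing and deletes a subscription by ID. The helper names are illustrative, the sample listing is a trimmed version of the example output, and `delete_subscription` assumes the Admin API is reachable at `localhost:9070`.

    ```python
    import json
    import urllib.request


    def subscription_ids(listing, source=None):
        """Return the IDs in a GET /subscriptions reply, optionally filtered by source."""
        return [
            sub["id"]
            for sub in listing.get("subscriptions", [])
            if source is None or sub.get("source") == source
        ]


    def delete_subscription(sub_id, admin_url="http://localhost:9070"):
        """Send DELETE /subscriptions/{id} and return the HTTP status code.

        urllib raises urllib.error.HTTPError for 4xx and 5xx replies.
        """
        request = urllib.request.Request(
            f"{admin_url}/subscriptions/{sub_id}", method="DELETE"
        )
        with urllib.request.urlopen(request) as response:
            return response.status


    # Trimmed sample in the shape of the example output above.
    sample = json.loads("""
    {"subscriptions": [{"id": "sub_11XHoawrCiWtv8kzhEyGtsR",
                        "source": "kafka://my-cluster/greetings",
                        "sink": "service://Greeter/greet"}]}
    """)
    print(subscription_ids(sample, source="kafka://my-cluster/greetings"))
    ```

    Against a running server, you would pass the parsed reply of `curl localhost:9070/subscriptions` to `subscription_ids` and then call `delete_subscription` for each ID you want to remove.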
  </Step>
</Steps>

## Related resources

* [Event processing use cases](/use-cases/event-processing): examples of how Restate gives you lightweight, transactional event processing.
* [Examples repository](https://github.com/restatedev/examples): examples of Durable Execution and event processing.
* [Reference docs for invoking over Kafka](/services/invocation/kafka)
* [Blog post on Kafka integration](https://restate.dev/blog/restate--kafka-event-driven-apps-where-event-driven-is-an-implementation-detail/)
