> Invoke handlers via Kafka events.

# Kafka

Connect your Restate handlers to Kafka topics.
Restate takes care of Kafka consumer management and pushes events to your Restate handlers.

You get zero-overhead consumer management with automatic retries, durable execution, and stateful processing capabilities.

<Info>
  Each event leads to an invocation, meaning a request to execute a handler. Each invocation has its own unique ID and lifecycle.
  Have a look at [managing invocations](/services/invocation/managing-invocations) to learn how to manage the lifecycle of an invocation.
</Info>

## Invoking Handlers via Kafka Events

To invoke handlers via Kafka events, follow these steps:

<Steps>
  <Step title="Develop and register an event handler">
    You can invoke any handler via Kafka events.
    The event payload will be (de)serialized as JSON.

    * When invoking **Virtual Object** or **Workflow** handlers via Kafka, the key of the Kafka record will be used to determine the Virtual Object/Workflow key.
      The key needs to be a valid UTF-8 string.
      The events are delivered to the subscribed handler in the order in which they arrived on the topic partition.
    * When invoking **Virtual Object** or **Workflow** *shared* handlers via Kafka, the key of the Kafka record will be used to determine the Virtual Object/Workflow key.
      The key needs to be a valid UTF-8 string.
      The events are delivered to the subscribed handler in parallel without ordering guarantees.
    * When invoking **Service** handlers over Kafka, events are delivered in parallel without ordering guarantees.

    Since you can invoke any handler via Kafka events, a single handler can be invoked both by RPC and via Kafka.
  </Step>

  <Step title="Configure Restate to connect to a Kafka cluster">
    Define the Kafka cluster that Restate needs to connect to in the [Restate configuration file](/server/configuration#configuration-file):

    ```toml restate.toml theme={null}
    [[ingress.kafka-clusters]]
    name = "my-cluster"
    brokers = ["PLAINTEXT://broker:9092"]
    ```

    Then start the Restate Server with this file: `restate-server --config-file restate.toml`.

    Check the [configuration docs](/server/configuration) for more details.

    <AccordionGroup>
      <Accordion title="Using SASL/SSL (e.g. Confluent Kafka)">
        To connect to a Kafka cluster that requires SASL/SSL authentication (e.g., Confluent Kafka), you can specify the necessary parameters in the `restate.toml` file:

        ```toml restate.toml theme={null}
        [[ingress.kafka-clusters]]
        name = "my-kafka"
        brokers = [ "my-kafka:9092" ]
        "security.protocol" = "SASL_SSL"
        "sasl.mechanisms" = "PLAIN"
        "sasl.username" = "user"
        "sasl.password" = "pass"

        "client.id" = "client-id"
        ```

        Note the quotation marks around the configuration keys.

        For Confluent Cloud, the rest of the configuration can be copied from the Confluent Cloud "Rust client" configuration.
      </Accordion>

      <Accordion title="Using SASL OAuth2.0 / OpenID Connect">
        The Kafka ingress supports SASL OAUTHBEARER authentication, enabling OAuth 2.0/OpenID Connect (OIDC) token-based connections to managed Kafka services.

        Configure SASL OAUTHBEARER via the `additional_options` field in your Kafka cluster configuration. These options are passed directly to librdkafka.

        **Example for Confluent Cloud:**

        ```toml theme={null}
        [[ingress.kafka-clusters]]
        name = "my-cluster"
        brokers = ["SASL_SSL://pkc-xxxxxx.eu-central-1.aws.confluent.cloud:9092"]
        "security.protocol"="SASL_SSL"
        "sasl.mechanisms"="OAUTHBEARER"
        "sasl.oauthbearer.method"="oidc"
        "sasl.oauthbearer.client.id"="<your-client-id>"
        "sasl.oauthbearer.client.secret"="<your-client-secret>"
        "sasl.oauthbearer.token.endpoint.url"="<your-token-endpoint>"
        "sasl.oauthbearer.scope"="kafka"
        ```

        **Common OAUTHBEARER options:**

        | Option                                | Description                                  |
        | ------------------------------------- | -------------------------------------------- |
        | `security.protocol`                   | Set to `SASL_SSL` for encrypted connections  |
        | `sasl.mechanisms`                     | Set to `OAUTHBEARER`                         |
        | `sasl.oauthbearer.method`             | Set to `oidc` for OIDC-based token retrieval |
        | `sasl.oauthbearer.client.id`          | OAuth client ID                              |
        | `sasl.oauthbearer.client.secret`      | OAuth client secret                          |
        | `sasl.oauthbearer.token.endpoint.url` | OAuth token endpoint URL                     |
        | `sasl.oauthbearer.scope`              | OAuth scope (if required by provider)        |

        For the full list of available options, see the [librdkafka CONFIGURATION.md](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
      </Accordion>

      <Accordion title="Configuring Kafka clusters via environment variables">
        You can also configure the Kafka clusters via the `RESTATE_INGRESS__KAFKA_CLUSTERS` environment variable:

        ```bash theme={null}
        RESTATE_INGRESS__KAFKA_CLUSTERS=[{name="my-cluster",brokers=["PLAINTEXT://broker:9092"]}]
        ```
      </Accordion>

      <Accordion title="Experimental: Improved Kafka batch ingestion">
        In certain scenarios, such as consuming from a Kafka topic with few partitions, the new batch ingestion can significantly improve ingestion throughput compared to the legacy implementation.

        **This feature is disabled by default and should be used with caution.**

        All nodes in the cluster must be running Restate v1.6 before enabling this feature.

        **Once enabled and data has been ingested, you cannot roll back to a version prior to Restate v1.6.**

        To enable the experimental Kafka batch ingestion, set the following environment variable on all nodes:

        ```bash theme={null}
        RESTATE_EXPERIMENTAL_KAFKA_BATCH_INGESTION=true
        ```

        Or in your configuration file:

        ```toml theme={null}
        experimental-kafka-batch-ingestion = true
        ```

        Contact us on [Discord](https://discord.restate.dev) or [Slack](https://slack.restate.dev) to test it together with us.
      </Accordion>
    </AccordionGroup>
  </Step>

  <Step title="Register the service you want to invoke." />

  <Step title="Subscribe the event handler to the Kafka topic">
    Let Restate forward events from the Kafka topic to the event handler by creating a subscription:

    ```bash theme={null}
    curl localhost:9070/subscriptions --json '{
        "source": "kafka://my-cluster/my-topic",
        "sink": "service://MyService/handle",
        "options": {"auto.offset.reset": "earliest"}
    }'
    ```

    Once you've created a subscription, Restate immediately starts consuming events from Kafka.
    The handler will be invoked for each event received from Kafka.

    The `options` field is optional and accepts any configuration parameter from [librdkafka configuration](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
  </Step>
</Steps>

<AccordionGroup>
  <Accordion title="Kafka connection configuration">
    You can set arbitrary Kafka cluster options in `restate.toml`. These options apply to all subscriptions to that cluster. For example:

    ```toml restate.toml theme={null}
    [[ingress.kafka-clusters]]
    name = "my-cluster"
    brokers = ["PLAINTEXT://broker:9092"]
    "sasl.username" = "me"
    "sasl.password" = "pass"
    ```

    For the full list of options, check [librdkafka configuration](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
  </Accordion>

  <Accordion title="Multiple Kafka clusters support">
    You can configure multiple Kafka clusters in the `restate.toml` file:

    ```toml restate.toml theme={null}
    [[ingress.kafka-clusters]]
    name = "my-cluster-1"
    brokers = ["PLAINTEXT://localhost:9092"]

    [[ingress.kafka-clusters]]
    name = "my-cluster-2"
    brokers = ["PLAINTEXT://localhost:9093"]
    ```

    When creating subscriptions, refer to the specific cluster by its `name`:

    ```bash theme={null}
    # Subscription to my-cluster-1
    curl localhost:9070/subscriptions --json '{
        "source": "kafka://my-cluster-1/topic-1",
        "sink": "service://MyService/handleCluster1"
    }'

    # Subscription to my-cluster-2
    curl localhost:9070/subscriptions --json '{
        "source": "kafka://my-cluster-2/topic-2",
        "sink": "service://MyService/handleCluster2"
    }'
    ```
  </Accordion>

  <Accordion title="Event metadata">
    You can access the event metadata in the handler by getting the request headers map:

    <CodeGroup>
      ```ts TypeScript {"CODE_LOAD::ts/src/develop/kafka.ts#headers"}  theme={null}
      ctx.request().headers;
      ```

      ```java Java {"CODE_LOAD::java/src/main/java/develop/MyKafkaVirtualObject.java#headers"}  theme={null}
      ctx.request().headers();
      ```

      ```go Go {"CODE_LOAD::go/develop/kafka.go#headers"}  theme={null}
      ctx.Request().Headers
      ```

      ```python Python {"CODE_LOAD::python/src/develop/kafka.py#headers"}  theme={null}
      ctx.request().headers
      ```
    </CodeGroup>

    For each event, this map contains the following entries:

    * `restate.subscription.id`: The subscription identifier, as shown by the Admin API.
    * `kafka.offset`: The record offset.
    * `kafka.partition`: The record partition.
    * `kafka.timestamp`: The record timestamp.
  </Accordion>

  <Accordion title="Raw event support">
    Check out the serialization documentation of your SDK to learn how to receive raw events in your handler.
  </Accordion>
</AccordionGroup>
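
The event metadata entries described under "Event metadata" can be collected into a small typed record. The sketch below is illustrative only: a plain dictionary stands in for the headers map returned by the SDK, the `KafkaEventMetadata` class and `parse_kafka_metadata` function are hypothetical names, and it assumes the header values arrive as strings, with offset and partition holding decimal integers. The timestamp is kept as received because its format is not specified here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class KafkaEventMetadata:
    subscription_id: str
    offset: int
    partition: int
    timestamp: str  # kept as received; format is not specified on this page


def parse_kafka_metadata(headers):
    """Extract the Kafka-related entries from a headers mapping.

    Raises KeyError if an expected entry is missing, for example when the
    handler was invoked via RPC instead of a Kafka event.
    """
    return KafkaEventMetadata(
        subscription_id=headers["restate.subscription.id"],
        offset=int(headers["kafka.offset"]),        # assumption: decimal string
        partition=int(headers["kafka.partition"]),  # assumption: decimal string
        timestamp=headers["kafka.timestamp"],
    )


if __name__ == "__main__":
    # Made-up header values for illustration.
    sample_headers = {
        "restate.subscription.id": "sub_11XHoawrCiWtv8kzhEyGtsR",
        "kafka.offset": "42",
        "kafka.partition": "3",
        "kafka.timestamp": "1700000000000",
    }
    print(parse_kafka_metadata(sample_headers))
```

Since the same handler can be invoked both by RPC and via Kafka, treating a missing entry as "not a Kafka invocation" is one way to tell the two apart.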

## Managing Kafka Subscriptions

A subscription connects a Kafka topic to a handler. You create, list, and delete subscriptions through the Admin API.

### Create Subscriptions

Subscribe a handler to a Kafka topic:

```bash theme={null}
curl localhost:9070/subscriptions --json '{
    "source": "kafka://my-cluster/my-topic",
    "sink": "service://MyService/Handle",
    "options": {"auto.offset.reset": "earliest"}
}'
```

The `options` field is optional and accepts any [librdkafka configuration](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) parameter.
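
If you prefer to create subscriptions from a script, the same request can be assembled with Python's standard library. This is a rough sketch under a few assumptions: the Admin API is reachable at `http://localhost:9070` as in the `curl` example, and the helper names (`build_subscription`, `create_subscription_request`) are hypothetical. The request is only prepared here; the lines that send it are left commented out.

```python
import json
import urllib.request

ADMIN_URL = "http://localhost:9070"  # assumption: same address as the curl example


def build_subscription(cluster, topic, service, handler, options=None):
    """Build the JSON body for POST /subscriptions."""
    body = {
        "source": f"kafka://{cluster}/{topic}",
        "sink": f"service://{service}/{handler}",
    }
    if options:  # `options` is optional, so leave it out when empty
        body["options"] = dict(options)
    return body


def create_subscription_request(body, admin_url=ADMIN_URL):
    """Prepare (but do not send) the HTTP request that creates the subscription."""
    return urllib.request.Request(
        f"{admin_url}/subscriptions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    body = build_subscription(
        "my-cluster", "my-topic", "MyService", "Handle",
        {"auto.offset.reset": "earliest"},
    )
    request = create_subscription_request(body)
    print(request.get_method(), request.full_url)
    print(json.dumps(body, indent=2))
    # To send it against a running Restate Server:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status, response.read().decode("utf-8"))
```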

### List Subscriptions

View current subscriptions:

```bash theme={null}
curl localhost:9070/subscriptions
```

**Example response:**

```json theme={null}
{
    "subscriptions": [
        {
            "id": "sub_11XHoawrCiWtv8kzhEyGtsR",
            "source": "kafka://my-cluster/my-topic",
            "sink": "service://Greeter/greet",
            "options": {
                "auto.offset.reset": "earliest",
                "client.id": "restate",
                "group.id": "sub_11XHoawrCiWtv8kzhEyGtsR"
            }
        }
    ]
}
```
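
To find the ID of a subscription for a given topic, you can filter the listing by `source`. The sketch below parses a response shaped like the example above; the function name `subscription_ids_for_source` is hypothetical, and the sample JSON is copied from that example rather than fetched from a server.

```python
import json

# Response body in the shape shown in the example response.
SAMPLE_RESPONSE = """
{
    "subscriptions": [
        {
            "id": "sub_11XHoawrCiWtv8kzhEyGtsR",
            "source": "kafka://my-cluster/my-topic",
            "sink": "service://Greeter/greet",
            "options": {
                "auto.offset.reset": "earliest",
                "client.id": "restate",
                "group.id": "sub_11XHoawrCiWtv8kzhEyGtsR"
            }
        }
    ]
}
"""


def subscription_ids_for_source(response, source):
    """Return the IDs of all subscriptions whose source matches exactly."""
    return [
        subscription["id"]
        for subscription in response.get("subscriptions", [])
        if subscription.get("source") == source
    ]


if __name__ == "__main__":
    listing = json.loads(SAMPLE_RESPONSE)
    print(subscription_ids_for_source(listing, "kafka://my-cluster/my-topic"))
```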

### Delete Subscriptions

Remove a subscription using its ID (starts with `sub_`):

```bash theme={null}
curl -X DELETE localhost:9070/subscriptions/sub_11XHoawrCiWtv8kzhEyGtsR
```

When you delete a subscription, Restate stops the associated consumer group. Messages already enqueued by Restate will still be processed.
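
A scripted equivalent of the `curl -X DELETE` call might look like the following. It is a sketch only: `delete_subscription_request` is a hypothetical helper, the Admin API address is assumed to be `http://localhost:9070` as in the examples above, and the request is prepared without being sent. The `sub_` prefix check is a local guard against passing the wrong kind of ID.

```python
import urllib.request
from urllib.parse import quote

ADMIN_URL = "http://localhost:9070"  # assumption: same address as the curl example


def delete_subscription_request(subscription_id, admin_url=ADMIN_URL):
    """Prepare (but do not send) the DELETE request for a subscription."""
    if not subscription_id.startswith("sub_"):
        raise ValueError(f"subscription IDs start with 'sub_': {subscription_id!r}")
    # Percent-encode the ID so it is always a single path segment.
    path_id = quote(subscription_id, safe="")
    return urllib.request.Request(
        f"{admin_url}/subscriptions/{path_id}",
        method="DELETE",
    )


if __name__ == "__main__":
    request = delete_subscription_request("sub_11XHoawrCiWtv8kzhEyGtsR")
    print(request.get_method(), request.full_url)
    # To send it against a running Restate Server:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status)
```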
