> ## Documentation Index
> Fetch the complete documentation index at: https://docs.restate.dev/llms.txt
> Use this file to discover all available pages before exploring further.

> Invoke handlers via Kafka events.

# Kafka

Connect your Restate handlers to Kafka topics.
Restate takes care of Kafka consumer management and pushes events to your Restate handlers.

You get zero-overhead consumer management with automatic retries, durable execution, and stateful processing capabilities.

<Info>
  Each event leads to an invocation, meaning a request to execute a handler. Each invocation has its own unique ID and lifecycle.
  Have a look at [managing invocations](/services/invocation/managing-invocations) to learn how to manage the lifecycle of an invocation.
</Info>

## Invoking Handlers via Kafka Events

You can invoke handlers via Kafka events, by doing the following:

<Steps>
  <Step title="Develop and register an event handler">
    You can invoke any handler via Kafka events.
    The event payload will be (de)serialized as JSON.

    * When invoking **Virtual Object** or **Workflow** handlers via Kafka, the key of the Kafka record will be used to determine the Virtual Object/Workflow key.
      The key needs to be a valid UTF-8 string.
      The events are delivered to the subscribed handler in the order in which they arrived on the topic partition.
    * When invoking **Virtual Object** or **Workflow** *shared* handlers via Kafka, the key of the Kafka record will be used to determine the Virtual Object/Workflow key.
      The key needs to be a valid UTF-8 string.
      The events are delivered to the subscribed handler in parallel without ordering guarantees.
    * When invoking **Service** handlers over Kafka, events are delivered in parallel without ordering guarantees.

    Since you can invoke any handler via Kafka events, a single handler can be invoked both by RPC and via Kafka.
  </Step>

  <Step title="Register the Kafka cluster in Restate">
    Register the Kafka cluster that Restate needs to connect to, using the [Restate CLI](/installation):

    ```bash theme={null}
    restate kafka-clusters create my-cluster bootstrap.servers=broker:9092
    ```

    You can pass any [librdkafka configuration](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) parameter as additional `key=value` arguments.
    Alternatively, you can read the properties from a file with `-f my-cluster.properties`, or open an editor on a properties template with `--edit`.

    <AccordionGroup>
      <Accordion title="Using SASL/SSL (e.g. Confluent Kafka)">
        To connect to a Kafka cluster that requires SASL/SSL authentication (e.g., Confluent Kafka), you can specify the necessary parameters when registering the cluster:

        ```bash theme={null}
        restate kafka-clusters create my-kafka \
            bootstrap.servers=my-kafka:9092 \
            security.protocol=SASL_SSL \
            sasl.mechanisms=PLAIN \
            sasl.username=user \
            sasl.password=pass \
            client.id=client-id
        ```

        For Confluent Cloud, you can copy the client configuration properties from the Confluent Cloud console into a properties file and register the cluster with it:

        ```bash theme={null}
        restate kafka-clusters create my-kafka -f confluent-cloud.properties
        ```
      </Accordion>

      <Accordion title="Using SASL OAuth2.0 / OpenID Connect">
        The Kafka ingress supports SASL OAUTHBEARER authentication, enabling OAuth 2.0/OpenID Connect (OIDC) token-based connections to managed Kafka services.

        Configure SASL OAUTHBEARER via the cluster properties. These options are passed directly to librdkafka.

        **Example for Confluent Cloud:**

        ```bash theme={null}
        restate kafka-clusters create my-cluster \
            bootstrap.servers=pkc-xxxxxx.eu-central-1.aws.confluent.cloud:9092 \
            security.protocol=SASL_SSL \
            sasl.mechanisms=OAUTHBEARER \
            sasl.oauthbearer.method=oidc \
            sasl.oauthbearer.client.id=<your-client-id> \
            sasl.oauthbearer.client.secret=<your-client-secret> \
            sasl.oauthbearer.token.endpoint.url=<your-token-endpoint> \
            sasl.oauthbearer.scope=kafka
        ```

        **Common OAUTHBEARER options:**

        | Option                                | Description                                  |
        | ------------------------------------- | -------------------------------------------- |
        | `security.protocol`                   | Set to `SASL_SSL` for encrypted connections  |
        | `sasl.mechanism`                      | Set to `OAUTHBEARER`                         |
        | `sasl.oauthbearer.method`             | Set to `oidc` for OIDC-based token retrieval |
        | `sasl.oauthbearer.client.id`          | OAuth client ID                              |
        | `sasl.oauthbearer.client.secret`      | OAuth client secret                          |
        | `sasl.oauthbearer.token.endpoint.url` | OAuth token endpoint URL                     |
        | `sasl.oauthbearer.scope`              | OAuth scope (if required by provider)        |

        For the full list of available options, see the [librdkafka CONFIGURATION.md](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
      </Accordion>
    </AccordionGroup>
  </Step>

  <Step title="Register the service you want to invoke." />

  <Step title="Subscribe the event handler to the Kafka topic">
    Let Restate forward events from the Kafka topic to the event handler by creating a subscription:

    ```bash theme={null}
    restate subscriptions create kafka://my-cluster/my-topic service://MyService/handle auto.offset.reset=earliest
    ```

    Once you've created a subscription, Restate immediately starts consuming events from Kafka.
    The handler will be invoked for each event received from Kafka.

    The trailing `key=value` options are optional and accept any configuration parameter from [librdkafka configuration](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
  </Step>
</Steps>

<AccordionGroup>
  <Accordion title="Kafka connection configuration">
    You can pass arbitrary Kafka cluster properties when registering the cluster, and those properties will be applied for all the subscriptions to that cluster, for example:

    ```bash theme={null}
    restate kafka-clusters create my-cluster \
        bootstrap.servers=broker:9092 \
        sasl.username=me \
        sasl.password=pass
    ```

    For the full list of options, check [librdkafka configuration](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
  </Accordion>

  <Accordion title="Multiple Kafka clusters support">
    You can register multiple Kafka clusters:

    ```bash theme={null}
    restate kafka-clusters create my-cluster-1 bootstrap.servers=localhost:9092
    restate kafka-clusters create my-cluster-2 bootstrap.servers=localhost:9093
    ```

    And then, when creating the subscriptions, you refer to the specific cluster by name:

    ```bash theme={null}
    # Subscription to my-cluster-1
    restate subscriptions create kafka://my-cluster-1/topic-1 service://MyService/handleCluster1

    # Subscription to my-cluster-2
    restate subscriptions create kafka://my-cluster-2/topic-2 service://MyService/handleCluster2
    ```
  </Accordion>

  <Accordion title="Event metadata">
    You can access the event metadata in the handler by getting the request headers map:

    <CodeGroup>
      ```ts TypeScript {"CODE_LOAD::ts/src/develop/kafka.ts#headers"}  theme={null}
      ctx.request().headers,
      ```

      ```java Java {"CODE_LOAD::java/src/main/java/develop/MyKafkaVirtualObject.java#headers"}  theme={null}
      ctx.request().headers();
      ```

      ```go Go {"CODE_LOAD::go/develop/kafka.go#headers"}  theme={null}
      ctx.Request().Headers
      ```

      ```python Python {"CODE_LOAD::python/src/develop/kafka.py#headers"}  theme={null}
      ctx.request().headers
      ```
    </CodeGroup>

    Each event carries within this map the following entries:

    * `restate.subscription.id`: The subscription identifier, as shown by `restate subscriptions list`.
    * `kafka.offset`: The record offset.
    * `kafka.partition`: The record partition.
    * `kafka.timestamp`: The record timestamp.
  </Accordion>

  <Accordion title="Raw event support">
    Check out the serialization documentation of your SDK to learn how to receive raw events in your handler.
  </Accordion>
</AccordionGroup>

## Managing Kafka Clusters

Manage the registered Kafka clusters with the `restate kafka-clusters` CLI commands (alias `kc`):

```bash theme={null}
# List the registered Kafka clusters
restate kafka-clusters list
# Print the properties of a Kafka cluster and its subscriptions
restate kafka-clusters describe my-cluster
# Open an editor to interactively update the cluster properties
restate kafka-clusters edit my-cluster
# Update the cluster properties non-interactively, for CI and scripting
restate kafka-clusters patch my-cluster --set client.id=restate-cli --unset sasl.password
# Remove a Kafka cluster
restate kafka-clusters delete my-cluster
```

## Managing Kafka Subscriptions

Manage the subscriptions with the `restate subscriptions` CLI commands (alias `sub`):

```bash theme={null}
# Subscribe a handler to a Kafka topic
restate subscriptions create kafka://my-cluster/my-topic service://MyService/Handle auto.offset.reset=earliest
# List current subscriptions
restate subscriptions list
# Print detailed information about a subscription, including its options
restate subscriptions describe sub_11XHoawrCiWtv8kzhEyGtsR
# Remove a subscription using its ID (starts with sub_)
restate subscriptions delete sub_11XHoawrCiWtv8kzhEyGtsR
```

When you delete a subscription, Restate stops the associated consumer group. Messages already enqueued by Restate will still be processed.
