Skip to main content

Kafka

You can invoke handlers via Kafka events, by doing the following:

info

Make sure to first register the handler you want to invoke.

1
Develop an event handler

You can invoke any handler via Kafka events. The event payload will be (de)serialized as JSON.

  • When invoking Virtual Object handlers via Kafka, the key of the Kafka record will be used to determine the Virtual Object key. The key needs to be a valid UTF-8 string. For each Virtual Object, the events are delivered in the order in which they arrived on the topic partition.
  • When invoking Service handlers over Kafka, events are delivered in parallel without ordering guarantees.
Combining RPC and Kafka within a service/handler

Since you can invoke any handler via Kafka events, a single handler can be invoked both by RPC and via Kafka.

2
Configure Restate to connect to a Kafka cluster

Define the Kafka cluster that Restate needs to connect to in the Restate configuration file:

restate.toml

[[ingress.kafka-clusters]]
name = "my-cluster"
brokers = ["PLAINTEXT://broker:9092"]

And make sure the Restate Server uses it via restate-server --config-file restate.toml.

Check the configuration docs for more details.

3
Register the service you want to invoke.

4
Subscribe the event handler to the Kafka topic

Let Restate forward events from the Kafka topic to the event handler by creating a subscription using the Admin API:


curl localhost:9070/subscriptions -H 'content-type: application/json' \
-d '{
"source": "kafka://my-cluster/my-topic",
"sink": "service://MyService/handle",
"options": {"auto.offset.reset": "earliest"}
}'

Once you've created a subscription, Restate immediately starts consuming events from Kafka. The handler will be invoked for each event received from Kafka.

The options field is optional and accepts any configuration parameter from librdkafka configuration.

Managing subscriptions

Have a look at the invocation docs for more commands to manage subscriptions.

Event metadata

Each event carries within the ctx.request().headers map the following entries:

  • restate.subscription.id: The subscription identifier, as shown by the Admin API.
  • kafka.offset: The record offset.
  • kafka.partition: The record partition.
  • kafka.timestamp: The record timestamp.