Consume Apache Kafka® messages via REST APIs
You have an Apache Kafka topic that you want to consume via REST APIs (think curl or just a web browser). How do you do it?
The answer is Karapace, an open source (Apache 2.0) tool providing Schema Registry and REST proxy functionality.
Let’s check out how to quickly run it on Docker.
Step 1: the Apache Kafka® Cluster
You need an Apache Kafka cluster up and running; if you already have one, you can skip to the next section.
The Karapace GitHub repo contains a Docker Compose setup for one. Alternatively, you can spin up a Kafka cluster in minutes with Aiven by:
- Navigating to the Aiven Console
- Creating a new Apache Kafka® service
- Selecting the cloud provider and region of your preference
- Selecting the plan
- Providing the service name
- Clicking on Create service
After a couple of minutes, the Apache Kafka® cluster will be up and running.
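The console steps above can also be sketched from the terminal with the Aiven CLI, assuming the `avn` client is installed and authenticated; the service name, cloud and plan below are example values:

```shell
# Create an Aiven for Apache Kafka service
# (service name, cloud and plan are example values)
avn service create demo-kafka \
  --service-type kafka \
  --cloud google-europe-west3 \
  --plan startup-2

# Block until the service reaches the RUNNING state
avn service wait demo-kafka
```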
Step 2: the connection parameters
In order to connect to Apache Kafka, you'll need:
- the bootstrap URI
- the security information (`PLAINTEXT`, `SSL` or `SASL`) together with the required certificates
If you are using Aiven, you can download the three certificates (`ca.pem`, `service.crt`, `service.key`) from the Kafka service overview in the Aiven Console. Create a folder named `certs` and place the certificates there.
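On Aiven, the folder can also be populated from the terminal; a sketch using the Aiven CLI (the service and user names are example values):

```shell
# Create the local folder for the certificates
mkdir -p certs

# Download ca.pem, service.crt and service.key into ./certs
# (service name and username are example values)
avn service user-creds-download demo-kafka --username avnadmin -d certs

# The folder should now contain ca.pem, service.crt and service.key
ls certs
```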
Step 3: Run Karapace REST proxy in Docker
Once you have collected the required information and certificates, it's time to run the REST proxy part of Karapace in Docker with:
```shell
docker run -it \
  -e KARAPACE_BOOTSTRAP_URI='<KAFKA_HOST>:<KAFKA_PORT>' \
  -e KARAPACE_SSL_CAFILE="/certs/ca.pem" \
  -e KARAPACE_SSL_CERTFILE="/certs/service.crt" \
  -e KARAPACE_SSL_KEYFILE="/certs/service.key" \
  -e KARAPACE_SECURITY_PROTOCOL="SSL" \
  -e KARAPACE_HOST='0.0.0.0' \
  -e KARAPACE_PORT=8082 \
  -v $(pwd)/certs:/certs \
  -p 8082:8082 \
  ghcr.io/aiven/karapace:latest ./start.sh rest
```
The command above is:
- Running the `latest` Karapace image
- Starting the `rest` part of Karapace with the `./start.sh` command
- Passing the parameters:
  - `KARAPACE_BOOTSTRAP_URI`: the Kafka bootstrap URI; you will need to replace the placeholders
  - `KARAPACE_SECURITY_PROTOCOL`: the protocol to use to connect to Kafka, `SSL` in this example
  - `KARAPACE_SSL_*`: the list of certificates used for the `SSL` connection
  - `KARAPACE_HOST` and `KARAPACE_PORT`: the host and port used to start Karapace in the Docker container
- Mapping the locally created `certs` folder to `/certs` in the Docker container with `-v $(pwd)/certs:/certs`
- Mapping the local port `8082` to the Docker port `8082` with `-p 8082:8082`
If the above command is successful, you should see the following message:

```
======== Running on http://0.0.0.0:8082 ========
```

and you should be able to execute:

```shell
curl http://0.0.0.0:8082
```

which returns an empty list:

```
[]
```
Step 4: Consume Apache Kafka® messages via REST APIs
The last step is to consume messages via REST APIs.
We can list the topics in Kafka with:

```shell
curl "http://localhost:8082/topics"
```

In my test example the response contains only a `test` topic:

```
["test"]
```
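If the topic is empty, the same REST proxy can also produce messages; a minimal sketch that writes one of the records consumed later in this post to the `test` topic:

```shell
# Produce a JSON message to the "test" topic via the REST proxy
curl -X POST \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records":[{"key":{"id":1},"value":{"id":1,"name":"francesco"}}]}' \
  http://localhost:8082/topics/test
```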
Next we create a JSON consumer named `my_consumer`, in a consumer group named `json_consumers`, with:

```shell
curl -X POST \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  -H "Accept: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
  http://localhost:8082/consumers/json_consumers
```
Now we subscribe `my_consumer` to the `test` topic with:

```shell
curl -X POST \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"topics":["test"]}' \
  http://localhost:8082/consumers/json_consumers/instances/my_consumer/subscription
```
Finally we consume the data from the `test` topic with:

```shell
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
  "http://localhost:8082/consumers/json_consumers/instances/my_consumer/records?timeout=1000"
```
The output will show the existing data in the topic, like:

```json
[
  {
    "key": {
      "id": 1
    },
    "offset": 0,
    "partition": 0,
    "timestamp": 1693386610137,
    "topic": "test",
    "value": {
      "id": 1,
      "name": "francesco"
    }
  },
  {
    "key": {
      "id": 1
    },
    "offset": 1,
    "partition": 0,
    "timestamp": 1693387327332,
    "topic": "test",
    "value": {
      "id": 2
    }
  }
]
```
Karapace syncs the consumer offsets to Kafka's internal `__consumer_offsets` topic, therefore the next time we issue the consume command we'll only see the messages that appeared in the Kafka topic since the last poll (the output will be empty if there are no new messages in the topic).
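When you are done, the consumer instance can be deleted so the REST proxy frees its resources:

```shell
# Remove the my_consumer instance from the json_consumers group
curl -X DELETE \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  http://localhost:8082/consumers/json_consumers/instances/my_consumer
```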