The DC/OS Apache Kafka Service implements a REST API that may be accessed from outside the cluster. The <dcos_url>
parameter referenced below indicates the base URL of the DC/OS cluster on which the DC/OS Apache Kafka Service is deployed.
REST API Authentication
REST API requests must be authenticated. This authentication is only applicable for interacting with the Apache Kafka REST API directly. You do not need the token to access the Apache Kafka nodes themselves.
If you are using Enterprise DC/OS, follow these instructions to create a service account and an authentication token. You can then configure your service to automatically refresh the authentication token when it expires. To get started more quickly, you can also get the authentication token without a service account, but you will need to manually refresh the token.
If you are using open source DC/OS, follow these instructions to pass your HTTP API token to the DC/OS endpoint.
Once you have the authentication token, you can store it in an environment variable and reference it in your REST API calls:
export auth_token=uSeR_t0k3n
The curl examples in this document assume that an auth token has been stored in an environment variable named auth_token.
If you are using Enterprise DC/OS, the security mode of your installation may also require the --cacert flag when making REST calls. Refer to Obtaining and passing the DC/OS certificate in cURL requests for information on how to use the --cacert flag. If your security mode is disabled, do not use the --cacert flag.
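Every endpoint below follows the same request pattern: a base URL, an Authorization header carrying the token, and optionally a CA certificate. A minimal sketch in Python using only the standard library; the URL and token values are placeholders, not real cluster values:

```python
# A sketch of the request pattern used throughout this document.
# dcos_url and auth_token are placeholders standing in for <dcos_url>
# and the token exported in your shell.
dcos_url = "https://dcos.example.com"
auth_token = "uSeR_t0k3n"

def kafka_api(path, service_name="kafka"):
    """Return the URL and headers for a Kafka service REST call."""
    url = f"{dcos_url}/service/{service_name}/v1/{path.lstrip('/')}"
    headers = {"Authorization": f"token={auth_token}"}
    return url, headers

url, headers = kafka_api("plans")
print(url)  # https://dcos.example.com/service/kafka/v1/plans
```

The same helper covers every endpoint in this document by changing only the `path` argument.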
Plan API
The Plan API provides endpoints for monitoring and controlling service installation and configuration updates.
List plans
You may list the configured plans for the service. By default, all services have at least a deploy plan and a recovery plan. Some services may have additional custom plans defined.
curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans
dcos kafka --name=kafka plan list
View plan
You may view the current state of a listed plan:
curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans/<plan>
The CLI may be used to show a formatted tree of the plan (default), or the underlying JSON data as retrieved from the above HTTP endpoint:
dcos kafka --name=kafka plan show <plan>
dcos kafka --name=kafka plan show <plan> --json
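The --json output can be inspected programmatically, for example to find the steps that have not yet finished. A sketch assuming a plan document shaped like typical plan output (a top-level status plus phases containing steps); the exact field names and statuses on your cluster may differ:

```python
import json

# Hypothetical plan JSON, shaped like typical `plan show --json` output.
plan_json = """
{
  "status": "IN_PROGRESS",
  "phases": [
    {"name": "broker", "status": "IN_PROGRESS",
     "steps": [
       {"name": "kafka-0:[broker]", "status": "COMPLETE"},
       {"name": "kafka-1:[broker]", "status": "PENDING"}
     ]}
  ]
}
"""

plan = json.loads(plan_json)

def pending_steps(plan):
    """Collect every step that has not yet completed."""
    return [step["name"]
            for phase in plan["phases"]
            for step in phase["steps"]
            if step["status"] != "COMPLETE"]

print(plan["status"])       # IN_PROGRESS
print(pending_steps(plan))  # ['kafka-1:[broker]']
```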
Pause plan
The installation will pause after completing the operation on the current node and wait for user input before proceeding.
curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans/deploy/interrupt
dcos kafka --name=kafka plan pause deploy
Resume plan
The REST API request below will resume the operation at the next pending node.
curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans/deploy/continue
dcos kafka --name=kafka plan continue deploy
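Pause and resume are a matched pair of commands against the same plan resource. The sketch below builds (but does not send) the two requests with the standard library; the base URL and token are placeholders, and POST is assumed for both commands:

```python
import urllib.request

dcos_url = "https://dcos.example.com"  # stands in for <dcos_url>
token = "uSeR_t0k3n"                   # placeholder auth token

def plan_command(plan, command):
    """Build (but do not send) a pause/resume request for a plan.
    POST is assumed for both interrupt and continue."""
    return urllib.request.Request(
        f"{dcos_url}/service/kafka/v1/plans/{plan}/{command}",
        method="POST",
        headers={"Authorization": f"token={token}"},
    )

pause = plan_command("deploy", "interrupt")
resume = plan_command("deploy", "continue")
print(pause.full_url)  # .../service/kafka/v1/plans/deploy/interrupt
```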
Nodes API
The nodes API, served under /v1/pod, provides endpoints for retrieving information about nodes, restarting them, and replacing them.
List Nodes
A list of available node IDs can be retrieved by sending a GET request to /v1/pod:
CLI Example
dcos kafka --name=kafka pod list
HTTP Example
curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod
Node Info
You can retrieve node information by sending a GET request to /v1/pod/<node-id>/info:
curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/info
CLI Example
dcos kafka --name=kafka pod info kafka-0
HTTP Example
curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/kafka-0/info
Replace a Node
The replace endpoint can be used to replace a node with an instance running on another agent node.
CLI Example
dcos kafka --name=kafka pod replace <node-id>
HTTP Example
curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/replace
If the operation succeeds, a 200 OK is returned.
Restart a Node
The restart endpoint can be used to restart a node in place on the same agent node.
CLI Example
dcos kafka --name=kafka pod restart <node-id>
HTTP Example
curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/restart
If the operation succeeds, a 200 OK is returned.
Pause a Node
The pause endpoint can be used to relaunch a node in an idle command state for debugging purposes.
CLI example
dcos kafka --name=kafka debug pod pause <node-id>
HTTP Example
curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/pause
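The three pod operations share a single URL pattern: POST /service/kafka/v1/pod/&lt;node-id&gt;/&lt;operation&gt;. A small helper makes the pattern explicit; the base URL and node ID below are placeholders:

```python
# The replace, restart, and pause operations differ only in the final
# path segment. dcos_url stands in for <dcos_url>.
dcos_url = "https://dcos.example.com"

def pod_operation_url(node_id, operation):
    """Build the URL for a pod replace/restart/pause POST call."""
    if operation not in ("replace", "restart", "pause"):
        raise ValueError(f"unknown pod operation: {operation}")
    return f"{dcos_url}/service/kafka/v1/pod/{node_id}/{operation}"

print(pod_operation_url("kafka-0", "restart"))
# https://dcos.example.com/service/kafka/v1/pod/kafka-0/restart
```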
Configuration API
The configuration API provides an endpoint to view current and previous configurations of the cluster.
View Target Config
You can view the current target configuration by sending a GET request to /v1/configurations/target.
CLI Example
dcos kafka --name=kafka config target
HTTP Example
curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations/target
List Configs
You can list all configuration IDs by sending a GET request to /v1/configurations.
CLI Example
dcos kafka --name=kafka config list
[
"319ebe89-42e2-40e2-9169-8568e2421023",
"294235f2-8504-4194-b43d-664443f2132b"
]
HTTP Example
curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations
[
"319ebe89-42e2-40e2-9169-8568e2421023",
"294235f2-8504-4194-b43d-664443f2132b"
]
View Specified Config
You can view a specific configuration by sending a GET request to /v1/configurations/<config-id>.
CLI Example
dcos kafka --name=kafka config show 9a8d4308-ab9d-4121-b460-696ec3368ad6
HTTP Example
curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations/9a8d4308-ab9d-4121-b460-696ec3368ad6
Topic Operations
These operations mirror what is available with bin/kafka-topics.sh.
List Topics
dcos kafka --name=kafka topic list
[
"topic1",
"topic0"
]
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics"
[
"topic1",
"topic0"
]
Describe Topic
dcos kafka --name=kafka topic describe topic1
{
"partitions": [
{
"0": {
"controller_epoch": 1,
"isr": [
0,
1,
2
],
"leader": 0,
"leader_epoch": 0,
"version": 1
}
},
{
"1": {
"controller_epoch": 1,
"isr": [
1,
2,
0
],
"leader": 1,
"leader_epoch": 0,
"version": 1
}
},
{
"2": {
"controller_epoch": 1,
"isr": [
2,
0,
1
],
"leader": 2,
"leader_epoch": 0,
"version": 1
}
}
]
}
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1"
{
"partitions": [
{
"0": {
"controller_epoch": 1,
"isr": [
0,
1,
2
],
"leader": 0,
"leader_epoch": 0,
"version": 1
}
},
{
"1": {
"controller_epoch": 1,
"isr": [
1,
2,
0
],
"leader": 1,
"leader_epoch": 0,
"version": 1
}
},
{
"2": {
"controller_epoch": 1,
"isr": [
2,
0,
1
],
"leader": 2,
"leader_epoch": 0,
"version": 1
}
}
]
}
Create Topic
dcos kafka --name=kafka topic create topic1 --partitions=3 --replication=3
{
"message": "Output: Created topic \"topic1\"\n"
}
curl -X POST -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?partitions=3&replication=3"
{
"message": "Output: Created topic \"topic1\"\n"
}
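The create call passes its settings as query parameters, as the curl example shows. A sketch of assembling that URL safely with the standard library's urlencode; the base URL is a placeholder:

```python
from urllib.parse import urlencode

dcos_url = "https://dcos.example.com"  # stands in for <dcos_url>

def create_topic_url(topic, partitions, replication):
    """Build the POST URL that creates a topic with the given settings."""
    query = urlencode({"partitions": partitions, "replication": replication})
    return f"{dcos_url}/service/kafka/v1/topics/{topic}?{query}"

print(create_topic_url("topic1", 3, 3))
# https://dcos.example.com/service/kafka/v1/topics/topic1?partitions=3&replication=3
```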
View Topic Offsets
There is an optional --time parameter which may be set to either "first", "last", or a timestamp in milliseconds as described in the Kafka documentation.
dcos kafka --name=kafka topic offsets topic1 --time=last
[
{
"2": "334"
},
{
"1": "333"
},
{
"0": "333"
}
]
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1/offsets?time=-1"
[
{
"2": "334"
},
{
"1": "333"
},
{
"0": "333"
}
]
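Note that the CLI used --time=last while the curl example sent time=-1: the keywords translate to Kafka's special offset timestamps. "last" maps to -1 (latest), as the examples above show; "first" presumably maps to -2 (earliest), following the standard Kafka convention. A sketch of that translation:

```python
# Map the CLI's --time keywords onto Kafka's special offset timestamps.
# "last" -> -1 is confirmed by the examples above; "first" -> -2 is
# assumed from the usual Kafka convention.
SPECIAL_TIMES = {"last": -1, "first": -2}

def offsets_time(value):
    """Translate a --time value into the ?time= query parameter."""
    if value in SPECIAL_TIMES:
        return SPECIAL_TIMES[value]
    return int(value)  # a timestamp in milliseconds

print(offsets_time("last"))           # -1
print(offsets_time("1609459200000"))  # 1609459200000
```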
Alter Topic Partition Count
dcos kafka --name=kafka topic partitions topic1 2
{
"message": "Output: WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected\nAdding partitions succeeded!\n"
}
curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?operation=partitions&partitions=2"
{
"message": "Output: WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected\nAdding partitions succeeded!\n"
}
Run Producer Test on Topic
dcos kafka --name=kafka topic producer_test topic1 10
{
"message": "10 records sent, 70.422535 records/sec (0.07 MB/sec), 24.20 ms avg latency, 133.00 ms max latency, 13 ms 50th, 133 ms 95th, 133 ms 99th, 133 ms 99.9th.\n"
}
curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?operation=producer-test&messages=10"
{
"message": "10 records sent, 70.422535 records/sec (0.07 MB/sec), 24.20 ms avg latency, 133.00 ms max latency, 13 ms 50th, 133 ms 95th, 133 ms 99th, 133 ms 99.9th.\n"
}
The above commands run the equivalent of the following command from the machine running the Kafka Scheduler:
kafka-producer-perf-test.sh \
--topic <topic> \
--num-records <messages> \
--throughput 100000 \
--record-size 1024 \
--producer-props bootstrap.servers=<current broker endpoints>
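The throughput figures in the sample response are consistent with the fixed 1024-byte record size: records/sec times record size gives bytes/sec. A quick check, assuming MB here means 2^20 bytes:

```python
records_per_sec = 70.422535   # from the sample output above
record_size = 1024            # bytes, fixed by the perf-test command

mb_per_sec = records_per_sec * record_size / (1024 * 1024)
print(round(mb_per_sec, 2))   # 0.07, matching the reported rate
```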
Delete Topic
dcos kafka --name=kafka topic delete topic1
{
"message": "Topic topic1 is marked for deletion.\nNote: This will have no impact if delete.topic.enable is not set to true.\n"
}
curl -X DELETE -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1"
{
"message": "Topic topic1 is marked for deletion.\nNote: This will have no impact if delete.topic.enable is not set to true.\n"
}
List Under Replicated Partitions
dcos kafka --name=kafka topic under_replicated_partitions
{
"message": ""
}
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/under_replicated_partitions"
{
"message": ""
}
List Unavailable Partitions
dcos kafka --name=kafka topic unavailable_partitions
{
"message": ""
}
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/unavailable_partitions"
{
"message": ""
}