DC/OS Apache Kafka 服务实施可从群集外部访问的 REST API。下面引用的<dcos_url>
参数指示了部署 Apache Kafka 服务的 DC/OS 群集的基础 URL。
REST API 身份认证
REST API 请求必须经过身份认证。此身份认证仅适用于直接与 Apache Kafka REST API 的交互。访问 Apache Kafka 节点自身不需要令牌。
如果您正在使用 Enterprise DC/OS,请遵循以下说明 创建服务帐户和身份认证令牌。然后,您可以配置服务以在其到期时自动刷新身份认证令牌。要更快速地开始,您还可以在没有服务帐户的情况下获得认证令牌,但您需要手动刷新令牌。
如果您正在使用开源 DC/OS,请遵循以下说明 将您的 HTTP API 令牌传递到 DC/OS 端点。
您拥有身份认证令牌之后,您可以将其存储在环境变量中,并在您的 REST API 调用中引用:
$ export auth_token=uSeR_t0k3n
本文中的curl
示例假定身份认证令牌已存储在名为auth_token
的环境变量中。
如果您正在使用 Enterprise DC/OS,安全模式安装在调用 REST 时也可能需要 --ca-cert
标记。请参阅 在 cURL 请求中获取并传递 DC/OS 证书 了解如何使用 --cacert
标记。如果您的安全模式是 disabled
,不要使用 --ca-cert
标记。
Plan API
Plan API 提供用于监控和控制服务安装和配置更新的端点。
列表计划
您可以列出服务的已配置计划。默认情况下,所有服务至少有一个 deploy
计划和一个 recovery
计划。某些服务可能已定义额外的自定义计划。
$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans
$ dcos kafka --name=kafka plan list
查看计划
您可以查看所列计划的当前状态:
$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans/<plan>
CLI 可用于显示计划的格式树(默认),或从上述 HTTP 端点检索的基础 JSON 数据:
$ dcos kafka --name=kafka plan show <plan>
$ dcos kafka --name=kafka plan show <plan> --json
暂停计划
完成当前节点的操作后,安装将暂停,并等待用户输入后再继续。
$ curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/plans/deploy/interrupt
$ dcos kafka --name=kafka plan pause deploy
恢复计划
下面的 REST API 请求将恢复下一个挂起节点的操作。
$ curl -X PUT -H "Authorization:token=$auth_token" <dcos_surl>/service/kafka/v1/plans/deploy/continue
$ dcos kafka --name=kafka plan continue deploy
节点 API
pod API 提供用于检索节点信息、重新启动节点和更换节点的端点。
列表节点
可以通过发送 GET 请求到 /v1/pod
来检索可用节点 ID 的列表:
CLI 示例
$ dcos kafka --name=kafka pod list
HTTP 示例
$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod
节点信息
您可以通过向 /v1/pod/<node-id>/info
发送 GET 请求来检索节点信息:
$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/info
CLI 示例
$ dcos kafka --name=kafka pod info journalnode-0
HTTP 示例
$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/journalnode-0/info
替换节点
可以使用 replace endpoint
命令用其他代理节点上运行的实例来替换节点。
CLI 示例
$ dcos kafka --name=kafka pod replace <node-id>
HTTP 示例
$ curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/replace
如果操作成功,返回200 OK
。
重新启动节点
可以使用 restart endpoint
命令重新启动在同一代理节点上的节点。
CLI 示例
$ dcos kafka --name=kafka pod restart <node-id>
HTTP 示例
$ curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/restart
如果操作成功,返回 200 OK
。
暂停节点
可以使用 pause endpoint
命令重新启动在空闲命令状态下的节点,用于调试。
CLI 示例
dcos kafka --name=kafka debug pod pause <node-id>
HTTP 示例
$ curl -X POST -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/pod/<node-id>/pause
配置 API
配置 API 提供查看群集的当前和先前配置的端点。
查看目标配置
您可以通过发送 GET 请求到 /v1/configurations/target
来查看当前目标配置。
CLI 示例
$ dcos kafka --name=kafka config target
HTTP 示例
$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations/target
列表配置
您可以通过发送 GET 请求到 /v1/configurations
来列出所有配置 ID。
CLI 示例
$ dcos kafka --name=kafka config list
[
"319ebe89-42e2-40e2-9169-8568e2421023",
"294235f2-8504-4194-b43d-664443f2132b"
]
HTTP 示例
$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations
[
"319ebe89-42e2-40e2-9169-8568e2421023",
"294235f2-8504-4194-b43d-664443f2132b"
]
查看指定的配置
您可以通过向 /v1/configurations/ 发送 GET 请求来查看指定配置<config-id>
.
CLI 示例
$ dcos kafka --name=kafka config show 9a8d4308-ab9d-4121-b460-696ec3368ad6
HTTP 示例
$ curl -H "Authorization:token=$auth_token" <dcos_url>/service/kafka/v1/configurations/9a8d4308-ab9d-4121-b460-696ec3368ad6
主题操作
这些操作会镜像 bin/kafka-topics.sh
所提供的内容。
列表主题
$ dcos kafka --name=kafka topic list
[
"topic1",
"topic0"
]
$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics"
[
"topic1",
"topic0"
]
描述主题
$ dcos kafka --name=kafka topic describe topic1
{
"partitions": [
{
"0": {
"controller_epoch": 1,
"isr": [
0,
1,
2
],
"leader": 0,
"leader_epoch": 0,
"version": 1
}
},
{
"1": {
"controller_epoch": 1,
"isr": [
1,
2,
0
],
"leader": 1,
"leader_epoch": 0,
"version": 1
}
},
{
"2": {
"controller_epoch": 1,
"isr": [
2,
0,
1
],
"leader": 2,
"leader_epoch": 0,
"version": 1
}
}
]
}
$ curl -X POST -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1"
{
"partitions": [
{
"0": {
"controller_epoch": 1,
"isr": [
0,
1,
2
],
"leader": 0,
"leader_epoch": 0,
"version": 1
}
},
{
"1": {
"controller_epoch": 1,
"isr": [
1,
2,
0
],
"leader": 1,
"leader_epoch": 0,
"version": 1
}
},
{
"2": {
"controller_epoch": 1,
"isr": [
2,
0,
1
],
"leader": 2,
"leader_epoch": 0,
"version": 1
}
}
]
}
创建主题
$ dcos kafka --name=kafka topic create topic1 --partitions=3 --replication=3
{
"message": "Output: Created topic \"topic1\"\n"
}
$ curl -X POST -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?partitions=3&replication=3"
{
"message": "Output: Created topic \"topic1\"\n"
}
查看主题偏移
可选的 --time
参数可以被设置为“first”、“last” 或时间戳(以毫秒为单位),如 Kafka 文档中所述。
$ dcos kafka --name=kafka topic offsets topic1 --time=last
[
{
"2": "334"
},
{
"1": "333"
},
{
"0": "333"
}
]
$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1/offsets?time=-1"
[
{
"2": "334"
},
{
"1": "333"
},
{
"0": "333"
}
]
修改主题分区计数
$ dcos kafka --name=kafka topic partitions topic1 2
{
"message": "Output: WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affectednAdding partitions succeeded!n"
}
$ curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?operation=partitions&partitions=2"
{
"message": "Output: WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affectednAdding partitions succeeded!n"
}
在主题上运行生产者测试
$ dcos kafka --name=kafka topic producer_test topic1 10
{
"message": "10 records sent, 70.422535 records/sec (0.07 MB/sec), 24.20 ms avg latency, 133.00 ms max latency, 13 ms 50th, 133 ms 95th, 133 ms 99th, 133 ms 99.9th.n"
}
$ curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1?operation=producer-test&messages=10"
{
"message": "10 records sent, 70.422535 records/sec (0.07 MB/sec), 24.20 ms avg latency, 133.00 ms max latency, 13 ms 50th, 133 ms 95th, 133 ms 99th, 133 ms 99.9th.n"
}
这相当于运行计算机运行 Kafka 调度器时所用的以下命令:
$ kafka-producer-perf-test.sh \
--topic <topic> \
--num-records <messages> \
--throughput 100000 \
--record-size 1024 \
--producer-props bootstrap.servers=<current broker endpoints>
删除主题
$ dcos kafka --name=kafka topic delete topic1
{
"message": "Topic topic1 is marked for deletion.nNote: This will have no impact if delete.topic.enable is not set to true.n"
}
$ curl -X DELETE -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/topic1"
{
"message": "Topic topic1 is marked for deletion.nNote: This will have no impact if delete.topic.enable is not set to true.n"
}
请注意以上命令输出中的警告。您可以更改指示 delete.topic.enable
配置值,作为配置更改。
列出 Under Replicated 分区
$ dcos kafka --name=kafka topic under_replicated_partitions
{
"message": ""
}
$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/under_replicated_partitions"
{
"message": ""
}
列出不可用分区
$ dcos kafka --name=kafka topic unavailable_partitions
{
"message": ""
}
$ curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/topics/unavailable_partitions"
{
"message": ""
}