Apache Kafka is available from the DC/OS Catalog and can be installed using either the DC/OS UI or the DC/OS CLI.
Prerequisites
-
Depending on your security mode in Enterprise DC/OS, you may need to provision a service account before installing Kafka. You will need a
superuser
permission to create the service account. The following is a list of security modes:strict
security mode requires a service account.permissive
security mode a service account is optional.disabled
security mode does not require a service account.
-
Your cluster must have at least three private nodes.
Types of Installation Methods
There are four types of installation methods:
- Default installation is the basic installation method. This method is used to install Kafka on a DC/OS cluster using CLI commands.
- Minimal installation is used to install Kafka on a local C/OS cluster using dcos-vagrant and is specific for development purposes.
- Custom installation is used to install Kafka on DC/OS cluster using a customized JSON file.
- Multiple Kafka cluster installation is used to install multiple Kafka clusters using custom configurations.
Default Installation
To start a basic test cluster with three brokers, run the following command on the DC/OS CLI. Enterprise DC/OS users must follow additional instructions. More information about installing Kafka on Enterprise DC/OS.
dcos package install beta-kafka
This command creates a new Kafka cluster with the default name kafka
. Two clusters cannot share the same name, so installing additional clusters beyond the default cluster requires customizing the name
at install time for each additional instance.
All dcos kafka
CLI commands have a --name
argument allowing the user to specify which Kafka instance to query. If you do not specify a service name, the CLI assumes the default value, kafka
. The default value for --name
can be customized via the DC/OS CLI configuration:
dcos kafka --name=<kafka-dev> <cmd>
Enter the following command from the DC/OS CLI:
dcos package install beta-kafka --cli
Minimal Installation
For development purposes, you can use dcos-vagrant to install Kafka on a local DC/OS cluster.
To start a minimal cluster with a single broker, create a JSON options file named sample-kafka-minimal.json
:
{
"brokers": {
"count": 1,
"mem": 512,
"disk": 1000
}
}
The command below creates a cluster using sample-kafka-minimal.json
:
dcos package install beta-kafka --options=sample-kafka-minimal.json
Custom Installation
Customize the defaults by creating a JSON file. Then, pass it to dcos package install
using the --options
parameter.
A sample JSON options file is named as sample-kafka-custom.json:
{
"service": {
"name": "sample-kafka-custom",
"placement_strategy": "NODE"
},
"brokers": {
"count": 10,
"kill_grace_period": 30
},
"kafka": {
"delete_topic_enable": true,
"log_retention_hours": 128
}
}
The following command creates a cluster using sample-kafka.json
:
dcos package install --options=sample-kafka-custom.json beta-kafka
See Configuration Options for a list of fields that can be customized via an JSON options file when the Kafka cluster is created.
Multiple Kafka cluster installation
Installing multiple Kafka clusters is identical to installing Kafka clusters with custom configurations as described above. The only requirement on the operator is that a unique name
is specified for each installation.
See the following example:
cat kafka1.json
{
"service": {
"name": "kafka1"
}
}
dcos package install beta-kafka --options=kafka1.json
Changing Configuration at Runtime
You can customize your cluster in-place when it is up and running.
The Kafka scheduler runs as a Marathon process and can be reconfigured by changing values from the DC/OS web interface.
Use the following steps to change configurations at runtime:
- Go to the
Services
tab of the DC/OS web interface. - Select the name of the Kafka service to be updated.
- Within the Kafka instance details view, select the menu in the upper right, then choose Edit.
- In the dialog that appears, select the Environment tab and update any fields to their desired values. For example, to increase the number of Brokers, edit the value for
BROKER_COUNT
. Do not edit the value forFRAMEWORK_NAME
orBROKER_DISK
. - Choose a
DEPLOY_STRATEGY
: serial, serial-canary, parallel-canary, or parallel. See the SDK Developer guide for more information on deployment plan strategies. - Select REVIEW & RUN to apply any changes and cleanly reload the Kafka scheduler. The Kafka cluster itself will persist across the change.
Configuration Update REST API
Make the REST request below to view the current deployment plan. See the REST API Authentication part of the REST API Reference topic for information on how this request must be authenticated.
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan"
{
"phases": [
{
"id": "b6180a4e-b25f-4307-8855-0b37d671fd46",
"name": "Deployment",
"steps": [
{
"id": "258f19a4-d6bc-4ff1-8685-f314924884a1",
"status": "COMPLETE",
"name": "kafka-0:[broker]",
"message": "com.mesosphere.sdk.scheduler.plan.DeploymentStep: 'kafka-0:[broker] [258f19a4-d6bc-4ff1-8685-f314924884a1]' has status: 'COMPLETE'."
},
{
"id": "e59fb2a9-22e2-4900-89e3-bda24041639f",
"status": "COMPLETE",
"name": "kafka-1:[broker]",
"message": "com.mesosphere.sdk.scheduler.plan.DeploymentStep: 'kafka-1:[broker] [e59fb2a9-22e2-4900-89e3-bda24041639f]' has status: 'COMPLETE'."
},
{
"id": "0b5a5048-fd3a-4b2c-a9b5-746045176d29",
"status": "COMPLETE",
"name": "kafka-2:[broker]",
"message": "com.mesosphere.sdk.scheduler.plan.DeploymentStep: 'kafka-2:[broker] [0b5a5048-fd3a-4b2c-a9b5-746045176d29]' has status: 'COMPLETE'."
}
],
"status": "COMPLETE"
}
],
"errors": [],
"status": "COMPLETE"
}
Enter the continue
command to execute the first step:
curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan?cmd=continue"
PUT <dcos_url>/service/kafka/v1/continue HTTP/1.1
{
"Result": "Received cmd: continue"
}
After you execute the continue operation, you will be see the following code block:
curl -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan"
GET <dcos_url>/service/kafka/v1/plan HTTP/1.1
{
"phases": [
{
"id": "9f8927de-d0df-4f72-bd0d-55e3f2c3ab21",
"name": "Reconciliation",
"steps": [
{
"id": "2d137273-249b-455e-a65c-3c83228890b3",
"status": "COMPLETE",
"name": "Reconciliation",
"message": "Reconciliation complete"
}
],
"status": "COMPLETE"
},
{
"id": "a7742963-f7e1-4640-8bd0-2fb28dc04045",
"name": "Update to: 6092e4ec-8ffb-49eb-807b-877a85ef8859",
"steps": [
{
"id": "b4453fb0-b4cc-4996-a05c-762673f75e6d",
"status": "IN_PROGRESS",
"name": "broker-0",
"message": "Broker-0 is IN_PROGRESS"
},
{
"id": "b8a8de9f-8758-4d0f-b785-0a38751a2c94",
"status": "WAITING",
"name": "broker-1",
"message": "Broker-1 is WAITING"
},
{
"id": "49e85522-1bcf-4edb-9456-712e8a537dbc",
"status": "PENDING",
"name": "broker-2",
"message": "Broker-2 is PENDING"
}
],
"status": "IN_PROGRESS"
}
],
"errors": [],
"status": "IN_PROGRESS"
}
If you enter continue
a second time, the rest of the plan will be executed without further interruption. If you want to interrupt a configuration update that is in progress, enter the interrupt
command:
curl -X PUT -H "Authorization: token=$auth_token" "<dcos_url>/service/kafka/v1/plan?cmd=interrupt"
PUT <dcos_url>/service/kafka/v1/interrupt HTTP/1.1
{
"Result": "Received cmd: interrupt"
}
Configuration Options
The following describes the most commonly used features of the Kafka service and how to configure them via the DC/OS CLI and from the DC/OS web interface. View the default config.json
in DC/OS Universe to see all possible configuration options.
Service Name
A Service Name is the name of the Kafka instance in DC/OS. This is an option that cannot be changed once the Kafka cluster is started: it can only be configured via the DC/OS CLI --options
flag when the Kafka instance is created.
- In DC/OS CLI options.json:
name
: string (default:kafka
) - DC/OS web interface: The service name cannot be changed after the cluster has started.
Kill Grace Period
The kill grace period is the number of seconds each broker has to cleanly shut
down in response to SIGTERM. If a broker exceeds this time, it will be killed.
Use the brokers.kill_grace_period
configuration option to set a kill grace period.
The graceful shutdown feature is especially important for large-scale deployments. Use the graceful shutdown configuraiton option to provide the broker sufficient time during shutdown. This ensure that all in-memory data is flushed to disk and all state is replicated. When a broker has sufficient time to shut down, the subsequent restart will be nearly as fast as the first startup. This is a large contributor to the Kafka service’s high availability.
You can observe the graceful shutdown feature via the following log entries:
- The task launch log line contains
kill_policy { grace_period { nanoseconds: 30000000000 } }
. - The task graceful shutdown log line contains SIGTERM as well as the grace time granted.
- The underlying Kafka logging of shutdown operations includes a stream of subsystem shutdowns prior to the overarching system
shutdown indicated by the entry
[Kafka Server 1], shut down completed (kafka.server.KafkaServer)
. - The presence (or not) of a SIGKILL log line indicating that the underlying Kafka broker did not shutdown cleanly within the allotted grace period.
- The task status update marked by
TASK_KILLED
, indicating the end of the shutdown activity.
Broker Count
Configure the number of brokers running in a given Kafka cluster. The default count at installation is three brokers. This number may be increased, but not decreased, after installation.
- In DC/OS CLI options.json:
broker-count
: integer (default:3
) - DC/OS web interface:
BROKER_COUNT
:integer
Broker Port
Configure the port number that the brokers listen on. If the brokers.port
is set to a particular value, this will be the port used by all brokers. Note that this requires that placement-strategy
be set to NODE
to take effect, since having every broker listening on the same port requires that they be placed on different hosts. By default the port is set to 0
indicating that each Broker will have a random port from the port range offered by the Mesos Agent between 1025
and 32000
- In DC/OS CLI options.json:
brokers.port
: integer (default:0
) - DC/OS web interface:
Port
:integer
Apache Kafka brokers also get assigned a VIP hostname for load-balancing purposes. This VIP is broker.<SERVICE_NAME>.l4lb.thisdcos.directory:9092
, where <SERVICE_NAME>
is the service name you have provided when creating the Kafka cluster. By default the <SERVICE_NAME>
is kafka
, so, by default, the VIP is broker.kafka.l4lb.thisdcos.directory:9092
.
When security.transport_encryption.enabled
is true
the VIP hostname uses the port 9093
Configure Broker Placement Strategy
ANY
allows brokers to be placed on any node with sufficient resources, while NODE
ensures that all brokers within a given Kafka cluster are never colocated on the same node. This is an option that cannot be changed once the Kafka cluster is started: it can only be configured via the DC/OS CLI --options
flag when the Kafka instance is created.
- In DC/OS CLI options.json:
placement-strategy
:ANY
orNODE
(default:ANY
) - DC/OS web interface:
PLACEMENT_STRATEGY
:ANY
orNODE
Configure Kafka Broker Properties
Kafka Brokers are configured through settings in a server.properties file deployed with each Broker. The settings here can be specified at installation time or during a post-deployment configuration update. They are set in the DC/OS Universe’s config.json as options such as:
"log_retention_hours": {
"title": "log.retention.hours",
"description": "Override log.retention.hours: The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property",
"type": "integer",
"default": 168
},
The defaults can be overridden at install time by specifying an options.json file with the following format:
{
"kafka": {
"log_retention_hours": 100
}
}
These same values are also represented as environment variables for the scheduler in the form KAFKA_OVERRIDE_LOG_RETENTION_HOURS
and may be modified through the DC/OS web interface and deployed during a rolling upgrade as described in changing configuration at runtime.
Disk Type
The type of disks that can be used for storing broker data are: ROOT
(default) and MOUNT
. The type of disk may only be specified at install time.
ROOT
: Broker data is stored on the same volume as the agent work directory. Broker tasks will use the configured amount of disk space.MOUNT
: Broker data will be stored on a dedicated volume attached to the agent. Dedicated MOUNT volumes have performance advantages and a disk error on these MOUNT volumes will be correctly reported to Kafka.
Configure Kafka service to use dedicated disk volumes, as follows:
- DC/OS cli options.json:
{
"brokers": {
"disk_type": "MOUNT"
}
}
- DC/OS web interface: Set the environment variable
DISK_TYPE
:MOUNT
When configured to MOUNT
disk type, the scheduler selects a disk on an agent whose capacity is equal to or greater than the configured disk
value.
JVM Heap Size
Kafka service allows configuration of JVM Heap Size for the broker JVM process. Use the following configuration options:
{
"brokers": {
"heap": {
"size": 2000
}
}
}
- DC/OS web interface: Set the environment variable
BROKER_HEAP_MB
:2000
Alternate ZooKeeper
By default the Kafka framework uses the ZooKeeper ensemble made available on the Mesos masters of a DC/OS cluster. You can configure an alternate ZooKeeper at install time.
Use the following configuration options:
- DC/OS CLI options.json:
{
"kafka": {
"kafka_zookeeper_uri": "zookeeper.marathon.autoip.dcos.thisdcos.directory:2181"
}
}
This configuration option cannot be changed after installation.
Recovery and Health Checks
You can enable automated replacement of brokers and configure the circumstances under which they are replaced.
Enable Broker Replacement
To enable automated replacement using the following options:
- DC/OS CLI options.json:
{
"enable_replacement":{
"description":"Enable automated replacement of Brokers. WARNING: May cause data loss. See documentation.",
"type":"boolean",
"default":false
}
}
- DC/OS web interface: Set the environment variable
ENABLE_REPLACEMENT
:true
to enable replacement.
The following configuration options control the circumstances under which a broker is replaced.
Minumum Grace Period
Configure the minimum amount of time before a broker should be replaced.
- DC/OS CLI options.json:
{
"recover_in_place_grace_period_secs":{
"description":"The minimum amount of time (in minutes) which must pass before a Broker may be destructively replaced.",
"type":"number",
"default":1200
}
}
- DC/OS web interface: Set the environment variable
RECOVERY_GRACE_PERIOD_SEC
:1200
Minumum Delay Between Replacements
Configure the minimum amount of time between broker replacements.
{
"min_delay_between_recovers_secs":{
"description":"The minimum amount of time (in seconds) which must pass between destructive replacements of Brokers.",
"type":"number",
"default":600
}
}
- DC/OS web interface: Set the environment variable
REPLACE_DELAY_SEC
:600
The following configurations control the health checks that determine when a broker has failed:
Enable Health Check
Enable health checks on brokers.
{
"enable_health_check":{
"description":"Enable automated detection of Broker failures which did not result in a Broker process exit.",
"type":"boolean",
"default":true
}
}
- DC/OS web interface: Set the environment variable
ENABLE_BROKER_HEALTH_CHECK
:true
Health Check Delay
Set the amount of time before the health check begins.
{
"health_check_delay_sec":{
"description":"The period of time (in seconds) waited before the health-check begins execution.",
"type":"number",
"default":15
}
}
- DC/OS web interface: Set the environment variable
BROKER_HEALTH_CHECK_DELAY_SEC
:15
Health Check Interval
Set the interval between health checks.
{
"health_check_interval_sec":{
"description":"The period of time (in seconds) between health-check executions.",
"type":"number",
"default":10
}
}
- DC/OS web interface: Set the environment variable
BROKER_HEALTH_CHECK_INTERVAL_SEC
:10
Health Check Timeout
Set the time a health check can take to complete before it is considered a failed check.
{
"health_check_timeout_sec":{
"description":"The duration (in seconds) allowed for a health-check to complete before it is considered a failure.",
"type":"number",
"default":20
}
}
- DC/OS web interface: Set the environment variable
BROKER_HEALTH_CHECK_TIMEOUT_SEC
:20
Health Check Grace Period
Set the amount of time after the delay before health check failures count toward the maximum number of consecutive failures.
{
"health_check_grace_period_sec":{
"description":"The period of time after the delay (in seconds) before health-check failures count towards the maximum consecutive failures.",
"type":"number",
"default":10
}
}
- DC/OS web interface: Set the environment variable
BROKER_HEALTH_CHECK_GRACE_SEC
:10
Maximum Consecutive Health Check Failures
{
"health_check_max_consecutive_failures":{
"description":"The number of consecutive failures which cause a Broker process to exit.",
"type":"number",
"default":3
}
}
- DC/OS web interface: Set the environment variable
BROKER_HEALTH_CHECK_MAX_FAILURES
:3