Advanced
This section describes some advanced features of the DC/OS Apache Kafka service.
Components
The following components work together to deploy and maintain the DC/OS Apache Kafka service.
- Scheduler: The Scheduler is the “management layer” of the service. It launches the service nodes and keeps them running. It also exposes endpoints that allow end users to control the service and diagnose problems. The Scheduler is kept online by the cluster’s “init system”, Marathon. The Scheduler itself is a Java application that is configured via the options passed to it.
- Mesos: Mesos is the foundation of the DC/OS cluster. Mesos allocates resources and manages everything launched within the cluster. A typical Mesos cluster has one or three Masters that manage resources for the entire cluster. On DC/OS, the machines running the Mesos Masters will typically run other cluster services as well, such as Marathon and Cosmos, as local system processes. The Agent machines are separate from the Master machines; the Agent machines are where in-cluster processes are run. For more information on Mesos architecture, see the Apache Mesos documentation. For more information on DC/OS architecture, see the DC/OS architecture documentation.
- ZooKeeper: ZooKeeper is a common foundation for DC/OS system components, like Marathon and Mesos. It provides distributed key-value storage for configuration, synchronization, name registration, and cluster state storage. DC/OS comes with ZooKeeper installed by default, typically with one instance per DC/OS master. DC/OS Apache Kafka schedulers use the default ZooKeeper instance to store persistent state across restarts (under znodes named `dcos-service-<svcname>`). This allows Schedulers to be killed at any time and continue where they left off.
- Marathon: Marathon is the “init system” of a DC/OS cluster. Marathon launches tasks in the cluster and keeps them running. From the perspective of Mesos, Marathon is itself another Scheduler running its own tasks. Marathon is less specialized than the DC/OS Apache Kafka scheduler and mainly focuses on tasks that do not require managing local persistent state. The DC/OS Apache Kafka service relies on Marathon to run the Scheduler and to provide it with a configuration via environment variables. The Scheduler, however, maintains the service tasks without any direct involvement by Marathon.
- Packaging: Apache Kafka is packaged for deployment on DC/OS. DC/OS packages follow the Universe schema, which defines how packages expose customization options at initial installation. When a package is installed on the cluster, the packaging service (named “Cosmos”) creates a Marathon app that contains a rendered version of the `marathon.json.mustache` template provided by the package. For DC/OS Apache Kafka, this Marathon app is the scheduler for the service.
For further discussion of DC/OS components, see the architecture documentation.
Deployment
Internally, the DC/OS Apache Kafka service treats “Deployment” as moving from one state to another. By this definition, “Deployment” applies to many scenarios:
- When the Kafka package is first installed, deployment moves from a null configuration to a deployed configuration.
- When the deployed configuration is changed by updating the service, deployment moves from an initial running configuration to a new proposed configuration.
In this section, we will describe how these scenarios are handled by the Scheduler.
Initial Install
This is the process for deploying a new instance of the service:
Steps handled by the DC/OS cluster
- The user runs `dcos package install kafka` in the DC/OS CLI or clicks Install for a given package on the DC/OS Dashboard.
- A request is sent to the Cosmos packaging service to deploy the requested package along with a set of configuration options.
- Cosmos creates a Marathon app definition by rendering the kafka package’s `marathon.json.mustache` template with the configuration options provided in the request; this app definition represents the service’s scheduler. Cosmos then POSTs the app definition to Marathon.
- Marathon launches the kafka package’s scheduler somewhere in the cluster using the rendered app definition provided by Cosmos.
- The kafka package’s scheduler is launched. From this point onwards, the Scheduler handles deployment.
Steps handled by the Scheduler
Apache Kafka’s `main()` function is run like any other Java application. The Scheduler starts with the following state:
- A `svc.yml` template that represents the service configuration.
- Environment variables provided by Marathon, to be applied onto the `svc.yml` template.
- The `svc.yml` template is rendered using the environment variables provided by Marathon.
- The rendered `svc.yml` “Service Spec” contains the host/port for the ZooKeeper instance, which the Scheduler uses for persistent configuration/state storage. The default is `master.mesos:2181`, but it may be manually configured to use a different ZooKeeper instance. The Scheduler always stores its information under a znode named `dcos-service-<svcname>`.
- The service scheduler connects to the DC/OS ZooKeeper instance at `master.mesos:2181` and checks the znode `dcos-service-<svcname>` to see if it has previously stored a Mesos Framework ID for itself.
- If the Framework ID is present, the scheduler will attempt to reconnect to Mesos using that ID. This may result in a “Framework has been removed” error if Mesos doesn’t recognize that Framework ID, indicating an incomplete uninstall.
- If the Framework ID is not present, the scheduler will attempt to register with Mesos as a new Framework. Assuming this is successful, the resulting Framework ID is immediately stored.
- Now that the Scheduler has registered as a Mesos Framework, it is able to start interacting with Mesos and receiving offers. When this begins, the Scheduler will begin running the Offer Cycle and deploying the service. See that section for more information.
- The Scheduler retrieves its deployed task state from ZooKeeper and finds that there are tasks that should be launched. This is the first launch, so all tasks need to be launched.
- The Scheduler deploys those missing tasks through the Mesos offer cycle using a Deployment Plan to determine the ordering of that deployment.
- Once the Scheduler has launched the missing tasks, its current configuration should match the desired configuration defined by the “Service Spec” extracted from `svc.yml`.
- When the current configuration matches the desired configuration, the Scheduler will tell Mesos to suspend sending new offers, as there’s nothing to be done.
- The Scheduler idles until it receives an RPC from Mesos notifying it of a task status change, it receives an RPC from an end user against one of its HTTP APIs, or it is killed by Marathon as the result of a configuration change.
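The startup decision described above (reconnect with a stored Framework ID, or register fresh and persist the new ID) can be sketched as follows. This is an illustrative model only: `zk_state` is a plain dict standing in for ZooKeeper, and `register`/`reconnect` stand in for the Mesos framework calls; none of these names are the actual SDK API.

```python
def start_scheduler(zk_state, svcname, register, reconnect):
    """Decide how to (re)connect to Mesos, per the steps above (illustrative)."""
    znode = f"dcos-service-{svcname}/framework_id"
    framework_id = zk_state.get(znode)
    if framework_id is not None:
        # Reconnect with the stored ID. Mesos may reject it with a
        # "Framework has been removed" error after an incomplete uninstall.
        return reconnect(framework_id)
    # First launch: register as a new framework, then store the ID immediately,
    # so a restarted Scheduler can continue where it left off.
    framework_id = register()
    zk_state[znode] = framework_id
    return framework_id

zk = {}  # empty ZooKeeper state: this is a first launch
first = start_scheduler(zk, "kafka", register=lambda: "fw-1", reconnect=lambda i: i)
# A later restart finds the stored ID and reconnects instead of registering.
second = start_scheduler(zk, "kafka", register=lambda: "fw-2", reconnect=lambda i: i)
```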
Reconfiguration
This is the process when a configuration update is issued to a running instance of the service.
Steps handled by the DC/OS cluster
- The user edits the Scheduler’s configuration either using the Scheduler CLI’s `update` command or via the DC/OS web interface.
- The DC/OS package manager instructs Marathon to kill the current Scheduler and launch a new Scheduler with the updated configuration.
Steps handled by the Scheduler
As with the initial install above, at this point the Scheduler is re-launched with the same two sources of information it had before:
- The `svc.yml` template
- New environment variables
In addition, the Scheduler now has an additional piece:
- Pre-existing state in ZooKeeper
Scheduler reconfiguration is slightly different from initial deployment because the Scheduler is now comparing its current state to a non-empty prior state and determining what needs to be changed.
- After the Scheduler has rendered its `svc.yml` against the new environment variables, it has two Service Specs, reflecting two different configurations:
- The Service Spec that was just rendered, reflecting the configuration change.
- The prior Service Spec (or “Target Configuration”) that was previously stored in ZooKeeper.
- The Scheduler automatically compares the old and new Service Specs.
- Change validation: Certain changes, such as editing volumes and scale-down, are not currently supported.
- If an invalid change is detected, the Scheduler will send an error message and refuse to proceed until the user has reverted the change by relaunching the Scheduler app in Marathon with the prior configuration.
- If the changes are valid, the new configuration is stored in ZooKeeper as the new Target Configuration and the change deployment proceeds as described below.
- Change deployment: The Scheduler produces a `diff` between the current state and the target state, including all of the Mesos calls (reserve, unreserve, launch, destroy, and so on) needed to get there. For example, if the number of tasks has been increased, the Scheduler will launch the correct number of new tasks. If a task configuration setting has been changed, the Scheduler will deploy that change to the affected tasks by relaunching them. Tasks that are not affected by the configuration change are left as-is.
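The validate-then-diff flow above can be sketched in miniature. This is a hypothetical model, not the real Service Spec: the field names (`task_count`, `volume_size`, `task_env`) and the plan tuples are invented for illustration.

```python
def plan_reconfiguration(old_spec, new_spec):
    """Validate a config change and compute the deployment plan (illustrative)."""
    # Change validation: volume edits and scale-down are not supported.
    if new_spec["volume_size"] != old_spec["volume_size"]:
        raise ValueError("editing volumes is not supported")
    if new_spec["task_count"] < old_spec["task_count"]:
        raise ValueError("scale-down is not supported")

    plan = []
    # Scale-up: launch only the additional tasks.
    for i in range(old_spec["task_count"], new_spec["task_count"]):
        plan.append(("launch", f"task-{i}"))
    # Task setting changes: relaunch existing tasks only if they are affected.
    if new_spec["task_env"] != old_spec["task_env"]:
        plan += [("relaunch", f"task-{i}") for i in range(old_spec["task_count"])]
    return plan

old = {"task_count": 3, "volume_size": 10, "task_env": {"LOG_LEVEL": "INFO"}}
scale_up_plan = plan_reconfiguration(old, dict(old, task_count=5))
env_change_plan = plan_reconfiguration(old, dict(old, task_env={"LOG_LEVEL": "DEBUG"}))
```

Unaffected tasks produce no plan entries at all, which mirrors the "left as-is" behavior described above.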
Uninstall
This is the process for uninstalling the DC/OS Apache Kafka service.
Steps handled by the cluster
- The user uses the DC/OS CLI’s `dcos package uninstall` command to uninstall the service.
- The DC/OS package manager instructs Marathon to kill the current Scheduler and to launch a new Scheduler with the environment variable `SDK_UNINSTALL` set to “true”.
Steps handled by the Scheduler
When started in uninstall mode, the Scheduler performs the following actions:
- All running service tasks are killed.
- Any Mesos resource reservations are unreserved.
- The pre-existing state in ZooKeeper is deleted. The znode `dcos-service-<svcname>` itself will be left behind; this is due to the structure of the ACLs on the root (`/`) of the DC/OS ZooKeeper instance.
Offer Cycle
The Offer Cycle is a core Mesos concept and often a source of confusion when running services on Mesos.
Mesos will periodically notify subscribed Schedulers of resources in the cluster. Schedulers are expected to either accept or decline the offered resources. In this structure, Schedulers never have a complete picture of the cluster; they only know what is being explicitly offered to them at any given time. This gives Mesos the option of advertising certain resources only to specific Schedulers, without requiring any changes on the Scheduler’s end, but it also means that a Scheduler cannot deterministically know whether it has seen everything that is available in the cluster.
The service scheduler performs the following operations as offers are received from Mesos:
- Task Reconciliation: Mesos is the source of truth for what is running on the cluster. Task Reconciliation allows Mesos to convey the status of all tasks being managed by the service. The Scheduler requests a Task Reconciliation during initial startup, and Mesos responds with the current status of that Scheduler’s tasks. This allows the Scheduler to catch up with any status changes to its tasks that occurred while the Scheduler was not running. Reconciliation conveys only status information, not general task information; the Scheduler keeps its own copy of what it knows about tasks in ZooKeeper. During an initial deployment this process is very fast, as no tasks have been launched yet.
- Offer Acceptance: Once the Scheduler has finished Task Reconciliation, it will start evaluating the resource offers it receives to determine if any match the requirements of the next task(s) to be launched. At this point, users on small clusters may find that the Scheduler isn’t launching tasks. This is generally because the Scheduler isn’t able to find offered machines with enough room to fit the tasks. To fix this, add more/bigger machines to the cluster, or reduce the requirements of the service.
- Resource Cleanup: The offers provided by Mesos include reservation information if those resources were previously reserved by the Scheduler. The Scheduler will automatically request that any reserved resources it does not recognize be unreserved. This can come up in a few situations; for example, if an agent machine was unavailable for several days and then came back, Mesos may still consider its resources reserved by the service even though the Scheduler no longer expects to use them. In that case, the Scheduler automatically cleans up those resources.
The service scheduler declines, effectively “forever”, any offers that do not match what it is currently trying to launch. When the scheduler’s set of pending work changes, it revives offers from Mesos. Mesos will also send any “novel” offers to the scheduler, such as when a different service is removed and new resources become available on an agent.
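The accept-or-decline loop above can be sketched with a first-fit matcher. This is a simplified model: resources are reduced to `cpus`/`mem` dicts, and the offer/task shapes are invented for illustration (real offers also carry disk, ports, and reservation metadata).

```python
def process_offers(offers, pending_tasks):
    """Match each offer to the first pending task it can fit (illustrative)."""
    accepted, declined = [], []
    remaining = list(pending_tasks)
    for offer in offers:
        task = next((t for t in remaining
                     if t["cpus"] <= offer["cpus"] and t["mem"] <= offer["mem"]), None)
        if task is None:
            # Nothing launchable fits this offer: decline it
            # (in practice, decline "forever" until work changes).
            declined.append(offer["id"])
        else:
            accepted.append((offer["id"], task["name"]))
            remaining.remove(task)
    return accepted, declined, remaining

offers = [{"id": "o1", "cpus": 1, "mem": 512},
          {"id": "o2", "cpus": 4, "mem": 4096}]
tasks = [{"name": "broker-0", "cpus": 2, "mem": 2048}]
accepted, declined, still_pending = process_offers(offers, tasks)
```

A non-empty `still_pending` after the loop is the "Scheduler isn’t launching tasks" situation described above: no offered machine had enough room.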
Permanent and temporary recovery
There are two types of recovery, permanent and temporary. The difference is mainly whether the task being recovered should stay on the same machine, and the side effects that result from that.
- Temporary recovery:
- Temporary recovery is triggered by a transient error in the task or on the host machine.
- Recovery involves relaunching the task on the same machine as before.
- Recovery occurs automatically.
- Any data in the task’s persistent volumes survives the outage.
- May be manually triggered by a `pod restart` command.
- Permanent recovery:
- Permanent recovery can be requested when the host machine fails permanently or when the host machine is scheduled for downtime.
- Recovery involves discarding any persistent volumes that the pod had on the host machine.
- Recovery only occurs in response to a manual `pod replace` command (or you may build your own tooling to invoke the `replace` command).
Triggering a permanent recovery is a destructive operation, as it discards any prior persistent volumes for the pod being recovered. This is desirable when the operator knows that the previous machine is not coming back. For safety’s sake, permanent recovery is currently never automatically triggered by the service itself.
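The two recovery paths above can be summarized in a small sketch. The `pod` dict and its fields are hypothetical stand-ins for the service's pod state, not a real API.

```python
def recover(pod, command=None):
    """Apply the recovery rules above to a pod record (illustrative model)."""
    if command == "replace":
        # Permanent recovery: destructive and never automatic. Prior
        # persistent volumes are discarded and the pod may land anywhere.
        pod["volumes"] = []
        pod["agent"] = None
        return "permanent"
    # Temporary recovery: automatic relaunch on the same agent;
    # data in the pod's persistent volumes survives the outage.
    pod["state"] = "relaunching"
    return "temporary"

pod = {"agent": "agent-1", "volumes": ["data-vol"], "state": "failed"}
recover(pod)             # temporary: same agent, volumes intact
recover(pod, "replace")  # permanent: volumes discarded, agent unpinned
```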
Persistent Volumes
Volumes are advertised as resources by Mesos, and Mesos offers multiple types of persistent volumes. The DC/OS Apache Kafka service supports two of these types: ROOT volumes and MOUNT volumes.
-
ROOT volumes:
- Use a shared filesystem tree.
- Share I/O with anything else on that filesystem.
- Are supported by default in new deployments and do not require additional cluster-level configuration.
- Are allocated exactly the amount of disk space that was requested.
-
MOUNT volumes:
- Use a dedicated partition.
- Have dedicated I/O for the partition.
- Require additional configuration when setting up the DC/OS cluster.
- Are allocated the entire partition, so allocated space can far exceed what was originally requested. MOUNT volumes cannot be further subdivided between services.
Because MOUNT volumes cannot be subdivided between services, deploying multiple services with MOUNT volumes can quickly prevent them from being densely colocated within the cluster unless many MOUNT volumes are created on each agent. Consider the following deployment scenario across three DC/OS agent machines, each with two enabled MOUNT volumes, labeled A and B:
Agent 1: A B
Agent 2: A B
Agent 3: A B
Now we install a service X with two nodes that each use one mount volume. The service consumes volume A on agents 1 and 3:
Agent 1: X B
Agent 2: A B
Agent 3: X B
Now a service Y is installed with two nodes that each use two mount volumes. The service consumes volume A and B on agent 2, but then is stuck without being able to deploy anything else:
Agent 1: X B
Agent 2: Y Y
Agent 3: X B
Configuring `ROOT` vs `MOUNT` volumes may depend on the service. Some services support customizing this setting when it is relevant, while others may assume one or the other.
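The stuck-deployment scenario above can be reproduced with a small first-fit simulation. This is illustrative only: real placement also weighs CPU, memory, and placement constraints, and which specific agents service X lands on is arbitrary (the walkthrough above happened to use agents 1 and 3).

```python
def place(agents, node_names, volumes_per_node):
    """First-fit placement: each node consumes whole MOUNT volumes, at most
    one node per agent per service (illustrative model)."""
    placements, used_agents = {}, set()
    for name in node_names:
        agent = next((a for a, free in agents.items()
                      if a not in used_agents and len(free) >= volumes_per_node), None)
        if agent is None:
            return placements, False      # stuck: no agent has enough free volumes
        for _ in range(volumes_per_node):
            agents[agent].pop()           # a MOUNT volume is consumed whole
        used_agents.add(agent)
        placements[name] = agent
    return placements, True

agents = {1: ["A", "B"], 2: ["A", "B"], 3: ["A", "B"]}
# Service X: two nodes, one MOUNT volume each -> fits.
x_placed, ok_x = place(agents, ["x-0", "x-1"], volumes_per_node=1)
# Service Y: two nodes, two MOUNT volumes each -> only one agent still has
# two free volumes, so the second node cannot be placed.
y_placed, ok_y = place(agents, ["y-0", "y-1"], volumes_per_node=2)
```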
Enterprise
Secrets
Enterprise DC/OS provides a secrets store to enable access to sensitive data such as database passwords, private keys, and API tokens. DC/OS manages secure transportation of secret data, access control and authorization, and secure storage of secret content.
Secrets are available only in Enterprise DC/OS 1.10 and later versions. Learn more about the secrets store.
Authorization for Secrets
The path of a secret defines which service IDs can have access to it. You can think of secret paths as namespaces. Only services that are under the same namespace can read the content of the secret.
Secret | Service ID | Can service access secret? |
---|---|---|
Secret_Path1 | /user | Yes |
Secret_Path1 | /dev1/user | Yes |
secret-svc/Secret_Path1 | /user | No |
secret-svc/Secret_Path1 | /user/dev1 | No |
secret-svc/Secret_Path1 | /secret-svc | Yes |
secret-svc/Secret_Path1 | /secret-svc/dev1 | Yes |
secret-svc/Secret_Path1 | /secret-svc/instance2/dev2 | Yes |
secret-svc/Secret_Path1 | /secret-svc/a/b/c/dev3 | Yes |
secret-svc/instance1/Secret_Path2 | /secret-svc/dev1 | No |
secret-svc/instance1/Secret_Path2 | /secret-svc/instance2/dev3 | No |
secret-svc/instance1/Secret_Path2 | /secret-svc/instance1 | Yes |
secret-svc/instance1/Secret_Path2 | /secret-svc/instance1/dev3 | Yes |
secret-svc/instance1/Secret_Path2 | /secret-svc/instance1/someDir/dev3 | Yes |

Note: Absolute paths (paths with a leading slash) to secrets are not supported. The file path for a secret must be relative to the sandbox.
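The namespace rule shown in the table above amounts to a prefix check: a service may read a secret when the service ID sits at or below the folder portion of the secret path. The sketch below is an illustrative model of that rule, not the actual Enterprise DC/OS implementation.

```python
def can_access(secret_path, service_id):
    """Model of the table above: access is granted when the service ID is
    at or below the secret's folder (its namespace)."""
    folder = secret_path.rsplit("/", 1)[0] if "/" in secret_path else ""
    if folder == "":
        return True  # top-level secrets are visible to any service
    prefix = "/" + folder
    # Exact match, or a deeper path: compare whole path components so that
    # e.g. /secret-svc-other does not match the /secret-svc namespace.
    return service_id == prefix or service_id.startswith(prefix + "/")

# Spot-check against rows of the table:
assert can_access("Secret_Path1", "/user")
assert not can_access("secret-svc/Secret_Path1", "/user")
assert can_access("secret-svc/Secret_Path1", "/secret-svc/a/b/c/dev3")
assert not can_access("secret-svc/instance1/Secret_Path2", "/secret-svc/instance2/dev3")
assert can_access("secret-svc/instance1/Secret_Path2", "/secret-svc/instance1/someDir/dev3")
```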
Binary Secrets
You can store binary files, like a Kerberos keytab, in the DC/OS secrets store. In DC/OS 1.11 and later, you can create secrets from binary files directly. In DC/OS 1.10 or earlier versions, files must be base64-encoded, as specified in RFC 4648, prior to being stored as secrets.
DC/OS 1.11 and later
To create a secret called `mysecret` with the binary contents of `kerb5.keytab`, run:
dcos security secrets create --file kerb5.keytab mysecret
DC/OS 1.10 or earlier
To create a secret called `mysecret` with the binary contents of `kerb5.keytab`, first encode it using the `base64` command-line utility. The following example uses BSD `base64` (the default on macOS):
base64 --input kerb5.keytab > kerb5.keytab.base64-encoded
Note that GNU `base64` (the default on Linux) inserts line feeds in the encoded data by default. Disable line-wrapping with the `-w 0` argument:
base64 -w 0 kerb5.keytab > kerb5.keytab.base64-encoded
Now that the file is encoded it can be stored as a secret.
dcos security secrets create --text-file kerb5.keytab.base64-encoded some/path/__dcos_base64__mysecret
When the `some/path/__dcos_base64__mysecret` secret is referenced in your service definition, its base64-decoded contents will be made available as a temporary file in your service task containers.
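The encode-then-decode round trip above can be sketched in Python. The `__dcos_base64__` name prefix is what signals DC/OS to decode the secret before exposing it to the task; the keytab bytes below are a stand-in, not real keytab content.

```python
import base64

# Operator side: base64-encode the binary file before storing it as a secret.
keytab = bytes([0x05, 0x02, 0x00, 0x00])        # stand-in for kerb5.keytab bytes
encoded = base64.b64encode(keytab).decode()     # text stored in the secrets store

# DC/OS side (modeled): secrets whose final name component carries the
# __dcos_base64__ prefix are decoded before being mounted into the container.
secret_name = "some/path/__dcos_base64__mysecret"
if "__dcos_base64__" in secret_name.rsplit("/", 1)[-1]:
    recovered = base64.b64decode(encoded)       # what the task sees on disk
assert recovered == keytab
```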
Service Scheduler Metrics
The service scheduler records a number of metrics that can be used to diagnose issues with the scheduler and monitor the performance of the scheduler. The metrics can be consumed via DC/OS metrics, or pulled directly from the service scheduler.
JSON
A JSON representation of the metrics is available at the `/v1/metrics` endpoint of the service scheduler.
{
"version": "3.1.3",
"gauges": {},
"counters": {
"declines.long": {
"count": 15
},
"offers.processed": {
"count": 18
},
"offers.received": {
"count": 18
},
"operation.create": {
"count": 5
},
"operation.launch_group": {
"count": 3
},
"operation.reserve": {
"count": 20
},
"revives": {
"count": 3
},
"task_status.task_running": {
"count": 6
}
},
"histograms": {},
"meters": {},
"timers": {
"offers.process": {
"count": 10,
"max": 0.684745927,
"mean": 0.15145255818999337,
"min": 5.367950000000001E-4,
"p50": 0.0035879090000000002,
"p75": 0.40317217800000005,
"p95": 0.684745927,
"p98": 0.684745927,
"p99": 0.684745927,
"p999": 0.684745927,
"stddev": 0.24017017290826104,
"m15_rate": 0.5944843686231079,
"m1_rate": 0.5250565015924039,
"m5_rate": 0.583689104996544,
"mean_rate": 0.3809369986002824,
"duration_units": "seconds",
"rate_units": "calls/second"
}
}
}
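Counters like those above can be combined to answer simple operational questions, for example how large a share of received offers the scheduler declined. The snippet below parses an abridged copy of the sample document; field names match the JSON shown above.

```python
import json

# Abridged copy of the /v1/metrics JSON shown above.
metrics = json.loads("""
{
  "counters": {
    "declines.long":   {"count": 15},
    "offers.processed": {"count": 18},
    "offers.received":  {"count": 18},
    "operation.launch_group": {"count": 3}
  }
}
""")

# Flatten {"name": {"count": n}} into {"name": n} for easy arithmetic.
counters = {name: c["count"] for name, c in metrics["counters"].items()}
decline_ratio = counters["declines.long"] / counters["offers.received"]
```

A persistently high `decline_ratio` alongside pending tasks is one signal of the "no offered machine has enough room" situation discussed in the Offer Cycle section.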
Prometheus
A Prometheus representation of the metrics is available at the `/v1/metrics/prometheus` endpoint of the service scheduler.
# HELP declines_long Generated from Dropwizard metric import (metric=declines.long, type=com.codahale.metrics.Counter)
# TYPE declines_long gauge
declines_long 20.0
# HELP offers_processed Generated from Dropwizard metric import (metric=offers.processed, type=com.codahale.metrics.Counter)
# TYPE offers_processed gauge
offers_processed 24.0
# HELP offers_received Generated from Dropwizard metric import (metric=offers.received, type=com.codahale.metrics.Counter)
# TYPE offers_received gauge
offers_received 24.0
# HELP operation_create Generated from Dropwizard metric import (metric=operation.create, type=com.codahale.metrics.Counter)
# TYPE operation_create gauge
operation_create 5.0
# HELP operation_launch_group Generated from Dropwizard metric import (metric=operation.launch_group, type=com.codahale.metrics.Counter)
# TYPE operation_launch_group gauge
operation_launch_group 4.0
# HELP operation_reserve Generated from Dropwizard metric import (metric=operation.reserve, type=com.codahale.metrics.Counter)
# TYPE operation_reserve gauge
operation_reserve 20.0
# HELP revives Generated from Dropwizard metric import (metric=revives, type=com.codahale.metrics.Counter)
# TYPE revives gauge
revives 4.0
# HELP task_status_task_finished Generated from Dropwizard metric import (metric=task_status.task_finished, type=com.codahale.metrics.Counter)
# TYPE task_status_task_finished gauge
task_status_task_finished 1.0
# HELP task_status_task_running Generated from Dropwizard metric import (metric=task_status.task_running, type=com.codahale.metrics.Counter)
# TYPE task_status_task_running gauge
task_status_task_running 8.0
# HELP offers_process Generated from Dropwizard metric import (metric=offers.process, type=com.codahale.metrics.Timer)
# TYPE offers_process summary
offers_process{quantile="0.5",} 2.0609500000000002E-4
offers_process{quantile="0.75",} 2.2853200000000001E-4
offers_process{quantile="0.95",} 0.005792643
offers_process{quantile="0.98",} 0.005792643
offers_process{quantile="0.99",} 0.111950848
offers_process{quantile="0.999",} 0.396119612
offers_process_count 244.0
Enterprise
Secure JMX
Apache Kafka supports secure JMX, allowing you to remotely manage and monitor the Kafka JRE.
Configuration Options
Option | Description |
---|---|
jmx.enabled | Enables the secure JMX |
jmx.port | JMX port |
jmx.rmi_port | JMX RMI port |
jmx.access_file | The path to the secret in the Secret Store that has the contents of the access file. |
jmx.password_file | The path to the secret in the Secret Store that has the contents of the password file. |
jmx.key_store | The path to the secret in the Secret Store that has the contents of the key store. |
jmx.key_store_password_file | The path to the secret in the Secret Store that has the contents of the key store password file. |
jmx.add_trust_store | Enables the user provided trust store. |
jmx.trust_store | The path to the secret in the Secret Store that has the contents of the trust store. |
jmx.trust_store_password_file | The path to the secret in the Secret Store that has the contents of the trust store password file. |
Read more about using JMX options here.
Configuring JMX with self-signed certificate
- Generate a self-signed key store and trust store.
keytool -genkey -alias server-cert -keyalg rsa -dname "CN=kafka.example.com,O=Example Company,C=US" -keystore keystore.ks -storetype JKS -storepass changeit -keypass changeit
keytool -genkey -alias server-cert -keyalg rsa -dname "CN=kafka.example.com,O=Example Company,C=US" -keystore truststore.ks -storetype JKS -storepass changeit -keypass changeit
- Generate files containing the trust store and key store passwords.
cat <<EOF >> trust_store_pass
changeit
EOF
cat <<EOF >> key_store_pass
changeit
EOF
- Create a JMX access file.
cat <<EOF >> access_file
admin readwrite
user readonly
EOF
- Create a JMX password file.
cat <<EOF >> password_file
admin adminpassword
user userpassword
EOF
- Create the necessary secrets in DC/OS for JMX.
dcos package install dcos-enterprise-cli --yes
dcos security secrets create -f keystore.ks kafka/keystore
dcos security secrets create -f key_store_pass kafka/keystorepass
dcos security secrets create -f truststore.ks kafka/truststore
dcos security secrets create -f trust_store_pass kafka/truststorepass
dcos security secrets create -f password_file kafka/passwordfile
dcos security secrets create -f access_file kafka/access
- Create a custom service configuration `options.json` with JMX enabled.
{
  "service": {
    "jmx": {
      "enabled": true,
      "port": 31299,
      "rmi_port": 31298,
      "access_file": "kafka/access",
      "password_file": "kafka/passwordfile",
      "key_store": "kafka/keystore",
      "key_store_password_file": "kafka/keystorepass",
      "add_trust_store": true,
      "trust_store": "kafka/truststore",
      "trust_store_password_file": "kafka/truststorepass"
    }
  }
}
- Install Apache Kafka with the options file you created.
dcos package install kafka --options="options.json"
Service Health Check
DC/OS Apache Kafka supports service-oriented health checks, allowing you to monitor the health of your service in detail.
Configuration Options
Option | Description |
---|---|
health_check.enabled | Enables the health checks |
health_check.method | “PORT” or “FUNCTIONAL” |
health_check.interval | The period in seconds to wait after the last health check has completed to start the next check. |
health_check.delay | An amount of time in seconds to wait before starting the health check attempts. |
health_check.timeout | An amount of time in seconds to wait for a health check to succeed. |
health_check.grace-period | An amount of time in seconds after the task is launched during which health check failures are ignored. Once a health check succeeds for the first time, the grace period does not apply anymore. Note that it includes delay seconds, i.e., setting grace_period seconds < delay seconds has no effect. |
health_check.max-consecutive-failures | The maximum number of consecutive health check failures after which the task will be killed. |
health_check.health-check-topic-prefix | Prefix for the health check topic name. Used when “FUNCTIONAL” health check method is selected. |
service.security.kerberos.health_check_primary | The Kerberos primary used by Kafka health check if enabled. |
Health Check Methods
Port Check
This method will check if the broker port is open. Only the broker on which the health check is running will be checked as each broker will have its own health check.
{
  "service": {
    "name": "kafka",
    "health_check": {
      "enabled": true,
      "method": "PORT"
    }
  }
}
Functional Check
This method checks whether the broker can send and receive messages from a client. Only the broker on which the health check is running will be checked, as each broker has its own health check. The health check produces a random message to a user-configurable topic and then tries to consume the last produced message. When Kerberos and/or Transport Encryption is enabled, the health check produces only a single random message to the topic and will attempt to consume that same first message of the topic at each health check interval.
{
"service": {
"name": "kafka",
"health_check": {
"enabled": true,
"method": "FUNCTIONAL",
"health-check-topic-prefix": "MyHealthCheckTopic"
}
}
}
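The functional check described above can be sketched with an in-memory stand-in for the broker. This is a behavioral model only: the real check uses a Kafka producer and consumer against a topic derived from `health-check-topic-prefix`, and the list below merely stands in for that topic.

```python
import uuid

def functional_check(topic, secure_mode=False):
    """Model of the FUNCTIONAL health check above. `topic` is a list standing
    in for the health check topic; secure_mode models Kerberos/TLS being on."""
    if secure_mode and topic:
        # Secure mode: only one message is ever produced; each interval
        # re-consumes that same first message.
        return topic[0] is not None
    message = uuid.uuid4().hex    # produce a random message...
    topic.append(message)
    return topic[-1] == message   # ...then consume the last produced message

topic = []                        # stand-in for e.g. MyHealthCheckTopic...
assert functional_check(topic)               # plaintext: produce + consume each time
assert functional_check(topic, secure_mode=True)  # secure: re-read the first message
```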