Advanced

Advanced features of Kafka

This section describes some advanced features of the DC/OS Apache Kafka service.

Components

The following components work together to deploy and maintain the DC/OS Apache Kafka service.

  • Scheduler

    The Scheduler is the “management layer” of the service. It launches the service nodes and keeps them running. It also exposes endpoints to allow end users to control the service and diagnose problems. The Scheduler is kept online by the cluster’s “init system”, Marathon. The Scheduler itself is effectively a Java application that is configured via passed configuration options.

  • Mesos

    Mesos is the foundation of the DC/OS cluster. Mesos allocates resources and manages everything launched within the cluster. A typical Mesos cluster has one or three Masters that manage resources for the entire cluster. On DC/OS, the machines running the Mesos Masters will typically run other cluster services as well, such as Marathon and Cosmos, as local system processes. The Agent machines are separate from the Master machines; the Agent machines are where in-cluster processes are run. For more information on Mesos architecture, see the Apache Mesos documentation. For more information on DC/OS architecture, see the DC/OS architecture documentation.

  • ZooKeeper

    ZooKeeper is a common foundation for DC/OS system components, like Marathon and Mesos. It provides distributed key-value storage for configuration, synchronization, name registration, and cluster state storage. DC/OS comes with ZooKeeper installed by default, typically with one instance per DC/OS master. DC/OS Apache Kafka schedulers use the default ZooKeeper instance to store persistent state across restarts (under znodes named dcos-service-<svcname>). This allows Schedulers to be killed at any time and continue where they left off.

  • Marathon

    Marathon is the “init system” of a DC/OS cluster. Marathon launches tasks in the cluster and keeps them running. From the perspective of Mesos, Marathon is itself another Scheduler running its own tasks. Marathon is less specialized than the DC/OS Apache Kafka scheduler and mainly focuses on tasks that do not require managing a local persistent state. The DC/OS Apache Kafka service relies on Marathon to run the Scheduler and to provide it with a configuration via environment variables. The Scheduler, however, maintains the service tasks without any direct involvement by Marathon.

  • Packaging

    Apache Kafka is packaged for deployment on DC/OS. DC/OS packages follow the Universe schema, which defines how packages expose customization options at initial installation. When a package is installed on the cluster, the packaging service (named ‘Cosmos’) creates a Marathon app that contains a rendered version of the marathon.json.mustache template provided by the package. For DC/OS Apache Kafka, this Marathon app is the scheduler for the service.

For further discussion of DC/OS components, see the architecture documentation.
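
To make the rendering step described under Packaging more concrete, here is a minimal Python sketch of substituting configuration options into a mustache-style template. It is an illustration only: the {{name}} placeholder syntax is assumed from the marathon.json.mustache file name, and the template text, option names, and render function are hypothetical and far simpler than what Cosmos does.

```python
import re

def render(template: str, options: dict) -> str:
    """Replace each {{key}} placeholder with the matching option value."""
    def substitute(match):
        key = match.group(1)
        if key not in options:
            raise KeyError(f"no value supplied for placeholder {key!r}")
        return str(options[key])
    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", substitute, template)

# Hypothetical template fragment and options, for illustration only.
TEMPLATE = '{"id": "{{service.name}}", "env": {"BROKER_COUNT": "{{brokers.count}}"}}'
rendered = render(TEMPLATE, {"service.name": "kafka", "brokers.count": 3})
print(rendered)
```

A missing option raises an error in this sketch rather than rendering an empty string, so that an incomplete configuration is caught before an app definition is produced.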

Deployment

Internally, the DC/OS Apache Kafka service treats “Deployment” as moving from one state to another state. By this definition, “Deployment” applies to many scenarios:

  • When the Kafka package is first installed, deployment moves from a null configuration to a deployed configuration.
  • When the deployed configuration is changed by updating the service, deployment moves from an initial running configuration to a new proposed configuration.

In this section, we will describe how these scenarios are handled by the Scheduler.

Initial Install

This is the process for deploying a new instance of the service:

Steps handled by the DC/OS cluster

  1. The user runs dcos package install kafka in the DC/OS CLI or clicks Install for a given package on the DC/OS Dashboard.

  2. A request is sent to the Cosmos packaging service to deploy the requested package along with a set of configuration options.

  3. Cosmos creates a Marathon app definition, which represents the service’s scheduler, by rendering the kafka package’s marathon.json.mustache with the configuration options provided in the request. Cosmos then POSTs to Marathon to create the app.

  4. Marathon launches the kafka package’s scheduler somewhere in the cluster using the rendered app definition provided by Cosmos.

  5. The kafka package’s scheduler is launched. From this point onwards, the Scheduler handles deployment.

Steps handled by the Scheduler

The Scheduler’s main() function is run like any other Java application. The Scheduler starts with the following state:

  • A svc.yml template that represents the service configuration.
  • Environment variables provided by Marathon, to be applied onto the svc.yml template.

  1. The svc.yml template is rendered using the environment variables provided by Marathon.

  2. The rendered svc.yml “Service Spec” contains the host/port for the ZooKeeper instance, which the Scheduler uses for persistent configuration/state storage. The default is master.mesos:2181, but may be manually configured to use a different ZooKeeper instance. The Scheduler always stores its information under a znode named dcos-service-<svcname>.

  3. The service scheduler connects to the DC/OS ZooKeeper instance at master.mesos:2181 and checks the znode dcos-service-<svcname> to see if it has previously stored a Mesos Framework ID for itself.

    • If the Framework ID is present, the scheduler will attempt to reconnect to Mesos using that ID. This may result in a “Framework has been removed” error if Mesos doesn’t recognize that Framework ID, indicating an incomplete uninstall.

    • If the Framework ID is not present, the scheduler will attempt to register with Mesos as a Framework. Assuming this is successful, the resulting Framework ID is then immediately stored.

  4. Now that the Scheduler has registered as a Mesos Framework, it can start interacting with Mesos and receiving offers. At this point, the Scheduler begins running the Offer Cycle and deploying the service. See the Offer Cycle section for more information.

  5. The Scheduler retrieves its deployed task state from ZooKeeper and finds that there are tasks that should be launched. This is the first launch, so all tasks need to be launched.

  6. The Scheduler deploys those missing tasks through the Mesos offer cycle using a Deployment Plan to determine the ordering of that deployment.

  7. Once the Scheduler has launched the missing tasks, its current configuration should match the desired configuration defined by the “Service Spec” extracted from svc.yml.

    1. When the current configuration matches the desired configuration, the Scheduler will tell Mesos to suspend sending new offers, as there’s nothing to be done.
    2. The Scheduler idles until it receives an RPC from Mesos notifying it of a task status change, receives an RPC from an end user against one of its HTTP APIs, or is killed by Marathon as the result of a configuration change.
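
The Framework ID check in the steps above can be sketched in Python as follows. This is a simplified model, not the Scheduler's Java code: InMemoryStateStore is a hypothetical stand-in for the ZooKeeper znode, and register_with_mesos and known_to_mesos are assumed placeholders for the real Mesos interaction.

```python
class InMemoryStateStore:
    """Hypothetical stand-in for the znode dcos-service-<svcname>."""
    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def put(self, key, value):
        self._data[key] = value

def connect(store, register_with_mesos, known_to_mesos):
    """Reuse a stored Framework ID if one exists, otherwise register and store a new one."""
    framework_id = store.get("framework-id")
    if framework_id is not None:
        # Reconnect path: Mesos must still recognize the stored ID.
        if framework_id not in known_to_mesos:
            raise RuntimeError("Framework has been removed")
        return framework_id
    # First launch: register, then persist the ID immediately.
    framework_id = register_with_mesos()
    store.put("framework-id", framework_id)
    return framework_id

store = InMemoryStateStore()
first = connect(store, lambda: "framework-0001", known_to_mesos=set())
second = connect(store, lambda: "framework-0002", known_to_mesos={"framework-0001"})
print(first, second)
```

Because the ID is persisted before any other work is done, a Scheduler that is killed and relaunched takes the reconnect path instead of registering a second framework.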

Reconfiguration

This is the process when a configuration update is issued to a running instance of the service.

Steps handled by the DC/OS cluster

  1. The user edits the Scheduler’s configuration either using the Scheduler CLI’s update command or via the DC/OS web interface.
  2. The DC/OS package manager instructs Marathon to kill the current Scheduler and launch a new Scheduler with the updated configuration.

Steps handled by the Scheduler

As with initial install above, at this point the Scheduler is re-launched with the same two sources of information it had before:

  • svc.yml template
  • New environment variables

In addition, the Scheduler now has an additional piece:

  • Pre-existing state in ZooKeeper

Scheduler reconfiguration is slightly different from initial deployment because the Scheduler is now comparing its current state to a non-empty prior state and determining what needs to be changed.

  1. After the Scheduler has rendered its svc.yml against the new environment variables, it has two Service Specs, reflecting two different configurations.
    1. The Service Spec that was just rendered, reflecting the configuration change.
    2. The prior Service Spec (or “Target Configuration”) that was previously stored in ZooKeeper.
  2. The Scheduler automatically compares the changes between the old and new Service Specs.
    1. Change validation: Certain changes, such as editing volumes and scale-down, are not currently supported.
      • If an invalid change is detected, the Scheduler will send an error message and refuse to proceed until the user has reverted the change by relaunching the Scheduler app in Marathon with the prior configuration.
      • If the changes are valid, the new configuration is stored in ZooKeeper as the new Target Configuration and the change deployment proceeds as described below.
    2. Change deployment: The Scheduler produces a diff between the current state and some future state, including all of the Mesos calls (reserve, unreserve, launch, destroy, and so on) needed to get there. For example, if the number of tasks has been increased, then the Scheduler will launch the correct number of new tasks. If a task configuration setting has been changed, the Scheduler will deploy that change to the relevant affected tasks by relaunching them. Tasks that are not affected by the configuration change will be left as-is.
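
The validation and deployment steps above can be illustrated with a small Python sketch. The Service Spec is reduced here to a flat dictionary, and the field names (count, cpus, mem_mb, volume_mb) and task names are assumptions made for the example; the real Scheduler compares much richer specifications and also issues the Mesos operations.

```python
def validate(old: dict, new: dict) -> list:
    """Return a list of unsupported changes between two simplified specs."""
    errors = []
    if new["count"] < old["count"]:
        errors.append("scale-down is not supported")
    if new["volume_mb"] != old["volume_mb"]:
        errors.append("editing volumes is not supported")
    return errors

def plan(old: dict, new: dict) -> list:
    """Return the (action, task) steps needed to move from old to new."""
    errors = validate(old, new)
    if errors:
        raise ValueError("; ".join(errors))
    settings_changed = any(old[k] != new[k] for k in ("cpus", "mem_mb"))
    steps = []
    for index in range(new["count"]):
        task = f"kafka-{index}-broker"  # hypothetical task naming
        if index >= old["count"]:
            steps.append(("launch", task))      # new task from a scale-up
        elif settings_changed:
            steps.append(("relaunch", task))    # existing task picks up new settings
        # otherwise the task is unaffected and left as-is
    return steps

old_spec = {"count": 3, "cpus": 1.0, "mem_mb": 2048, "volume_mb": 5000}
scaled_up = dict(old_spec, count=4)
print(plan(old_spec, scaled_up))
```

In this model, a scale-up alone produces only launch steps, while a changed task setting relaunches every existing task and launches any new ones.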

Uninstall

This is the process for uninstalling the DC/OS Apache Kafka service.

Steps handled by the cluster

  1. The user uses the DC/OS CLI’s dcos package uninstall command to uninstall the service.
  2. The DC/OS package manager instructs Marathon to kill the current Scheduler and to launch a new Scheduler with the environment variable SDK_UNINSTALL set to “true”.

Steps handled by the Scheduler

WARNING: Any data stored in reserved disk resources will be irretrievably lost.

When started in uninstall mode, the Scheduler performs the following actions:

  • All running service tasks are killed.
  • Any Mesos resource reservations are unreserved.
  • The pre-existing state in ZooKeeper is deleted. The znode dcos-service-<svcname> itself is left behind because of the structure of the ACLs on the root (/) of the DC/OS ZooKeeper.

Offer Cycle

The Offer Cycle is a core Mesos concept and often a source of confusion when running services on Mesos.

Mesos periodically notifies subscribed Schedulers of resources in the cluster. Schedulers are expected to either accept the offered resources or decline them. In this structure, Schedulers never have a complete picture of the cluster; they only know what is being explicitly offered to them at any given time. This gives Mesos the option of advertising certain resources only to specific Schedulers, without requiring any changes on the Scheduler’s end, but it also means that the Scheduler cannot deterministically know whether it has seen everything that is available in the cluster.

The service scheduler performs the following operations as offers are received from Mesos:

  1. Task Reconciliation: Mesos is the source of truth for what is running on the cluster. Task Reconciliation allows Mesos to convey the status of all tasks being managed by the service. The Scheduler requests a Task Reconciliation during initial startup, and Mesos then sends the current status of that Scheduler’s tasks. This allows the Scheduler to catch up with any status changes to its tasks that occurred while the Scheduler was not running. Mesos shares only a limited part of what it knows about tasks, so the response contains status information only, not general task information. The Scheduler keeps its own copy of what it knows about tasks in ZooKeeper. During an initial deployment this process is very fast, as no tasks have been launched yet.
  2. Offer Acceptance: Once the Scheduler has finished Task Reconciliation, it will start evaluating the resource offers it receives to determine if any match the requirements of the next task(s) to be launched. At this point, users on small clusters may find that the Scheduler isn’t launching tasks. This is generally because the Scheduler isn’t able to find offered machines with enough room to fit the tasks. To fix this, add more/bigger machines to the cluster, or reduce the requirements of the service.
  3. Resource Cleanup: The Offers provided by Mesos include reservation information if those resources were previously reserved by the Scheduler. The Scheduler automatically requests that any reserved resources it does not recognize be unreserved. This can come up in a few situations; for example, if an agent machine was unavailable for several days and then came back, Mesos may still consider its resources reserved by the service, while the Scheduler no longer tracks them. At this point, the Scheduler cleans up those resources.

The service scheduler declines “forever” all offers that do not match what it is currently trying to launch, so Mesos does not offer the same resources to it again. If the scheduler’s work set changes, it revives offers from Mesos. Mesos will also send any “novel” offers to the scheduler, such as when a different service is removed and new resources become available on an agent.
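
The Offer Acceptance step can be pictured with the following Python sketch. It is a simplified model under stated assumptions: offers and requirements are plain dictionaries with hypothetical resource names, a single task is being placed, and none of the actual Mesos calls appear.

```python
def evaluate_offers(offers: list, requirement: dict):
    """Accept the first offer that fits the requirement and decline the rest."""
    accepted = None
    declined = []
    for offer in offers:
        fits = all(
            offer["resources"].get(name, 0) >= amount
            for name, amount in requirement.items()
        )
        if fits and accepted is None:
            accepted = offer["id"]
        else:
            declined.append(offer["id"])
    return accepted, declined

# Hypothetical offers from three agents and the needs of the next task.
offers = [
    {"id": "offer-1", "resources": {"cpus": 0.5, "mem_mb": 4096, "disk_mb": 10000}},
    {"id": "offer-2", "resources": {"cpus": 2.0, "mem_mb": 4096, "disk_mb": 10000}},
    {"id": "offer-3", "resources": {"cpus": 4.0, "mem_mb": 8192, "disk_mb": 20000}},
]
requirement = {"cpus": 1.0, "mem_mb": 2048, "disk_mb": 5000}
accepted, declined = evaluate_offers(offers, requirement)
print(accepted, declined)
```

If no offer fits, accepted stays None and every offer is declined, which corresponds to the small-cluster situation described above where the Scheduler appears to launch nothing.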

Permanent and temporary recovery

There are two types of recovery, permanent and temporary. The difference is mainly whether the task being recovered should stay on the same machine, and the side effects that result from that.

  • Temporary recovery:
    • Temporary recovery is triggered when there is an error in the task or the host machine.
    • Recovery involves relaunching the task on the same machine as before.
    • Recovery occurs automatically.
    • Any data in the task’s persistent volumes survives the outage.
    • May be manually triggered by a pod restart command.
  • Permanent recovery:
    • Permanent recovery can be requested when the host machine fails permanently or when the host machine is scheduled for downtime.
    • Recovery involves discarding any persistent volumes that the pod had on the host machine.
    • Recovery only occurs in response to a manual pod replace command (or you may build your own tooling to invoke the replace command).

Triggering a permanent recovery is a destructive operation, as it discards any prior persistent volumes for the pod being recovered. This is desirable when the operator knows that the previous machine is not coming back. For safety’s sake, permanent recovery is currently never automatically triggered by the service itself.

Persistent Volumes

Volumes are advertised as resources by Mesos, and Mesos offers multiple types of persistent volumes. The DC/OS Apache Kafka service supports two of these types: ROOT volumes and MOUNT volumes.

  • ROOT volumes:

    • Use a shared filesystem tree.
    • Share I/O with anything else on that filesystem.
    • Are supported by default in new deployments and do not require additional cluster-level configuration.
    • Are allocated exactly the amount of disk space that was requested.
  • MOUNT volumes:

    • Use a dedicated partition.
    • Have dedicated I/O for the partition.
    • Require additional configuration when setting up the DC/OS cluster.
    • Are allocated the entire partition, so allocated space can far exceed what was originally requested. MOUNT volumes cannot be further subdivided between services.

Because MOUNT volumes cannot be subdivided between services, multiple services deployed with MOUNT volumes can quickly run out of room to colocate densely within the cluster unless many MOUNT volumes are created on each agent. Consider the following deployment scenario across three DC/OS agent machines, each with two enabled MOUNT volumes labeled A and B:

Agent 1: A B
Agent 2: A B
Agent 3: A B

Now we install a service X with two nodes that each use one mount volume. The service consumes volume A on agents 1 and 3:

Agent 1: X B
Agent 2: A B
Agent 3: X B

Now a service Y is installed with two nodes that each use two mount volumes. The service consumes volumes A and B on agent 2 for its first node, but is then stuck: no remaining agent has two free volumes for the second node:

Agent 1: X B
Agent 2: Y Y
Agent 3: X B
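
The scenario above can be replayed with a short Python simulation. It is a sketch under stated assumptions: each agent hosts at most one node of a given service, a node needs all of its volumes on one agent, and the offer_order lists are chosen by hand so that service X lands on agents 1 and 3 as in the example.

```python
def deploy(agents: dict, service: str, nodes: int, volumes_per_node: int, offer_order: list) -> int:
    """Place up to `nodes` nodes, consuming whole MOUNT volumes; return how many were placed."""
    placed = 0
    for agent in offer_order:
        if placed == nodes:
            break
        free = [label for label, owner in agents[agent].items() if owner is None]
        if len(free) >= volumes_per_node:
            for label in free[:volumes_per_node]:
                agents[agent][label] = service  # the whole volume goes to one service
            placed += 1
    return placed

def show(agents: dict) -> list:
    """Render each agent in the same style as the listings above."""
    return [
        f"Agent {n}: " + " ".join(owner or label for label, owner in volumes.items())
        for n, volumes in sorted(agents.items())
    ]

agents = {n: {"A": None, "B": None} for n in (1, 2, 3)}
placed_x = deploy(agents, "X", nodes=2, volumes_per_node=1, offer_order=[1, 3, 2])
placed_y = deploy(agents, "Y", nodes=2, volumes_per_node=2, offer_order=[1, 2, 3])
print(placed_x, placed_y)
print("\n".join(show(agents)))
```

Service Y places only one of its two nodes even though two volumes remain free in the cluster, because those volumes sit on different agents.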

Configuring ROOT vs MOUNT volumes may depend on the service. Some services will support customizing this setting when it is relevant, while others may assume one or the other.

Secrets (Enterprise)

Enterprise DC/OS provides a secrets store to enable access to sensitive data such as database passwords, private keys, and API tokens. DC/OS manages secure transportation of secret data, access control and authorization, and secure storage of secret content.

Secrets are available only in Enterprise DC/OS 1.10 and later versions. Learn more about the secrets store.

Authorization for Secrets

The path of a secret defines which service IDs can have access to it. You can think of secret paths as namespaces. Only services that are under the same namespace can read the content of the secret.

Secret                              Service ID                            Can service access secret?
Secret_Path1                        /user                                 Yes
Secret_Path1                        /dev1/user                            Yes
secret-svc/Secret_Path1             /user                                 No
secret-svc/Secret_Path1             /user/dev1                            No
secret-svc/Secret_Path1             /secret-svc                           Yes
secret-svc/Secret_Path1             /secret-svc/dev1                      Yes
secret-svc/Secret_Path1             /secret-svc/instance2/dev2            Yes
secret-svc/Secret_Path1             /secret-svc/a/b/c/dev3                Yes
secret-svc/instance1/Secret_Path2   /secret-svc/dev1                      No
secret-svc/instance1/Secret_Path2   /secret-svc/instance2/dev3            No
secret-svc/instance1/Secret_Path2   /secret-svc/instance1                 Yes
secret-svc/instance1/Secret_Path2   /secret-svc/instance1/dev3            Yes
secret-svc/instance1/Secret_Path2   /secret-svc/instance1/someDir/dev3    Yes

Note: Absolute paths (paths with a leading slash) to secrets are not supported. The file path for a secret must be relative to the sandbox.
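
The namespace rule illustrated by the table can be expressed as a short Python function. The rule below is inferred from the rows of the table (the secret's parent path must be a prefix of the service ID, compared component by component); it is a sketch for reasoning about paths, not the code DC/OS uses to enforce access.

```python
def can_access(secret_path: str, service_id: str) -> bool:
    """Return True if the service ID falls under the secret's namespace."""
    # Everything before the final component of the secret path is its namespace.
    namespace = secret_path.strip("/").split("/")[:-1]
    service = service_id.strip("/").split("/")
    return service[:len(namespace)] == namespace

print(can_access("secret-svc/Secret_Path1", "/secret-svc/dev1"))
print(can_access("secret-svc/instance1/Secret_Path2", "/secret-svc/dev1"))
```

A secret with no namespace, such as Secret_Path1, has an empty prefix and is therefore readable by every service in this model.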

Binary Secrets

You can store binary files, like a Kerberos keytab, in the DC/OS secrets store. In DC/OS 1.11 and later, you can create secrets from binary files directly. In DC/OS 1.10 or earlier versions, files must be base64-encoded, as specified in RFC 4648, prior to being stored as secrets.

DC/OS 1.11 and later

To create a secret called mysecret with the binary contents of kerb5.keytab run:

dcos security secrets create --file kerb5.keytab mysecret

DC/OS 1.10 or earlier

To create a secret called mysecret with the binary contents of kerb5.keytab, first encode it using the base64 command line utility. The following example uses BSD base64 (default on Mac OS).

base64 --input kerb5.keytab > kerb5.keytab.base64-encoded

Alternatively, GNU base64 (the default on Linux) inserts line-feeds in the encoded data by default. Disable line-wrapping with the -w 0 argument.

base64 -w 0 kerb5.keytab > kerb5.keytab.base64-encoded

Now that the file is encoded it can be stored as a secret.

dcos security secrets create --text-file kerb5.keytab.base64-encoded some/path/__dcos_base64__mysecret

IMPORTANT: The secret name must be prefixed with "__dcos_base64__".

When the some/path/__dcos_base64__mysecret secret is referenced in your service definition, its base64-decoded contents will be made available as a temporary file in your service task containers.

NOTE: Make sure to only refer to binary secrets as files, since holding binary content in environment variables is discouraged.
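
For DC/OS 1.10 or earlier, the encoding step can also be done in Python, as in the sketch below. The standard library's base64.b64encode follows RFC 4648 and does not insert line feeds, so no equivalent of -w 0 is needed. The helper names and the placeholder bytes are assumptions for the example; the secret itself would still be created with the dcos CLI.

```python
import base64

def encode_for_secret(raw: bytes) -> str:
    """Base64-encode binary content as a single line of ASCII text."""
    return base64.b64encode(raw).decode("ascii")

def base64_secret_name(path: str, name: str) -> str:
    """Build a secret name carrying the required __dcos_base64__ prefix."""
    return f"{path}/__dcos_base64__{name}"

# Placeholder binary content standing in for a real keytab file.
keytab_bytes = bytes(range(256))
encoded = encode_for_secret(keytab_bytes)
secret_name = base64_secret_name("some/path", "mysecret")
print(secret_name, len(encoded))
```

Decoding the result with base64.b64decode returns the original bytes, which is a quick way to confirm that nothing was altered before the secret is stored.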

Service Scheduler Metrics

The service scheduler records a number of metrics that can be used to diagnose issues with the scheduler and to monitor its performance. The metrics can be consumed via DC/OS metrics or pulled directly from the service scheduler.

JSON

A JSON representation of the metrics is available at the /v1/metrics endpoint of the service scheduler.

{
	"version": "3.1.3",
	"gauges": {},
	"counters": {
		"declines.long": {
			"count": 15
		},
		"offers.processed": {
			"count": 18
		},
		"offers.received": {
			"count": 18
		},
		"operation.create": {
			"count": 5
		},
		"operation.launch_group": {
			"count": 3
		},
		"operation.reserve": {
			"count": 20
		},
		"revives": {
			"count": 3
		},
		"task_status.task_running": {
			"count": 6
		}
	},
	"histograms": {},
	"meters": {},
	"timers": {
		"offers.process": {
			"count": 10,
			"max": 0.684745927,
			"mean": 0.15145255818999337,
			"min": 5.367950000000001E-4,
			"p50": 0.0035879090000000002,
			"p75": 0.40317217800000005,
			"p95": 0.684745927,
			"p98": 0.684745927,
			"p99": 0.684745927,
			"p999": 0.684745927,
			"stddev": 0.24017017290826104,
			"m15_rate": 0.5944843686231079,
			"m1_rate": 0.5250565015924039,
			"m5_rate": 0.583689104996544,
			"mean_rate": 0.3809369986002824,
			"duration_units": "seconds",
			"rate_units": "calls/second"
		}
	}
}
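
As an illustration of how this JSON might be consumed, the Python sketch below derives a few figures from the counters. The sample string reuses counter values from the output above; the summarize function and the derived figures (backlog and decline ratio) are assumptions for the example, not metrics the scheduler reports itself. Fetching the document from /v1/metrics is left out because the scheduler's address depends on the cluster.

```python
import json

# Counter values copied from the sample output above.
SAMPLE = """
{"counters": {
    "declines.long": {"count": 15},
    "offers.processed": {"count": 18},
    "offers.received": {"count": 18},
    "revives": {"count": 3}
}}
"""

def summarize(metrics_json: str) -> dict:
    """Derive simple health figures from the scheduler's counters."""
    counters = json.loads(metrics_json).get("counters", {})

    def count(name):
        return counters.get(name, {}).get("count", 0)

    received = count("offers.received")
    return {
        "received": received,
        "backlog": received - count("offers.processed"),
        "decline_ratio": count("declines.long") / received if received else 0.0,
    }

summary = summarize(SAMPLE)
print(summary)
```

A growing backlog or a decline ratio that stays close to 1 while tasks are still pending can point to offers that are too small for the service.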

Prometheus

A Prometheus representation of the metrics is available at the /v1/metrics/prometheus endpoint of the service scheduler.

# HELP declines_long Generated from Dropwizard metric import (metric=declines.long, type=com.codahale.metrics.Counter)
# TYPE declines_long gauge
declines_long 20.0
# HELP offers_processed Generated from Dropwizard metric import (metric=offers.processed, type=com.codahale.metrics.Counter)
# TYPE offers_processed gauge
offers_processed 24.0
# HELP offers_received Generated from Dropwizard metric import (metric=offers.received, type=com.codahale.metrics.Counter)
# TYPE offers_received gauge
offers_received 24.0
# HELP operation_create Generated from Dropwizard metric import (metric=operation.create, type=com.codahale.metrics.Counter)
# TYPE operation_create gauge
operation_create 5.0
# HELP operation_launch_group Generated from Dropwizard metric import (metric=operation.launch_group, type=com.codahale.metrics.Counter)
# TYPE operation_launch_group gauge
operation_launch_group 4.0
# HELP operation_reserve Generated from Dropwizard metric import (metric=operation.reserve, type=com.codahale.metrics.Counter)
# TYPE operation_reserve gauge
operation_reserve 20.0
# HELP revives Generated from Dropwizard metric import (metric=revives, type=com.codahale.metrics.Counter)
# TYPE revives gauge
revives 4.0
# HELP task_status_task_finished Generated from Dropwizard metric import (metric=task_status.task_finished, type=com.codahale.metrics.Counter)
# TYPE task_status_task_finished gauge
task_status_task_finished 1.0
# HELP task_status_task_running Generated from Dropwizard metric import (metric=task_status.task_running, type=com.codahale.metrics.Counter)
# TYPE task_status_task_running gauge
task_status_task_running 8.0
# HELP offers_process Generated from Dropwizard metric import (metric=offers.process, type=com.codahale.metrics.Timer)
# TYPE offers_process summary
offers_process{quantile="0.5",} 2.0609500000000002E-4
offers_process{quantile="0.75",} 2.2853200000000001E-4
offers_process{quantile="0.95",} 0.005792643
offers_process{quantile="0.98",} 0.005792643
offers_process{quantile="0.99",} 0.111950848
offers_process{quantile="0.999",} 0.396119612
offers_process_count 244.0
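
The text format above is line-oriented, so it can be read with a few lines of Python when a full Prometheus client is not available. The parser below is a minimal sketch that handles only the shapes seen in this sample (comments, plain samples, and samples with a label set); it is not a complete implementation of the exposition format.

```python
import re

# metric name, optional {labels}, then the value
SAMPLE_LINE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(\{[^}]*\})?\s+(\S+)$')

def parse(text: str) -> dict:
    """Map each sample (name plus any label set) to its float value."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = SAMPLE_LINE.match(line)
        if match:
            samples[match.group(1) + (match.group(2) or "")] = float(match.group(3))
    return samples

# Lines copied from the sample output above.
TEXT = """
# TYPE declines_long gauge
declines_long 20.0
# TYPE offers_process summary
offers_process{quantile="0.5",} 2.0609500000000002E-4
offers_process_count 244.0
"""
samples = parse(TEXT)
print(samples["declines_long"], samples["offers_process_count"])
```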

Secure JMX (Enterprise)

Apache Kafka supports Secure JMX, which allows you to remotely manage and monitor the Kafka JRE.

Configuration Options

Option                          Description
jmx.enabled                     Enables secure JMX.
jmx.port                        JMX port.
jmx.rmi_port                    JMX RMI port.
jmx.access_file                 The path to the secret in the Secret Store that has the contents of the access file.
jmx.password_file               The path to the secret in the Secret Store that has the contents of the password file.
jmx.key_store                   The path to the secret in the Secret Store that has the contents of the key store.
jmx.key_store_password_file     The path to the secret in the Secret Store that has the contents of the key store password file.
jmx.add_trust_store             Enables the user-provided trust store.
jmx.trust_store                 The path to the secret in the Secret Store that has the contents of the trust store.
jmx.trust_store_password_file   The path to the secret in the Secret Store that has the contents of the trust store password file.

Read more about using JMX options here.

Configuring JMX with self-signed certificate

  1. Generate a self-signed key store and trust store.
$ keytool -genkey -alias server-cert -keyalg rsa  -dname "CN=kafka.example.com,O=Example Company,C=US"  -keystore keystore.ks -storetype JKS -storepass changeit -keypass changeit
$ keytool -genkey -alias server-cert -keyalg rsa  -dname "CN=kafka.example.com,O=Example Company,C=US"  -keystore truststore.ks -storetype JKS -storepass changeit -keypass changeit
  2. Generate files containing the trust store and key store passwords.
$ cat <<EOF >> trust_store_pass
changeit
EOF
$ cat <<EOF >> key_store_pass
changeit
EOF
  3. Create a JMX access file.
$ cat <<EOF >> access_file
admin readwrite
user  readonly
EOF
  4. Create a JMX password file.
$ cat <<EOF >> password_file
admin  adminpassword
user   userpassword
EOF
  5. Create necessary secrets in DC/OS for JMX.
dcos package install dcos-enterprise-cli --yes
dcos security secrets create -f keystore.ks kafka/keystore
dcos security secrets create -f key_store_pass kafka/keystorepass
dcos security secrets create -f truststore.ks kafka/truststore
dcos security secrets create -f trust_store_pass kafka/truststorepass
dcos security secrets create -f password_file kafka/passwordfile
dcos security secrets create -f access_file kafka/access
  6. Create a custom service configuration options.json with JMX enabled.
{
  "service": {
    "jmx": {
      "enabled": true,
      "port": 31299,
      "rmi_port": 31298,
      "access_file": "kafka/access",
      "password_file": "kafka/passwordfile",
      "key_store": "kafka/keystore",
      "key_store_password_file": "kafka/keystorepass",
      "add_trust_store": true,
      "trust_store": "kafka/truststore",
      "trust_store_password_file": "kafka/truststorepass"
    }
  }
}
  7. Install Apache Kafka with the options file you created.
dcos package install kafka --options="options.json"

Service Health Check

DC/OS Apache Kafka supports service-oriented health checks, which allow you to monitor the health of your service in detail.

Configuration Options

Option                                           Description
health_check.enabled                             Enables the health checks.
health_check.method                              “PORT” or “FUNCTIONAL”.
health_check.interval                            The period in seconds to wait after the last health check has completed before starting the next check.
health_check.delay                               The time in seconds to wait before starting the health check attempts.
health_check.timeout                             The time in seconds to wait for a health check to succeed.
health_check.grace-period                        The time in seconds after the task is launched during which health check failures are ignored. Once a health check succeeds for the first time, the grace period no longer applies. Note that it includes the delay, so setting grace-period to fewer seconds than delay has no effect.
health_check.max-consecutive-failures            The maximum number of consecutive failures after which the task is killed.
health_check.health-check-topic-prefix           Prefix for the health check topic name. Used when the “FUNCTIONAL” health check method is selected.
service.security.kerberos.health_check_primary   The Kerberos primary used by the Kafka health check, if enabled.

Health Check Methods

Port Check

This method checks whether the broker port is open. Each broker has its own health check, so only the broker on which the health check is running is checked.

{
  "service": {
    "name": "kafka",
    "health_check": {
      "enabled": true,
      "method": "PORT"
    }
  }
}
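
Conceptually, a port check amounts to attempting a TCP connection, as in the Python sketch below. It only illustrates the idea and is not the check that the service runs: port_is_open, the timeout value, and the throwaway local listener used for the demonstration are all assumptions.

```python
import socket

def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Demonstration against a throwaway local listener instead of a real broker.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
listener.listen(1)
demo_port = listener.getsockname()[1]

open_while_listening = port_is_open("127.0.0.1", demo_port)
listener.close()
open_after_close = port_is_open("127.0.0.1", demo_port)
print(open_while_listening, open_after_close)
```

An open port shows only that something is listening. It does not show that the broker can handle messages, which is the gap the functional check is meant to cover.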

Functional Check

This method checks whether the broker can send messages to and receive messages from a client. Each broker has its own health check, so only the broker on which the health check is running is checked. The health check produces a random message to a user-configurable topic and then tries to consume the last produced message. When Kerberos and/or Transport Encryption is enabled, the health check produces only a single random message to a topic and attempts to consume that same first message of the topic at each health check interval.

{
  "service": {
    "name": "kafka",
    "health_check": {
      "enabled": true,
      "method": "FUNCTIONAL",
      "health-check-topic-prefix": "MyHealthCheckTopic"
    }
  }
}