Failures such as host, network, JVM, or application failures can affect the behavior of three types of Spark components:
- DC/OS Apache Spark service
- Batch jobs
- Streaming jobs
DC/OS Apache Spark service
The DC/OS Apache Spark service runs in Marathon and includes the Mesos Cluster Dispatcher and the Spark history server. The Dispatcher manages jobs you submit using dcos spark run. Job data is persisted to ZooKeeper. The Spark history server reads event logs from HDFS. If the service dies, Marathon restarts it, and it reloads data from these highly available stores.
Batch jobs
Batch jobs are resilient to executor failures, but not driver failures. The Dispatcher will restart a driver if you submit it with the --supervise option.
Driver
When the driver fails, executors are terminated, and the entire Spark application fails. If you submitted your job with the --supervise option, the Dispatcher restarts the job. If the submitted job keeps failing, the Dispatcher applies exponential backoff between restarts.
Starting with DC/OS 2.0, the driver restarts without backoff if the node on which it was running is draining.
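The backoff behavior described above can be illustrated with a minimal sketch. The base delay, the doubling factor, and the cap are assumptions chosen for illustration; they are not the Dispatcher's actual settings.

```python
def backoff_delays(attempts, base_seconds=1.0, max_seconds=60.0):
    """Return the wait (in seconds) before each of `attempts` consecutive restarts.

    base_seconds and max_seconds are hypothetical values, not Dispatcher defaults.
    """
    delays = []
    for attempt in range(attempts):
        # Double the wait after every failed restart, up to the cap.
        delays.append(min(base_seconds * (2 ** attempt), max_seconds))
    return delays


print(backoff_delays(8))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The cap keeps a repeatedly failing job from waiting indefinitely long between attempts while still slowing down a tight crash loop.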
Executors
Batch jobs are resilient to executor failure. Upon failure, cached data, shuffle files, and partially computed RDDs are lost. However, Spark RDDs are fault-tolerant, and Spark will start a new executor to recompute lost data from the original data source, caches, or shuffle files. There is a performance cost as data is recomputed, but an executor failure will not cause a job to fail.
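As a rough illustration of recomputation, the toy model below rebuilds a lost partition from the original source. It is plain Python, not Spark code, and the names compute_partition, source, and cache are hypothetical.

```python
def compute_partition(source, index, transform):
    """Recompute one partition from its slice of the original source."""
    return [transform(x) for x in source[index]]


def square(x):
    return x * x


source = [[1, 2], [3, 4], [5, 6]]  # original data, split into three partitions
cache = {i: compute_partition(source, i, square) for i in range(len(source))}

del cache[1]  # the executor holding partition 1 fails; its cached data is gone

# Rebuild only the missing partition by replaying the same transformation.
for i in range(len(source)):
    if i not in cache:
        cache[i] = compute_partition(source, i, square)

result = [x for i in sorted(cache) for x in cache[i]]
print(result)  # [1, 4, 9, 16, 25, 36]
```

Only the lost partition is recomputed, which is the performance cost mentioned above; the final result is unchanged.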
Streaming jobs
Whereas batch jobs run once and can usually be restarted upon failure, streaming jobs often need to run constantly. The application must survive driver failures, often with no data loss.
To avoid data loss, you must run streaming jobs with write-ahead logging (WAL) enabled. The one exception is that if you are consuming from Kafka, you can use the Direct Kafka API instead.
For exactly-once processing semantics, you must use the Direct Kafka API. All other receivers provide at-least-once semantics.
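The idea behind write-ahead logging can be sketched as a toy model: each record is persisted before it is buffered in memory, so a restarted process can replay the log. This is not Spark's implementation; the WalReceiver class and the local temporary file standing in for a highly available store are assumptions for illustration.

```python
import json
import os
import tempfile


class WalReceiver:
    """Toy receiver that writes every record to a log before buffering it."""

    def __init__(self, log_path):
        self.log_path = log_path
        self.buffer = []  # in-memory data, lost when the process fails

    def receive(self, record):
        # Persist first; only then keep the record in memory.
        with open(self.log_path, "a", encoding="utf-8") as log:
            log.write(json.dumps(record) + "\n")
            log.flush()
            os.fsync(log.fileno())
        self.buffer.append(record)

    @classmethod
    def recover(cls, log_path):
        """Rebuild the in-memory buffer by replaying the log."""
        receiver = cls(log_path)
        if os.path.exists(log_path):
            with open(log_path, encoding="utf-8") as log:
                receiver.buffer = [json.loads(line) for line in log if line.strip()]
        return receiver


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "receiver.wal")
        receiver = WalReceiver(path)
        for record in ("a", "b", "c"):
            receiver.receive(record)
        del receiver  # simulated failure: the in-memory buffer is gone
        return WalReceiver.recover(path).buffer


print(demo())  # ['a', 'b', 'c']
```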
Failures
There are two types of failures:
- Driver
- Executor
Job features
There are a few variables that affect the reliability of your job:
Reliability features
The two reliability features of a job are data loss and processing semantics. Data loss occurs when the source sends data, but the job fails to process it. Processing semantics describe how many times a received message is processed by the job. The semantics can be either “at least once” or “exactly once”.
Data loss
A Spark job loses data when delivered data does not get processed. The following is a list of configurations with increasing data preservation guarantees:
- Unreliable receivers
Unreliable receivers do not acknowledge the data they receive from the source. This means that buffered data in the receiver is lost if the executor fails.
executor failure => data loss
driver failure => data loss
- Reliable receivers, unreplicated storage level
This is an unusual configuration. By default, Spark streaming receivers run with a replicated storage level. But if you reduce the storage level to be unreplicated, data stored on the receiver but not yet processed will not survive executor failure.
executor failure => data loss
driver failure => data loss
- Reliable receivers, replicated storage level
This is the default configuration. Data stored in the receiver is replicated, and can thus survive a single executor failure. Driver failures, however, result in all executors failing, and therefore result in data loss.
(single) executor failure => no data loss
driver failure => data loss
- Reliable receivers, write-ahead logging
With write-ahead logging enabled, data stored in the receiver is written to a highly available store such as S3 or HDFS. This means that an application can recover from even a driver failure.
executor failure => no data loss
driver failure => no data loss
- Direct Kafka consumer, no checkpointing
Since Spark 1.3, the Spark + Kafka integration has supported an experimental direct consumer, which does not use traditional receivers. With the direct consumer approach, RDDs read directly from Kafka, rather than buffering data in the receivers.
However, when a driver restarts without checkpointing, the driver starts reading from the latest Kafka offset, rather than where the previous driver left off.
executor failure => no data loss
driver failure => data loss
- Direct Kafka consumer, checkpointing
With checkpointing enabled, Kafka offsets are stored in a reliable store such as HDFS or S3. This means that an application can restart exactly where it left off.
executor failure => no data loss
driver failure => no data loss
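The outcomes listed above can be collected into a small lookup table. The sketch below only restates the list; the DATA_LOSS name and the helper function are hypothetical, and True means the failure results in data loss.

```python
# True = data loss, False = no data loss, as stated in the list above.
DATA_LOSS = {
    "unreliable receivers": {"executor": True, "driver": True},
    "reliable receivers, unreplicated storage level": {"executor": True, "driver": True},
    # "executor" here refers to a single executor failure.
    "reliable receivers, replicated storage level": {"executor": False, "driver": True},
    "reliable receivers, write-ahead logging": {"executor": False, "driver": False},
    "direct Kafka consumer, no checkpointing": {"executor": False, "driver": True},
    "direct Kafka consumer, checkpointing": {"executor": False, "driver": False},
}


def survives_driver_failure():
    """Return the configurations that lose no data when the driver fails."""
    return [name for name, loss in DATA_LOSS.items() if not loss["driver"]]


print(survives_driver_failure())
```

Only write-ahead logging and the direct Kafka consumer with checkpointing appear in the result, which matches the streaming job requirements stated earlier.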
Processing semantics
Processing semantics apply to how many times received messages get processed. With Spark streaming jobs, this can be either “at least once” or “exactly-once”.
The semantics below describe how “at least once” or “exactly-once” processing apply to Spark receipt of the data. To provide an end-to-end exactly-once guarantee, you must additionally verify that your output operation provides exactly-once guarantees. More info here.
- Receivers: at least once
Every Spark streaming consumer, with the exception of the direct Kafka consumer described below, uses receivers. Receivers buffer blocks of data in memory, then write them according to the storage level of the job. After writing out the data, receivers send an acknowledgement to the source so the source knows not to re-send the data.
However, if this acknowledgement fails, or the node fails between writing out the data and sending the acknowledgement, then an inconsistency arises. Spark believes that the data has been received, but the source does not. This results in the source re-sending the data, and it being processed twice.
- Direct Kafka consumer: exactly-once
The direct Kafka consumer avoids the problem described above by reading directly from Kafka, and storing the offsets itself in the checkpoint directory.
More information here.
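The difference between the two approaches can be shown with a toy simulation. It is plain Python, not Spark or Kafka code, and it makes a simplifying assumption: in the direct flow, the offset is saved together with each processed record. The function names and the checkpoint dictionary are hypothetical.

```python
def receiver_run(messages, lose_ack_for):
    """Receiver flow: write the message, then acknowledge it to the source.

    If the acknowledgement is lost, the source re-sends the message and it
    is processed a second time.
    """
    processed = []
    for msg in messages:
        processed.append(msg)        # data written by the receiver
        if msg in lose_ack_for:      # acknowledgement never reaches the source
            processed.append(msg)    # source re-sends; message processed again
    return processed


def direct_run(log, checkpoint, fail_after=None):
    """Direct flow: read from the saved offset and store the next offset itself."""
    processed = []
    offset = checkpoint["offset"]
    while offset < len(log):
        processed.append(log[offset])
        offset += 1
        checkpoint["offset"] = offset  # offset kept by the consumer, not the source
        if fail_after is not None and len(processed) == fail_after:
            break                      # simulated driver failure
    return processed


log = ["a", "b", "c", "d"]
print(receiver_run(log, lose_ack_for={"b"}))  # ['a', 'b', 'b', 'c', 'd']

checkpoint = {"offset": 0}
first = direct_run(log, checkpoint, fail_after=2)
second = direct_run(log, checkpoint)          # restart resumes at the saved offset
print(first + second)                         # ['a', 'b', 'c', 'd']
```

Because the consumer owns the offset, there is no separate acknowledgement that can be lost, so a restart neither skips nor repeats records in this model.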
Mesos checkpointing
Enabling Mesos checkpointing allows Apache Spark Driver and Executors to tolerate Mesos agent upgrades and unexpected crashes without experiencing any downtime. If checkpointing is enabled, the Mesos agents that are running Apache Spark Executors will write the framework pid, executor process IDs, and status updates to disk. If the Agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted Agent to reconnect to executors that were started by the old instance of the Agent. Enabling checkpointing improves fault tolerance, at the cost of a (usually small) increase in disk I/O.
The DC/OS Apache Spark service runs with checkpointing enabled by default, allowing Apache Spark Drivers to tolerate agent restarts. To enable checkpointing for Apache Spark Executors, set the configuration property spark.mesos.checkpoint to true.
More information about Mesos checkpointing here
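As a sketch of how these options might be passed at submission time, the helper below assembles the argument string for dcos spark run. The build_submit_args function, the jar URL, and the class name are hypothetical placeholders; only the --supervise option and the spark.mesos.checkpoint property come from the text above.

```python
def build_submit_args(app_url, main_class, supervise=False, conf=None):
    """Assemble a submit-args string; app_url and main_class are placeholders."""
    parts = []
    if supervise:
        parts.append("--supervise")  # ask the Dispatcher to restart a failed driver
    for key, value in sorted((conf or {}).items()):
        parts.extend(["--conf", f"{key}={value}"])
    parts.extend(["--class", main_class, app_url])
    return " ".join(parts)


submit_args = build_submit_args(
    "https://example.com/jobs/my-streaming-job.jar",  # hypothetical jar location
    "com.example.MyStreamingJob",                     # hypothetical main class
    supervise=True,
    conf={"spark.mesos.checkpoint": "true"},
)
print(f'dcos spark run --submit-args="{submit_args}"')
```

The helper does no shell quoting, so it is only suitable for values without spaces or special characters.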