Training MNIST with Spark and Horovod
Introduction
Recognizing handwritten digits based on the MNIST (Modified National Institute of Standards and Technology) data set is the “Hello, World” example of machine learning. Each (anti-aliased) black-and-white image represents a digit from 0 to 9 and fits in a 28×28 pixel bounding box. The problem of recognizing digits from handwriting is, for instance, important to the postal service when automatically reading zip codes from envelopes.
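If you want a quick look at the data before training, the short sketch below loads the NumPy archive bundled with this tutorial (the same datasets/mnist.npz file the trainer uses later) and prints the array shapes. It assumes you run it from the tutorial's working directory.
# Inspect the bundled MNIST archive (assumes datasets/mnist.npz exists in the working directory).
import numpy as np

with np.load("datasets/mnist.npz", allow_pickle=True) as f:
    print("train:", f["x_train"].shape, f["y_train"].shape)  # (60000, 28, 28) (60000,)
    print("test:", f["x_test"].shape, f["y_test"].shape)      # (10000, 28, 28) (10000,)
    print("label of the first test image:", f["y_test"][0])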
What You Will Learn
You will see how to use Spark to build a simple Keras model that performs multi-class classification of the provided images.
The example in the notebook includes both training a model in the notebook and running a distributed training job on the cluster using Horovod, so you can easily scale up your own models.
Horovod is best when you want to build estimators with Keras or when you want to train on Spark DataFrames from pyspark.
Of course, you can also use Spark's MLlib if you prefer.
The key is that if you already have a Keras model, you may not want to rewrite it in Spark, but you may still want to leverage the power of distributed training with Spark and Horovod.
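To make that concrete, the core pattern looks roughly like the sketch below: you keep your existing Keras training logic inside a function and hand it to horovod.spark.run, which launches one copy of it per Spark task and returns each task's result to the driver. This is only a minimal sketch with a placeholder training function; the complete, runnable trainer is shown later in this tutorial.
# Minimal sketch of the Horovod-on-Spark pattern (train_fn is a placeholder).
import horovod.spark
import horovod.tensorflow.keras as hvd
from pyspark.sql import SparkSession

def train_fn(epochs):
    hvd.init()          # one Horovod process per Spark task
    # ... build and fit your existing Keras model here ...
    return hvd.rank()   # whatever each task returns is collected on the driver

spark = SparkSession.builder.appName("sketch").getOrCreate()
results = horovod.spark.run(train_fn, args=(1,))  # one result per task
print(results)
spark.stop()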
For the distributed training job, you have to package the complete trainer code in a Docker image. You can do that with the Kaptain SDK, so you do not have to leave your favorite notebook environment at all! Instructions for local development are also included, in case you prefer that.
What You Need
All you need is this notebook.
If you prefer to create your Docker image locally (i.e. outside of the Kubernetes cluster), you must have a Docker client installed on your machine and configured to work with your own container registry.
For Kubernetes commands to run outside of the cluster, you need kubectl.
Prerequisites
All pre-built images already include Spark and Horovod.
To package the trainer in a container image, you need a file (on the cluster) that contains the trainer code, as well as a file with the resource definition of the job for the Kubernetes cluster:
TRAINER_FILE = "mnist.py"
KUBERNETES_FILE = "sparkapp-mnist.yaml"
%env KUBERNETES_FILE $KUBERNETES_FILE
Define a helper function to capture output from a cell that usually looks like some-resource created, using the %%capture cell magic:
import re
from IPython.utils.capture import CapturedIO


def get_resource(captured_io: CapturedIO) -> str:
    """
    Gets a resource name from `kubectl apply -f <configuration.yaml>`.

    :param CapturedIO captured_io: Output captured by using `%%capture` cell magic
    :return: Name of the Kubernetes resource
    :rtype: str
    :raises Exception: if the resource could not be created (e.g. already exists)
    """
    out = captured_io.stdout
    matches = re.search(r"^(.+)\s+created", out)
    if matches is not None:
        return matches.group(1)
    else:
        raise Exception(f"Cannot get resource as its creation failed: {out}")
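As a quick, hypothetical sanity check of the helper, you can feed it an object that mimics the captured output of kubectl create (FakeCapture below is only for illustration and is not part of the tutorial):
# Simulate the stdout captured from `kubectl create` to see what get_resource() extracts.
class FakeCapture:
    stdout = "sparkapplication.sparkoperator.k8s.io/horovod-mnist created\n"

print(get_resource(FakeCapture()))
# -> sparkapplication.sparkoperator.k8s.io/horovod-mnist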
How to Train the Model in the Notebook
Since you ultimately want to train the model in a distributed fashion (potentially on GPUs), put all the code in a single cell. That way you can save the file and include it in a container image:
%%writefile $TRAINER_FILE
import argparse
import os
import tempfile

import numpy as np

import horovod.spark
import horovod.tensorflow.keras as hvd
import tensorflow as tf
from pyspark.sql import SparkSession


def get_dataset(rank=0, size=1):
    with np.load('datasets/mnist.npz', allow_pickle=True) as f:
        x_train = f['x_train'][rank::size]
        y_train = f['y_train'][rank::size]
        x_test = f['x_test'][rank::size]
        y_test = f['y_test'][rank::size]
        x_train, x_test = x_train / 255.0, x_test / 255.0  # Normalize pixel values to [0, 1]
    return (x_train, y_train), (x_test, y_test)


def get_model():
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    return model


def deserialize(model_bytes):
    import horovod.tensorflow.keras as hvd
    import h5py
    import io
    bio = io.BytesIO(model_bytes)
    with h5py.File(bio, 'a') as f:
        return hvd.load_model(f)


def predict_number(model, x_test, image_index):
    pred = model.predict(x_test[image_index:image_index + 1])
    print(f"Model prediction for index {image_index}: {pred.argmax()}")


def train_hvd(learning_rate, batch_size, epochs):
    # 1 - Initialize Horovod
    hvd.init()

    # 2 - Pin each GPU to a single process
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())
    model = get_model()

    initial_lr = learning_rate * hvd.size()

    # 3 - Wrap the optimizer
    optimizer = hvd.DistributedOptimizer(
        # 4 - Use the learning rate scaled by the number of workers (computed above)
        tf.optimizers.Adam(lr=initial_lr)
    )

    model.compile(optimizer=optimizer,
                  loss='sparse_categorical_crossentropy',
                  experimental_run_tf_function=False,
                  metrics=['accuracy'])

    callbacks = [
        # 5 - Broadcast initial variables from rank 0 to all other processes
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
        hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=3, verbose=1, initial_lr=initial_lr),
    ]

    # 6 - Save checkpoints on worker 0 only
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    if hvd.rank() == 0:
        callbacks.append(
            tf.keras.callbacks.ModelCheckpoint(ckpt_file, monitor='accuracy', mode='max',
                                               save_best_only=True))

    history = model.fit(x_train, y_train,
                        batch_size=batch_size,
                        callbacks=callbacks,
                        epochs=epochs,
                        verbose=2,
                        validation_data=(x_test, y_test))

    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return history.history, f.read()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Horovod-on-Spark MNIST Training Job")
    parser.add_argument(
        "--learning_rate",
        type=float,
        default=0.001,
        metavar="N",
        help="Learning rate (default: 0.001)",
    )
    parser.add_argument(
        "--batch_size",
        type=int,
        default=64,
        metavar="N",
        help="Batch size for training (default: 64)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=5,
        metavar="N",
        help="Number of epochs to train (default: 5)",
    )
    args, _ = parser.parse_known_args()

    spark = SparkSession.builder.appName("HorovodOnSpark").getOrCreate()

    image_index = 100
    (x_train, y_train), (x_test, y_test) = get_dataset()
    print(f"Expected prediction for index {image_index}: {y_test[image_index]}")

    # Train the model with Horovod on Spark
    model_bytes = horovod.spark.run(train_hvd, args=(args.learning_rate,
                                                     args.batch_size,
                                                     args.epochs))[0][1]

    model = deserialize(model_bytes)
    model.evaluate(x_test, y_test, verbose=2)
    predict_number(model, x_test, image_index)
    spark.stop()
Writing mnist.py
Several things are worth highlighting:
- Horovod is initialized with hvd.init().
- Each GPU is pinned to a single process to avoid resource contention.
- The model's optimizer must be wrapped in hvd.DistributedOptimizer, which delegates gradient computations to the original optimizer (here: Adam) but applies the averaged gradients.
- The learning rate must be scaled by the number of workers, because the effective batch size scales with the number of workers, which an increased learning rate compensates for.
- Initial variable states must be broadcast from rank 0 to all other processes.
- Checkpoints must only be saved on worker 0 to prevent corruption.
Horovod is designed to scale on servers with multiple GPUs.
Horovod relies on MPI (Message Passing Interface) and therefore recycles some of its terminology; a short sharding example follows the list:
- The size is the number of processes.
- The rank is the unique process identifier, which runs from 0 to size - 1.
- The local rank is the unique process identifier within each server.
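As an illustration of how the trainer uses this terminology, get_dataset above shards the training data with extended slicing so that each process sees a disjoint subset. The toy example below shows the same idea with made-up numbers:
# Rank/size-based sharding as used in get_dataset(): each worker takes
# every `size`-th sample, starting at its own `rank`.
import numpy as np

samples = np.arange(10)   # stand-in for x_train
size = 4                  # total number of Horovod processes
for rank in range(size):
    print(f"rank {rank} trains on {samples[rank::size]}")
# rank 0 trains on [0 4 8], rank 1 on [1 5 9], rank 2 on [2 6], rank 3 on [3 7]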
%env HOROVOD_JOB=$TRAINER_FILE
env: HOROVOD_JOB=mnist.py
To verify the training job, first run it on Spark in local mode:
%env PYSPARK_DRIVER_PYTHON=/opt/conda/bin/python
env: PYSPARK_DRIVER_PYTHON=/opt/conda/bin/python
%%sh
"${SPARK_HOME}/bin/spark-submit" --master "local[1]" "${HOROVOD_JOB}" --epochs=1
Expected prediction for index 100: 6
Running 1 processes (inferred from spark.default.parallelism)...
20/06/18 16:09:21 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
...
[1,0]<stdout>:938/938 - 2s - loss: 0.3325 - accuracy: 0.9046 - val_loss: 0.1697 - val_accuracy: 0.9500 - lr: 0.0010
313/313 - 0s - loss: 0.1697 - accuracy: 0.9500
Model prediction for index 100: 6
...
20/06/18 16:09:30 INFO SparkContext: Successfully stopped SparkContext
This trains the model in the notebook, but does not distribute the procedure. To that end, build and push a container image that contains the code and the input dataset.
The data set is included, so the tutorial works on air-gapped (i.e. private/offline) clusters. MNIST data sets are typically downloaded on the fly, which would fail in such scenarios. In most realistic cases, the data sets would be available to the cluster as a volume.
How to Create a Docker Image with Kaptain SDK
Kaptain SDK allows you to build, push, and run containerized ML models without leaving Jupyter.
To build and push Docker images from within a notebook, please check out the Build Docker Images with Kaptain SDK tutorial.
All you need is the TRAINER_FILE and access to a container registry.
How to Create a Docker Image Manually
If you are comfortable with Docker (or prefer to use it as a part of your CI/CD setup), you can create a Dockerfile as follows.
You do have to download the TRAINER_FILE contents and the datasets directory to your local machine.
The Kubernetes cluster does not have a Docker daemon available to build your image, so you must do it locally; the cluster uses containerd (only) to run workloads.
The Dockerfile looks as follows:
FROM mesosphere/kubeflow:1.3.0-spark-3.0.0-horovod-0.22.0-tensorflow-2.5.0-gpu
ADD mnist.py /
ADD datasets /datasets
WORKDIR /
If GPU support is not needed, you can leave off the -gpu suffix from the image.
mnist.py is the trainer code you have to download to your local machine.
Then it is easy to build the image and push it to your container registry:
docker build -t <docker_image_name_with_tag> .
docker push <docker_image_name_with_tag>
The image is available as mesosphere/kubeflow:1.3.0-mnist-spark-3.0.0-horovod-0.22.0-tensorflow-2.5.0-gpu in case you want to skip this step for now.
How to Create a Distributed SparkApplication
The KUDO Spark Operator manages Spark applications in a similar way as the PyTorch or TensorFlow operators manage PyTorchJobs and TFJobs, respectively.
It exposes a resource called SparkApplication that you will use to train the model on multiple nodes with Horovod.
SparkApplication is a custom resource definition (CRD) provided by the KUDO Spark operator.
Operators extend Kubernetes by capturing domain-specific knowledge on how to deploy and run an application or service, how to deal with failures, and so on.
The Spark operator controller manages the lifecycle of a SparkApplication.
KUDO is a toolkit for creating custom Kubernetes operators using YAML instead of writing Go code.
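If you want to verify that this CRD is available on your cluster before proceeding, a quick check like the one below can help; it assumes kubectl is reachable from the notebook and that your role allows listing API resources (if not, ask your administrator).
# Check that the SparkApplication resource is registered with the API server.
import subprocess

resources = subprocess.check_output("kubectl api-resources", shell=True).decode()
print([line for line in resources.splitlines() if "sparkoperator.k8s.io" in line])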
# set this to 0 when running on CPU.
GPUS = 1
IMAGE = "mesosphere/kubeflow:1.3.0-mnist-spark-3.0.0-horovod-0.22.0-tensorflow-2.5.0-gpu"
%env GPUS $GPUS
%env IMAGE $IMAGE
The specification for a distributed SparkApplication is defined using YAML:
%%bash
cat <<END > $KUBERNETES_FILE
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: horovod-mnist
spec:
  type: Python
  mode: cluster
  pythonVersion: "3"
  image: ${IMAGE}
  imagePullPolicy: Always
  mainApplicationFile: "local:///mnist.py"
  sparkVersion: "3.0.0"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  arguments:
    - --epochs
    - "5"
  driver:
    env:
      - name: PYTHONUNBUFFERED
        value: "1"
    cores: 1
    memory: "2G"
    gpu:
      name: "nvidia.com/gpu"
      quantity: ${GPUS}
    labels:
      version: 3.0.0
      metrics-exposed: "true"
    annotations:
      sidecar.istio.io/inject: "false"
    serviceAccount: default-editor
  executor:
    cores: 1
    instances: 2
    memory: "2G"
    gpu:
      name: "nvidia.com/gpu"
      quantity: ${GPUS}
    labels:
      version: 3.0.0
      metrics-exposed: "true"
    annotations:
      sidecar.istio.io/inject: "false"
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      jmxExporterJar: "/prometheus/jmx_prometheus_javaagent-0.11.0.jar"
      port: 8090
END
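As an optional sanity check, you can parse the generated manifest back in Python; this assumes the PyYAML package is available in the notebook image (install it with pip otherwise).
# Parse the generated SparkApplication manifest (requires PyYAML).
import yaml

with open(KUBERNETES_FILE) as f:
    spec = yaml.safe_load(f)

print(spec["kind"], "->", spec["metadata"]["name"])
print("executor instances:", spec["spec"]["executor"]["instances"])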
The operator’s user guide explains how to configure the application.
Please note that in spec.mainApplicationFile the file name /mnist.py is hard-coded and must match the file in the Docker image.
The annotation sidecar.istio.io/inject: "false" disables Istio at the SparkApplication level.
This is not needed if Istio is disabled at the namespace level by your administrator.
You can either execute the following commands on your local machine with kubectl or directly from the notebook.
If you run them locally, you cannot rely on cell magic: drop the %% prefixes and manually copy-paste the variables' values wherever you see $SOME_VARIABLE.
In that case, you also have to set the user namespace for all subsequent commands:
kubectl config set-context --current --namespace=<insert-namespace>
Please change the namespace to whatever has been set up by your administrator.
Deploy the distributed training job:
%%capture hvd_output --no-stderr
%%sh
kubectl create -f "${KUBERNETES_FILE}"
%env HVD_JOB {get_resource(hvd_output)}
Check that the pods are being created according to the specification:
%%sh
kubectl get pods -l sparkoperator.k8s.io/app-name=horovod-mnist
NAME READY STATUS RESTARTS AGE
horovod-mnist-driver 0/1 Completed 0 64s
Wait for the SparkApplication to complete:
import subprocess
import time

for attempt in range(1, 60):
    state = subprocess.check_output(
        "kubectl get $HVD_JOB -o jsonpath={.status.applicationState.state}",
        shell=True).decode().strip()
    print(f"job state at attempt {attempt}: {state}")
    if state in ("FAILED", "COMPLETED"):
        break
    time.sleep(10)
See the status of the horovod-mnist SparkApplication:
%%sh
kubectl describe ${HVD_JOB}
Name: horovod-mnist
...
API Version: sparkoperator.k8s.io/v1beta2
Kind: SparkApplication
...
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal SparkApplicationAdded 75s spark-operator SparkApplication horovod-mnist was added, enqueuing it for submission
Normal SparkApplicationSubmitted 72s spark-operator SparkApplication horovod-mnist was submitted successfully
Normal SparkDriverRunning 70s spark-operator Driver horovod-mnist-driver is running
Normal SparkExecutorPending 65s spark-operator Executor horovod-mnist-1592496574728-exec-1 is pending
Normal SparkExecutorPending 64s spark-operator Executor horovod-mnist-1592496574728-exec-2 is pending
Normal SparkExecutorPending 64s spark-operator Executor horovod-mnist-1592496574728-exec-3 is pending
Normal SparkExecutorPending 64s spark-operator Executor horovod-mnist-1592496574728-exec-4 is pending
Normal SparkExecutorPending 64s spark-operator Executor horovod-mnist-1592496574728-exec-5 is pending
Normal SparkExecutorRunning 63s spark-operator Executor horovod-mnist-1592496574728-exec-1 is running
Normal SparkExecutorRunning 62s spark-operator Executor horovod-mnist-1592496574728-exec-3 is running
Normal SparkExecutorRunning 62s spark-operator Executor horovod-mnist-1592496574728-exec-2 is running
Normal SparkExecutorRunning 61s spark-operator Executor horovod-mnist-1592496574728-exec-4 is running
Normal SparkExecutorRunning 61s spark-operator Executor horovod-mnist-1592496574728-exec-5 is running
Normal SparkDriverCompleted 23s spark-operator Driver horovod-mnist-driver completed
Normal SparkApplicationCompleted 23s spark-operator SparkApplication horovod-mnist completed
Check the model prediction (as before) by looking at the logs of the driver:
%%sh
kubectl logs horovod-mnist-driver | grep 'Model prediction'
Model prediction for index 100: 6
Delete the SparkApplication once you are done:
%%sh
kubectl delete ${HVD_JOB}
sparkapplication.sparkoperator.k8s.io "horovod-mnist" deleted