curl -L https://downloads.d2iq.com/kaptain/d2iq-tutorials-1.3.0.tar.gz | tar xz
Kaptain SDK: Training, Tuning, and Deploying
Introduction
To perform distributed training on a cluster’s resources, conduct experiments with multiple parallel trials to obtain the best hyperparameters, and deploying a trained or tuned model typically requires additional steps, such as building a Docker image and providing framework-specific specifications for Kubernetes. This places the burden on each data scientist to learn all the details of all the components.
Instead of doing all the work, using the Kaptain SDK, you can train
, tune
, and deploy
from within a notebook without having to worry about framework specifics, Kubeflow-native SDKs, or even thinking about Kubernetes
What You Will Learn
The Kaptain SDK provides a data science-friendly user experience from within a notebook that hides all the specifics, and focuses on the model as the main abstraction. A model can be trained, tuned, deployed, and tracked.
The example is based on TensorFlow, but it works equally for PyTorch. The original model code can be found in the tutorials MNIST with TensorFlow and MNIST with PyTorch.
The SDK relies on MinIO, an open-source S3-compliant object storage tool, that is already included with your Kaptain installation.
What You Need
All you need is this notebook.
Prerequisites
Before proceeding, check you are using the correct notebook image, that is, TensorFlow is available:
%%sh
pip list | grep tensorflow
tensorflow 2.4.0
tensorflow-datasets 4.1.0
tensorflow-estimator 2.4.0
tensorflow-metadata 0.26.0
Prepare the Training Code and Data Sets
The examples in this tutorial require a trainer code file mnist.py
and a dataset to be present in the current folder.
The code and datasets are already available in the other tutorials and can be reused here.
How to Create a Docker Credentials File and Kubernetes Secret
For the tutorial you will need getpass
to provide a password interactively without it being immediately visible.
It is a standard Python library, so there is no need to install it.
A simple import
will suffice.
Please type in the container registry username by running the next cell:
import json
import getpass
import pathlib
from base64 import b64encode
docker_user = input()
Enter the password for the Docker container registry when prompted by executing the following code:
docker_password = getpass.getpass()
With these details, base64-encode the username and password and create a Docker configuration file as follows:
# Create a folder to store the Docker configuration file
docker_config_folder = pathlib.Path.joinpath(pathlib.Path.home(), ".docker")
docker_config_folder.mkdir(exist_ok=True)
# Write the base64-encoded credentials to the configuration file
docker_credentials = b64encode(f"{docker_user}:{docker_password}".encode()).decode()
config = {"auths": {"https://index.docker.io/v1/": {"auth": docker_credentials}}}
with open(f"{docker_config_folder}/config.json", "w") as outfile:
outfile.write(json.dumps(config))
Adapt the Model Code
To use the Kaptain SDK, you need to add two lines of code to the original model code:
- One right after the model training (here: Keras’
fit
method), to save the trained model to the cluster’s built-in object storage, MinIO. - Another right after the model evaluation (here: Keras’
evaluate
method), to record the metrics of interest.
%%writefile trainer.py
import argparse
import logging
import tensorflow as tf
import tensorflow_datasets as tfds
from kaptain.platform.model_export_util import ModelExportUtil
from kaptain.platform.metadata_util import MetadataUtil
logging.getLogger().setLevel(logging.INFO)
def get_datasets(buffer_size):
datasets, ds_info = tfds.load(name="mnist", data_dir="datasets", download=False, with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets["train"], datasets["test"]
def scale(image, label):
image = tf.cast(image, tf.float32) / 255.0
return image, label
train_dataset = mnist_train.map(scale).cache().shuffle(buffer_size).repeat()
test_dataset = mnist_test.map(scale)
return train_dataset, test_dataset
def compile_model(args):
model = tf.keras.Sequential(
[
tf.keras.layers.Conv2D(32, 3, activation="relu", input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation="relu"),
tf.keras.layers.Dense(10, activation="softmax"),
]
)
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(
learning_rate=args.learning_rate, momentum=args.momentum
),
metrics=["accuracy"],
)
return model
def main():
parser = argparse.ArgumentParser(description="TensorFlow MNIST Trainer")
parser.add_argument(
"--batch-size",
type=int,
default=64,
metavar="N",
help="Batch size for training (default: 64)",
)
parser.add_argument(
"--buffer-size",
type=int,
default=10000,
metavar="N",
help="Number of training examples to buffer before shuffling" "default: 10000)",
)
parser.add_argument(
"--epochs",
type=int,
default=5,
metavar="N",
help="Number of epochs to train (default: 5)",
)
parser.add_argument(
"--steps",
type=int,
default=10,
metavar="N",
help="Number of batches to train the model on in each epoch (default: 10)",
)
parser.add_argument(
"--learning-rate",
type=float,
default=0.5,
metavar="N",
help="Learning rate (default: 0.5)",
)
parser.add_argument(
"--momentum",
type=float,
default=0.1,
metavar="N",
help="Accelerates SGD in the relevant direction and dampens oscillations (default: 0.1)",
)
args, _ = parser.parse_known_args()
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
logging.debug(f"num_replicas_in_sync: {strategy.num_replicas_in_sync}")
global_batch_size = args.batch_size * strategy.num_replicas_in_sync
train_dataset, test_dataset = get_datasets(buffer_size=args.buffer_size)
train_dataset = train_dataset.batch(batch_size=global_batch_size)
test_dataset = test_dataset.batch(batch_size=global_batch_size)
dataset_options = tf.data.Options()
dataset_options.experimental_distribute.auto_shard_policy = (
tf.data.experimental.AutoShardPolicy.DATA
)
train_datasets_sharded = train_dataset.with_options(dataset_options)
test_dataset_sharded = test_dataset.with_options(dataset_options)
with strategy.scope():
model = compile_model(args=args)
# Train the model
model.fit(train_datasets_sharded, epochs=args.epochs, steps_per_epoch=args.steps)
# Save the trained model (to MinIO) with the exporter utility
model.save("mnist")
ModelExportUtil().upload_model("mnist")
eval_loss, eval_acc = model.evaluate(test_dataset_sharded, verbose=0, steps=args.steps)
# Record the evaluation metrics for use with the hyperparameter tuner
MetadataUtil.record_metrics({"loss": eval_loss, "accuracy": eval_acc})
if __name__ == "__main__":
main()
Writing trainer.py
Define the Model
The central abstraction of the Kaptain SDK is a model:
extra_files = ["datasets/mnist"]
base_image = "mesosphere/kubeflow:1.3.0-tensorflow-2.5.0"
# replace with your docker repository with a tag (optional), e.g. "repository/image" or "repository/image:tag"
image_name = "mesosphere/kubeflow:mnist-sdk-example"
# name of the file with additional python packages to install into model image (e.g. "requirements.txt")
requirements = None
from kaptain.model.models import Model
from kaptain.model.frameworks import ModelFramework
model = Model(
id="dev/mnist",
name="MNIST",
description="MNIST Model",
version="0.0.1",
framework=ModelFramework.TENSORFLOW,
framework_version="2.5.0",
main_file="trainer.py",
extra_files=extra_files,
image_name=image_name,
base_image=base_image,
requirements=requirements,
)
The id
is a unique identifier of the model.
The identifier shown indicates it is an MNIST model in development.
The fields member
and description
are for humans: to inform your colleagues and yourself of what the model is about.
version
is the models’ own version, so it is easy to identify models by their iteration.
The framework
and framework_version
are for the time being human-friendly metadata.
Since a Docker image is built in the background when you train
or tune
a Model
instance, a base_image
has to be provided.
The name of the final image image_name
must be provided with or without image tag.
If the tag is omitted, a concatenation of model id
, framework
, and framework_version
is used.
The main_file
specifies the name of file that contains the model code, that is, trainer.py
for the purposes of this tutorial.
To specify additional Python packages required for training or serving, provide the path to your requirements file via the requirements
parameter of the Model
class. Details on the format of the requirements file can be found in the pip official documentation.
More details are available with ?Model
.
Train the Model
Training the model on 2 nodes is as easy as the following function call:
workers = 2
gpus = 0
memory = "5G"
cpu = "1"
model.train(
workers=workers,
cpu=cpu,
memory=memory,
gpus=gpus,
hyperparameters={"--steps": 10, "--epochs": 5},
args={}, # additional command line arguments for the training job.
)
...
10/10 [==============================] - 0s 48ms/step - accuracy: 0.2516 - loss: 2.2895
[I 201214 11:18:01 kubernetes:190] Epoch 2/5
10/10 [==============================] - 0s 48ms/step - accuracy: 0.4219 - loss: 2.1582
[I 201214 11:18:02 kubernetes:190] Epoch 3/5
10/10 [==============================] - 0s 35ms/step - accuracy: 0.5633 - loss: 1.9389
[I 201214 11:18:02 kubernetes:190] Epoch 4/5
10/10 [==============================] - 0s 38ms/step - accuracy: 0.5859 - loss: 1.8862
[I 201214 11:18:03 kubernetes:190] Epoch 5/5
10/10 [==============================] - 0s 42ms/step - accuracy: 0.6461 - loss: 1.8179
[I 201214 11:18:06 kubernetes:190] INFO:root:Record metrics.
[I 201214 11:18:06 kubernetes:190] INFO:root:Metrics saved.
[I 201214 11:18:07 job_runner:58] Waiting for the training job to complete...
[I 201214 11:18:07 models:332] Training result: Succeeded
The default gpus
argument is 0, but it is shown here as an explicit option.
Use ?Model.train
to see all supported arguments.
The low accuracy of the model is to make the demonstration of distributed training quicker, as in the next section the model’s hyperparameters are optimized anyway.
Verify the Model is Exported to MinIO
%%sh
set -o errexit
minio_accesskey=$(kubectl get secret minio-creds-secret -o jsonpath="{.data.accesskey}" | base64 --decode)
minio_secretkey=$(kubectl get secret minio-creds-secret -o jsonpath="{.data.secretkey}" | base64 --decode)
mc --no-color alias set minio http://minio.kubeflow ${minio_accesskey} ${minio_secretkey}
mc --no-color ls -r minio/kaptain/models
Deploy the Model
A trained model can be deployed as an auto-scalable inference service with a single call:
model.deploy(cpu="1", memory="2G")
[I 201214 11:36:13 models:506] Deploying model from s3://kaptain/models/dev/mnist/tuned/f9e0dff3211a4bf8968c95519bedb926
[I 201214 11:36:13 kubernetes:36] Reading secrets dev-mnist-secret in namespace demo.
[I 201214 11:36:13 kubernetes:27] Creating secret dev-mnist-secret in namespace demo.
[I 201214 11:36:13 kubernetes:84] Reading secrets dev-mnist-service-account in namespace demo.
[I 201214 11:36:13 kubernetes:75] Creating service account dev-mnist-service-account in namespace demo.
[I 201214 11:36:50 deployer:93] Model dev/mnist deployed successfully. Cluster URL: http://dev-mnist.demo.svc.cluster.local/v1/models/dev-mnist:predict
The SDK will deploy the latest saved trained or tuned model.
It is also possible to specify a custom URI for the model, and GPUs for inference.
If the model has already been deployed before, use replace=True
to hot-swap it.
Tune the Model
Specify the hyperparameters and ranges or discrete values, and then use the tune
method:
trials = 16
parallel_trials = 2
from kaptain.hyperparameter.domains import Double, Discrete
hyperparams = {
"--learning-rate": Double(0.2, 0.8),
"--momentum": Double(0.1, 0.5),
"--epochs": Discrete(["5", "10"]),
"--steps": Discrete(["100"]),
}
model.tune(
trials=trials,
parallel_trials=parallel_trials,
workers=workers,
cpu=cpu,
memory=memory,
gpus=gpus,
hyperparameters=hyperparams,
objectives=["accuracy"],
objective_goal=0.99,
args={}, # additional command line arguments to pass to a Trial (TFJob).
)
...
[I 201214 11:27:11 experiment_runner:66] Creating experiment mnist-tune-14c4431d in namespace demo
[I 201214 11:27:11 experiment_runner:68] Experiment mnist-tune-14c4431d has been created.
Progress: 100%|=========================|16/16 [time: 07:15, accuracy: 0.9574999809265137, trials running: 0, pending: 0, failed: 0, killed: 0]
[I 201214 11:34:27 experiment_runner:74] Model tuning completed, final status: Succeeded
[I 201214 11:34:27 kubernetes:66] Deleting secret tune-3cc0353f00e4f27f in namespace demo.
[I 201214 11:34:27 models:429] Experiment results:
parameters: {'--learning-rate': '0.6885679458378395', '--momentum': '0.3915904809621852', '--epochs': '10', '--steps': '100'}, best_trial_name: mnist-tune-14c4431d-wttgxggg
[I 201214 11:34:27 models:432] Copying saved model with the best metrics from the trial to the target location.
[I 201214 11:34:27 models:444] Removing intermediate trial models from the storage.
For more details on the arguments supported by the SDK, execute ?Model.tune
in a notebook.
Available options are:
- a list of objectives with a goal for the primary objective based on the objective type (maximize vs minimize)
- the maximum number of trials, parallel trials, and failed trials
- the hyperparameter tuning algorithm and any custom algorithm settings
The Kaptain SDK allows individual trials to be run in parallel as well as trained in a distributed manner each.
Run canary rollout
To run a canary rollout launch:
model.deploy_canary(canary_traffic_percentage=30)
This will split the traffic between the last deployed revision and the current latest ready revision. It is also possible to specify a custom URI for the model.
To promote the latest canary deployment run:
model.promote_canary()
This will redirect all the traffic to the deployed canary revision.
Test the Model Endpoint
import matplotlib.pyplot as plt
import numpy as np
def display_image(x_test, image_index):
plt.imshow(x_test[image_index].reshape(28, 28), cmap="binary")
with np.load("datasets/mnist.npz", allow_pickle=True) as f:
x_test = f["x_test"] / 255.0
image_index = 1005
display_image(x_test, image_index)
import codecs, json
tf_serving_req = {
"instances": x_test[image_index : image_index + 1].reshape(1, 28, 28, 1).tolist()
}
with open("input.json", "w") as json_file:
json.dump(tf_serving_req, json_file)
%%sh
set -o errexit
model_name="dev-mnist"
namespace=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)
url="http://${model_name}.${namespace}.svc.cluster.local/v1/models/${model_name}:predict"
curl --location \
--silent \
--fail \
--retry 10 \
--retry-delay 10 \
$url \
-d@input.json
{
"predictions": [[2.81844894e-14, 1.56485475e-13, 4.98654347e-11, 1.6658154e-08, 4.37566339e-09, 2.91234592e-10, 3.17106647e-15, 8.52349302e-11, 5.57288715e-10, 1.0]
]
}
The last class has the largest probability; the neural network correctly predicts the image to be a 9.
This tutorial includes code from the MinIO Project (“MinIO”), which is © 2015-2021 MinIO, Inc. MinIO is made available subject to the terms and conditions of the GNU Affero General Public License 3.0. The complete source code for the versions of MinIO packaged with Kaptain 1.3.0 are available at these URLs: https://github.com/minio/minio/tree/RELEASE.2020-12-03T05-49-24Z and https://github.com/minio/minio/tree/RELEASE.2021-07-30T00-02-00Z.