7. Distributed training
This example shows how to use the IPUEstimator with the IPUMultiWorkerStrategy to perform distributed training of a model on the MNIST dataset.
The example is based on the following official tutorial, with some modifications for use with the IPU: https://www.tensorflow.org/tutorials/distribute/multi_worker_with_estimator
Below, we highlight the changes needed to convert code using the IPUEstimator to support distributed training.
7.1. The input function
In multi-worker training, it is necessary to shard the dataset such that each worker processes distinct portions of the dataset.
When used in a distributed context, the input function is passed an additional argument, input_context, which can be used to obtain the current worker index and the total number of workers. We pass this information to the Dataset.shard() function to perform the sharding.
Note that the batch size provided by the input function is the per-worker batch size. The global batch size will be this multiplied by the number of workers.
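As a standalone illustration of the sharding semantics (this snippet is not part of the example), Dataset.shard(num_shards, index) keeps every num_shards-th element, starting at position index:
import tensorflow as tf

# A toy dataset of 8 elements split across 2 workers.
dataset = tf.data.Dataset.range(8)
worker_0 = dataset.shard(num_shards=2, index=0)  # elements 0, 2, 4, 6
worker_1 = dataset.shard(num_shards=2, index=1)  # elements 1, 3, 5, 7
In the distributed input function, num_shards corresponds to input_context.num_input_pipelines and index to input_context.input_pipeline_id, as shown in the complete example below.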
7.2. The model function
The optimiser will automatically divide the loss by the number of workers, so in the model function we should only divide the loss by the local batch size.
We make some changes to how the weights of the model are updated. Instead of using the high-level Optimizer.minimize() function, we use Optimizer.compute_gradients() and Optimizer.apply_gradients() separately in order to control their placement. The Optimizer.compute_gradients() call (the backward pass) is placed on the IPU, while the Optimizer.apply_gradients() call (the allreduce of gradients and the weight updates) is placed on the host. This is done by using the host_call parameter in IPUEstimatorSpec.
In practice this means that the gradients will be streamed from the IPU to the host as soon as they are computed. The workers will then start reducing the gradients amongst themselves, allowing the backward pass on the IPUs to overlap with the reductions on the hosts. After a gradient has been reduced across the workers, the corresponding weight update is also done on the host.
The reduction is done using a ring-based collectives implementation with gRPC as the cross-host communication layer.
One benefit of this approach is that any additional optimiser state (such as momentum) is only needed in host memory, so there is no additional IPU memory consumption when using stateful optimisers with this approach.
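The relevant part of the model function, shown in isolation (a condensed sketch of the pattern used in the complete example below; model, loss and mode are assumed to be defined as in that example):
# Compute gradients on the IPU; allreduce and apply them on the host.
optimizer = tf.compat.v1.train.AdamOptimizer()
variables = model.trainable_variables

def host_model_fn(*host_gradients):
  # Runs on the host: allreduce the gradients across the workers,
  # then update the weights.
  return optimizer.apply_gradients(zip(host_gradients, variables))

grads_and_vars = optimizer.compute_gradients(loss, var_list=variables)
gradients = [g for (g, _) in grads_and_vars]

return ipu.ipu_estimator.IPUEstimatorSpec(
    mode=mode,
    loss=loss,
    train_op=tf.identity(loss),  # no weight update in the IPU graph
    host_call=(host_model_fn, gradients))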
7.3. Cluster definition
We use the TFConfigClusterResolver, which reads the TF_CONFIG environment variable to determine the cluster definition.
There are two components of TF_CONFIG: cluster and task. cluster provides information about the entire cluster, namely the workers and parameter servers in the cluster. task provides information about the current task.
In this example, the task type is worker and the task index is 0.
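For illustration (this snippet is not part of the example), the resolver parses these values from the environment as follows:
import json
import os

import tensorflow as tf

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {"worker": ["localhost:3737", "localhost:3738"]},
    "task": {"type": "worker", "index": 0},
})

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
print(resolver.task_type)       # "worker"
print(resolver.task_id)         # 0
print(resolver.cluster_spec())  # the full cluster definition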
You could run this example with two workers on the same machine
(in different terminals) like this:
$ TF_CONFIG='{"cluster":{"worker":["localhost:3737","localhost:3738"]},"task":{"type":"worker","index":0}}' python distributed_training_example.py
$ TF_CONFIG='{"cluster":{"worker":["localhost:3737","localhost:3738"]},"task":{"type":"worker","index":1}}' python distributed_training_example.py
7.4. Complete example
import argparse
import numpy as np
import tensorflow as tf
from tensorflow.python import ipu

BATCH_SIZE = 64


def input_fn(mode, input_context=None):  # pylint: disable=unused-argument
  train_data, _ = tf.keras.datasets.mnist.load_data()

  def normalise(image, label):
    image = image.astype(np.float32) / 255.0
    image = np.expand_dims(image, axis=-1)
    label = label.astype(np.int32)
    return image, label

  x_train, y_train = normalise(*train_data)

  def generator():
    return zip(x_train, y_train)

  types = (x_train.dtype, y_train.dtype)
  shapes = (x_train.shape[1:], y_train.shape[1:])
  mnist_dataset = tf.data.Dataset.from_generator(generator, types, shapes)

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)

  mnist_dataset = mnist_dataset.shuffle(len(y_train)) \
      .cache().batch(BATCH_SIZE, drop_remainder=True).repeat()
  return mnist_dataset


def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation="relu"),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation="relu"),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=mode == tf.estimator.ModeKeys.TRAIN)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {"logits": logits}
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  optimizer = tf.compat.v1.train.AdamOptimizer()
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels,
                                                                      logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    predictions = tf.argmax(input=logits, axis=-1)
    eval_metric_ops = {
        "accuracy":
        tf.compat.v1.metrics.accuracy(labels=labels, predictions=predictions),
    }
    return tf.estimator.EstimatorSpec(mode,
                                      loss=loss,
                                      eval_metric_ops=eval_metric_ops)

  variables = model.trainable_variables

  def host_model_fn(*host_gradients):
    # This will allreduce the gradients and update the weights on the host.
    return optimizer.apply_gradients(zip(host_gradients, variables))

  train_op = tf.identity(loss)
  grads_and_vars = optimizer.compute_gradients(loss, var_list=variables)
  gradients = [g for (g, _) in grads_and_vars]
  host_call = (host_model_fn, gradients)

  return ipu.ipu_estimator.IPUEstimatorSpec(mode=mode,
                                            loss=loss,
                                            train_op=train_op,
                                            host_call=host_call)


# Get the cluster configuration from the TF_CONFIG environment variable.
cluster = tf.distribute.cluster_resolver.TFConfigClusterResolver()

# Create strategy that places variables (including momentums) on the host.
strategy = ipu.ipu_multi_worker_strategy.IPUMultiWorkerStrategy(
    cluster, variables_on_host=True)

ipu_options = ipu.utils.create_ipu_config()
ipu.utils.auto_select_ipus(ipu_options, num_ipus=1)
ipu_run_config = ipu.ipu_run_config.IPURunConfig(ipu_options=ipu_options)

config = ipu.ipu_run_config.RunConfig(
    session_config=tf.ConfigProto(allow_soft_placement=False),
    ipu_run_config=ipu_run_config,
    train_distribute=strategy,
)

parser = argparse.ArgumentParser()
parser.add_argument("--num-steps", type=int, default=10000)
parser.add_argument("--model-dir")
args = parser.parse_args()

classifier = ipu.ipu_estimator.IPUEstimator(
    config=config,
    model_fn=model_fn,
    model_dir=args.model_dir,
)

# Training progress is logged as INFO, so enable that logging level.
tf.logging.set_verbosity(tf.logging.INFO)

tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn,
                                      max_steps=args.num_steps),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))
7.5. Distributed training with Horovod
Distributed training can also be performed using Horovod, which is included in the TensorFlow wheel provided by Graphcore.
The class IPUHorovodStrategy can be used in the same manner as the IPUMultiWorkerStrategy.
While the IPUMultiWorkerStrategy uses collective operations over gRPC, the IPUHorovodStrategy uses the collective operations provided by Horovod, which are based on MPI. Horovod also has built-in cluster discovery, so there is no cluster resolver argument to provide as there is for the IPUMultiWorkerStrategy, and there is no need to start a tf.distribute.Server.
Apart from these differences, the API and semantics should be the same for the IPUHorovodStrategy and the IPUMultiWorkerStrategy. In other words, they both provide data-parallel distributed training that keeps the variables in sync on the different workers. During variable initialisation the values are broadcast from the root rank to the other ranks, and during training the gradients are all-reduced as a part of the Optimizer.apply_gradients call.
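The Horovod-specific pieces, shown in isolation (a condensed sketch of the complete example below):
from tensorflow.python.ipu import horovod as hvd
from tensorflow.python.ipu.horovod import ipu_horovod_strategy

# Initialise the Horovod runtime; cluster discovery is handled by MPI,
# so no TF_CONFIG or cluster resolver is needed.
hvd.init()

# Create the strategy without a cluster resolver argument.
strategy = ipu_horovod_strategy.IPUHorovodStrategy(variables_on_host=True)

# In the input function, shard the dataset by Horovod rank instead of
# using input_context:
#   dataset = dataset.shard(hvd.size(), hvd.rank())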
7.6. Horovod Open MPI dependency
Horovod depends on Open MPI being installed on the system. The Open MPI library is dynamically loaded when the module tensorflow.python.ipu.horovod is imported, and this will fail if Open MPI is not installed. It is recommended to install the Open MPI version that is provided by your operating system package manager.
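For example, on Ubuntu-based systems the packaged version can typically be installed as follows (package names may differ between distributions):
$ sudo apt-get install openmpi-bin libopenmpi-dev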
7.7. Launching Horovod training
The mpirun tool can be used to run the distributed training across a cluster. For instance, running distributed training across two processes on the same machine can be done with the following command:
$ mpirun -np 2 -H localhost:2 python distributed_training_horovod_example.py
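To run across multiple machines, the hosts and the number of processes per host can be listed in the -H argument (the hostnames below are placeholders):
$ mpirun -np 4 -H host1:2,host2:2 python distributed_training_horovod_example.py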
7.8. Complete Horovod example
Below is a complete example using Horovod, adapted from the example above.
import argparse
import numpy as np
import tensorflow as tf
from tensorflow.python import ipu
from tensorflow.python.ipu import horovod as hvd
from tensorflow.python.ipu.horovod import ipu_horovod_strategy

BATCH_SIZE = 64


def input_fn(mode):  # pylint: disable=unused-argument
  train_data, _ = tf.keras.datasets.mnist.load_data()

  def normalise(image, label):
    image = image.astype(np.float32) / 255.0
    image = np.expand_dims(image, axis=-1)
    label = label.astype(np.int32)
    return image, label

  x_train, y_train = normalise(*train_data)

  def generator():
    return zip(x_train, y_train)

  types = (x_train.dtype, y_train.dtype)
  shapes = (x_train.shape[1:], y_train.shape[1:])
  mnist_dataset = tf.data.Dataset.from_generator(generator, types, shapes)
  mnist_dataset = mnist_dataset.shard(hvd.size(), hvd.rank())
  mnist_dataset = mnist_dataset.shuffle(len(y_train)) \
      .cache().batch(BATCH_SIZE, drop_remainder=True).repeat()
  return mnist_dataset


def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation="relu"),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation="relu"),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=mode == tf.estimator.ModeKeys.TRAIN)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {"logits": logits}
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  optimizer = tf.compat.v1.train.AdamOptimizer()
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels,
                                                                      logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)

  variables = model.trainable_variables

  def host_model_fn(*host_gradients):
    # This will allreduce the gradients and update the weights on the host.
    return optimizer.apply_gradients(zip(host_gradients, variables))

  train_op = tf.identity(loss)
  grads_and_vars = optimizer.compute_gradients(loss, var_list=variables)
  gradients = [g for (g, _) in grads_and_vars]
  host_call = (host_model_fn, gradients)

  return ipu.ipu_estimator.IPUEstimatorSpec(mode=mode,
                                            loss=loss,
                                            train_op=train_op,
                                            host_call=host_call)


# Initialise the Horovod runtime.
hvd.init()

# Create a Horovod strategy that places variables on the host.
strategy = ipu_horovod_strategy.IPUHorovodStrategy(variables_on_host=True)

ipu_options = ipu.utils.create_ipu_config()
ipu.utils.auto_select_ipus(ipu_options, num_ipus=1)
ipu_run_config = ipu.ipu_run_config.IPURunConfig(ipu_options=ipu_options)

config = ipu.ipu_run_config.RunConfig(
    session_config=tf.ConfigProto(allow_soft_placement=False),
    ipu_run_config=ipu_run_config,
    train_distribute=strategy,
)

parser = argparse.ArgumentParser()
parser.add_argument("--num-steps", type=int, default=10000)
parser.add_argument("--model-dir")
args = parser.parse_args()

classifier = ipu.ipu_estimator.IPUEstimator(
    config=config,
    model_fn=model_fn,
    model_dir=args.model_dir,
)

# Training progress is logged as INFO, so enable that logging level.
tf.logging.set_verbosity(tf.logging.INFO)
classifier.train(input_fn=input_fn, max_steps=args.num_steps)