7. Distributed training

This example shows how to use the IPUEstimator with the IPUMultiWorkerStrategy to perform distributed training of a model on the MNIST dataset.

The example is based on the following official tutorial with some modifications for use with the IPU: https://www.tensorflow.org/tutorials/distribute/multi_worker_with_estimator

Below, we highlight the changes needed to convert code that uses IPUEstimator so that it supports distributed training.

7.1. The input function

In multi-worker training, it is necessary to shard the dataset such that each worker processes distinct portions of the dataset.

When used in a distributed context, the input function is passed an additional argument input_context that can be used to get the current worker index and the total number of workers. We pass this information to the Dataset.shard() function to perform the sharding.
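As a rough illustration of the sharding described above, the following plain-Python sketch mimics the selection rule of Dataset.shard(num_shards, index), which keeps every num_shards-th element starting at position index. The helper name shard is hypothetical and stands in for the real dataset method; no TensorFlow is needed to run it.

```python
from itertools import islice


def shard(elements, num_shards, index):
  """Keep every `num_shards`-th element, starting at position `index`.

  Hypothetical stand-in that mirrors the selection rule of
  tf.data.Dataset.shard(num_shards, index).
  """
  return list(islice(elements, index, None, num_shards))


examples = list(range(10))  # Stand-in for ten dataset elements.
num_workers = 2  # Plays the role of input_context.num_input_pipelines.

# Each worker index plays the role of input_context.input_pipeline_id.
shards = [shard(examples, num_workers, worker) for worker in range(num_workers)]
print(shards)  # [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
```

Every element lands in exactly one shard, so the workers never process the same example within one pass over the data.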

Note that the batch size provided by the input function is the per-worker batch size. The global batch size will be this multiplied by the number of workers.
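To make the batch-size arithmetic concrete, here is a small sketch. It assumes two workers (as in the launch commands in section 7.3), the per-worker batch size of 64 from the complete example, and the 60,000 images of the MNIST training set.

```python
PER_WORKER_BATCH_SIZE = 64  # BATCH_SIZE in the complete example.
NUM_WORKERS = 2  # Assumed cluster size for this illustration.
NUM_TRAIN_EXAMPLES = 60000  # Size of the MNIST training set.

global_batch_size = PER_WORKER_BATCH_SIZE * NUM_WORKERS

# After sharding, each worker sees an equal share of the examples.
examples_per_worker = NUM_TRAIN_EXAMPLES // NUM_WORKERS

# With drop_remainder=True the final partial batch is discarded.
batches_per_worker = examples_per_worker // PER_WORKER_BATCH_SIZE

print(global_batch_size)  # 128
print(examples_per_worker)  # 30000
print(batches_per_worker)  # 468
```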

7.2. The model function

The optimiser will automatically divide the loss by the number of workers, so in the model function we should only divide the loss by the local batch size.
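The following worked example, using made-up loss values and plain Python rather than TensorFlow, sketches why dividing only by the local batch size is sufficient: once the per-worker losses are also divided by the number of workers and summed, the result equals the mean over the global batch.

```python
# Per-example losses on two workers; the values are made up for illustration.
per_example_losses = [
    [0.5, 1.5, 1.0, 3.0],  # worker 0
    [2.0, 2.0, 0.0, 6.0],  # worker 1
]
num_workers = len(per_example_losses)
local_batch_size = len(per_example_losses[0])

# What the model function computes: the sum scaled by the local batch size.
local_losses = [sum(losses) / local_batch_size for losses in per_example_losses]

# The further division by the number of workers (performed automatically,
# according to the text above), followed by a sum across the workers.
combined = sum(loss / num_workers for loss in local_losses)

# Reference value: the plain mean over the global batch.
all_losses = [loss for losses in per_example_losses for loss in losses]
global_mean = sum(all_losses) / len(all_losses)

print(local_losses)  # [1.5, 2.5]
print(combined, global_mean)  # 2.0 2.0
```

Dividing by the global batch size inside the model function would scale the loss, and therefore the gradients, down by an extra factor equal to the number of workers.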

We will make some changes to how we update the weights of the model. Instead of using the high-level Optimizer.minimize() function, we will call Optimizer.compute_gradients() and Optimizer.apply_gradients() separately in order to control their placement. The Optimizer.compute_gradients() call (the backward pass) is placed on the IPU, while the Optimizer.apply_gradients() call (the allreduce of gradients and the weight updates) is placed on the host. This is done by using the host_call parameter in IPUEstimatorSpec.

In practice this means that the gradients will be streamed from the IPU to the host as soon as they are computed. The workers will then start reducing the gradients amongst themselves, allowing the backward pass on the IPUs to overlap with the reductions on the hosts. After a gradient is reduced across the workers, the corresponding weight update is also done on the host.

The reduction is done using a ring-based collectives implementation with gRPC as the cross-host communication layer.
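The internals of the collectives implementation are not described here, so the following is only a sketch of the textbook ring allreduce (a reduce-scatter phase followed by an all-gather phase), simulated in a single process. List copies stand in for the messages that would travel over gRPC, and the function name ring_allreduce is illustrative rather than part of any library.

```python
def ring_allreduce(worker_chunks):
  """Sum-allreduce over a simulated ring.

  worker_chunks[r][c] is chunk c (a list of numbers) held by worker r.
  Every worker must hold as many chunks as there are workers.
  """
  n = len(worker_chunks)
  data = [[list(chunk) for chunk in chunks] for chunks in worker_chunks]

  # Phase 1, reduce-scatter: in each step worker r sends one chunk to its
  # right-hand neighbour, which adds it to its own copy of that chunk.
  for step in range(n - 1):
    sends = [(r, (r - step) % n) for r in range(n)]
    payloads = [list(data[r][c]) for r, c in sends]  # Snapshot before updates.
    for (r, c), payload in zip(sends, payloads):
      dst = (r + 1) % n
      data[dst][c] = [a + b for a, b in zip(data[dst][c], payload)]

  # Worker r now holds the fully reduced chunk (r + 1) % n.
  # Phase 2, all-gather: the reduced chunks travel once around the ring.
  for step in range(n - 1):
    sends = [(r, (r + 1 - step) % n) for r in range(n)]
    payloads = [list(data[r][c]) for r, c in sends]
    for (r, c), payload in zip(sends, payloads):
      data[(r + 1) % n][c] = payload

  return data


# Three workers, each holding three chunks of two gradient values.
inputs = [
    [[1, 2], [3, 4], [5, 6]],
    [[10, 20], [30, 40], [50, 60]],
    [[100, 200], [300, 400], [500, 600]],
]
result = ring_allreduce(inputs)
print(result[0])  # [[111, 222], [333, 444], [555, 666]]
```

In this scheme each worker only ever communicates with its ring neighbours, and every worker ends up with the same summed values.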

One benefit of this approach is that any additional optimiser state (such as momentum) is only needed in host memory, so there is no additional IPU memory consumption when using stateful optimisers with this approach.
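The division of labour described in this section can be sketched with a toy single-worker model in plain Python. All names are hypothetical, the "device" is an ordinary function, and simple momentum is used in place of the Adam optimiser from the complete example. The point is only that the device function needs the current weight and a batch, while the optimiser state stays with the host object.

```python
def device_compute_gradient(weight, batch):
  """Stand-in for the IPU: gradient of the mean of 0.5 * (weight * x - y)**2."""
  return sum((weight * x - y) * x for x, y in batch) / len(batch)


class HostMomentumOptimizer:
  """Stand-in for the host: owns the weight and the momentum accumulator."""

  def __init__(self, weight, learning_rate=0.1, momentum=0.9):
    self.weight = weight
    self.learning_rate = learning_rate
    self.momentum = momentum
    self.velocity = 0.0  # Optimiser state; never passed to the device.

  def apply_gradient(self, gradient):
    self.velocity = self.momentum * self.velocity + gradient
    self.weight -= self.learning_rate * self.velocity
    return self.weight


batch = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]  # Samples of y = 2 * x.
host = HostMomentumOptimizer(weight=0.0)

for _ in range(300):
  gradient = device_compute_gradient(host.weight, batch)  # "On the IPU".
  host.apply_gradient(gradient)  # "On the host".

print(round(host.weight, 3))  # Close to the true slope of 2.
```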

7.3. Cluster definition

We use the TFConfigClusterResolver, which reads the TF_CONFIG environment variable to determine the cluster definition.

There are two components of TF_CONFIG: cluster and task.

  • cluster provides information about the entire cluster, namely the workers and parameter servers in the cluster.

  • task provides information about the current task.

In this example, the task type is worker and the task index is 0 for the first worker and 1 for the second. You can run this example with two workers on the same machine (in different terminals) like this:

$ TF_CONFIG='{"cluster":{"worker":["localhost:3737","localhost:3738"]},"task":{"type":"worker","index":0}}' python distributed_training_example.py
$ TF_CONFIG='{"cluster":{"worker":["localhost:3737","localhost:3738"]},"task":{"type":"worker","index":1}}' python distributed_training_example.py
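For larger clusters it can be convenient to generate the TF_CONFIG values rather than type them by hand. The sketch below uses only the standard json module; the helper name make_tf_config is hypothetical, and it covers only clusters that consist entirely of workers, as in the commands above.

```python
import json


def make_tf_config(worker_addresses, task_index):
  """Return a TF_CONFIG JSON string for one worker (hypothetical helper)."""
  config = {
      "cluster": {"worker": list(worker_addresses)},
      "task": {"type": "worker", "index": task_index},
  }
  # Compact separators reproduce the whitespace-free form used above.
  return json.dumps(config, separators=(",", ":"))


workers = ["localhost:3737", "localhost:3738"]
configs = [make_tf_config(workers, index) for index in range(len(workers))]

for config in configs:
  print(config)
```

Each printed string can be used as the value of the TF_CONFIG environment variable for the corresponding worker process.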

7.4. Complete example

import argparse
import numpy as np
import tensorflow as tf
from tensorflow.python import ipu

BATCH_SIZE = 64


def input_fn(mode, input_context=None):  # pylint: disable=unused-argument
  train_data, _ = tf.keras.datasets.mnist.load_data()

  def normalise(image, label):
    image = image.astype(np.float32) / 255.0
    image = np.expand_dims(image, axis=-1)
    label = label.astype(np.int32)
    return image, label

  x_train, y_train = normalise(*train_data)

  def generator():
    return zip(x_train, y_train)

  types = (x_train.dtype, y_train.dtype)
  shapes = (x_train.shape[1:], y_train.shape[1:])
  mnist_dataset = tf.data.Dataset.from_generator(generator, types, shapes)

  if input_context:
    # Give each worker a distinct shard of the dataset.
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)

  # BATCH_SIZE is the per-worker batch size.
  mnist_dataset = mnist_dataset.shuffle(len(y_train)) \
      .cache().batch(BATCH_SIZE, drop_remainder=True).repeat()
  return mnist_dataset


def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation="relu"),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation="relu"),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=mode == tf.estimator.ModeKeys.TRAIN)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {"logits": logits}
    # EstimatorSpec requires the mode and has no `labels` argument.
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  optimizer = tf.compat.v1.train.AdamOptimizer()
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels,
                                                                      logits)
  # Divide by the local batch size only (see section 7.2).
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    predictions = tf.argmax(input=logits, axis=-1)
    eval_metric_ops = {
        "accuracy":
        tf.compat.v1.metrics.accuracy(labels=labels, predictions=predictions),
    }
    return tf.estimator.EstimatorSpec(mode,
                                      loss=loss,
                                      eval_metric_ops=eval_metric_ops)

  variables = model.trainable_variables

  def host_model_fn(*host_gradients):
    # This will allreduce the gradients and update the weights on the host.
    return optimizer.apply_gradients(zip(host_gradients, variables))

  # The backward pass runs on the IPU; the gradients go to the host call.
  train_op = tf.identity(loss)
  grads_and_vars = optimizer.compute_gradients(loss, var_list=variables)
  gradients = [g for (g, _) in grads_and_vars]
  host_call = (host_model_fn, gradients)

  return ipu.ipu_estimator.IPUEstimatorSpec(mode=mode,
                                            loss=loss,
                                            train_op=train_op,
                                            host_call=host_call)


# Get the cluster configuration from the TF_CONFIG environment variable.
cluster = tf.distribute.cluster_resolver.TFConfigClusterResolver()
# Create strategy that places variables (including momentums) on the host.
strategy = ipu.ipu_multi_worker_strategy.IPUMultiWorkerStrategy(
    cluster, variables_on_host=True)

ipu_options = ipu.utils.create_ipu_config()
ipu.utils.auto_select_ipus(ipu_options, num_ipus=1)
ipu_run_config = ipu.ipu_run_config.IPURunConfig(ipu_options=ipu_options)

config = ipu.ipu_run_config.RunConfig(
    session_config=tf.ConfigProto(allow_soft_placement=False),
    ipu_run_config=ipu_run_config,
    train_distribute=strategy,
)

parser = argparse.ArgumentParser()
parser.add_argument("--num-steps", type=int, default=10000)
parser.add_argument("--model-dir")
args = parser.parse_args()

classifier = ipu.ipu_estimator.IPUEstimator(
    config=config,
    model_fn=model_fn,
    model_dir=args.model_dir,
)

# Training progress is logged as INFO, so enable that logging level.
tf.logging.set_verbosity(tf.logging.INFO)

tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn,
                                      max_steps=args.num_steps),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))

7.5. Distributed training with Horovod

Distributed training can also be performed using Horovod, which is included in the TensorFlow wheel provided by Graphcore.

The class IPUHorovodStrategy can be used in the same manner as the IPUMultiWorkerStrategy.

While the IPUMultiWorkerStrategy uses collective operations over gRPC, the IPUHorovodStrategy uses the collective operations provided by Horovod, which are based on MPI. Horovod also has built-in cluster discovery, so, unlike the IPUMultiWorkerStrategy, it does not require a cluster resolver argument, and there is no need to start a tf.distribute.Server.

Apart from these differences, the API and semantics should be the same for the IPUHorovodStrategy and IPUMultiWorkerStrategy. In other words, they both provide data parallel distributed training that keeps the variables in sync on the different workers. During variable initialisation the values are broadcast from the root rank to the other ranks, and during training the gradients are all-reduced as a part of the Optimizer.apply_gradients call.
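The synchronisation semantics described above can be sketched with a single-process simulation in plain Python. The helper names are hypothetical, plain gradient descent replaces the real optimiser, and the all-reduce is written as a mean, which corresponds to summing gradients of a loss that has already been divided by the number of workers.

```python
def broadcast(values_per_rank, root_rank=0):
  """Give every rank a copy of the root rank's value."""
  return [values_per_rank[root_rank] for _ in values_per_rank]


def allreduce_mean(values_per_rank):
  """Give every rank the mean of all ranks' values."""
  mean = sum(values_per_rank) / len(values_per_rank)
  return [mean for _ in values_per_rank]


# Each rank starts from a different initial weight (arbitrary values).
weights = [0.25, -1.0, 3.0]
weights = broadcast(weights, root_rank=0)

learning_rate = 0.5
# Local gradients differ because each rank sees a different data shard.
for local_gradients in ([1.0, 2.0, 3.0], [0.5, 0.5, 2.0]):
  reduced = allreduce_mean(local_gradients)
  weights = [w - learning_rate * g for w, g in zip(weights, reduced)]

print(weights)  # [-1.25, -1.25, -1.25]
```

Because every rank starts from the same broadcast value and applies the same reduced gradient, the weights stay identical on all ranks without ever being exchanged during training.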

7.6. Horovod Open MPI dependency

Horovod depends on Open MPI being installed on the system. The Open MPI library is dynamically loaded when the module tensorflow.python.ipu.horovod is imported, and this will fail if Open MPI is not installed. It is recommended to install the Open MPI version that is provided by your operating system package manager.
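A script that should fail with a readable message when the dependency is missing could guard the import, as in the sketch below. The exact exception raised when Open MPI cannot be loaded is not stated here, so the helper (a hypothetical name, try_import) deliberately catches any exception.

```python
import importlib


def try_import(module_name):
  """Return (module, None) on success or (None, error) on failure."""
  try:
    return importlib.import_module(module_name), None
  except Exception as error:  # Broad on purpose: the failure type is assumed.
    return None, error


hvd, import_error = try_import("tensorflow.python.ipu.horovod")
if hvd is None:
  print("Horovod for the IPU is unavailable:", import_error)
  print("Check that Open MPI is installed, then try again.")
```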

7.7. Launching Horovod training

The mpirun tool can be used to run the distributed training across a cluster. For instance, running distributed training across two processes on the same machine can be done with the following command:

$ mpirun -np 2 -H localhost:2 python distributed_training_horovod_example.py

7.8. Complete Horovod example

Below is a complete example using Horovod, adapted from the example above.

import argparse
import numpy as np
import tensorflow as tf
from tensorflow.python import ipu
from tensorflow.python.ipu import horovod as hvd
from tensorflow.python.ipu.horovod import ipu_horovod_strategy

BATCH_SIZE = 64


def input_fn(mode):  # pylint: disable=unused-argument
  train_data, _ = tf.keras.datasets.mnist.load_data()

  def normalise(image, label):
    image = image.astype(np.float32) / 255.0
    image = np.expand_dims(image, axis=-1)
    label = label.astype(np.int32)
    return image, label

  x_train, y_train = normalise(*train_data)

  def generator():
    return zip(x_train, y_train)

  types = (x_train.dtype, y_train.dtype)
  shapes = (x_train.shape[1:], y_train.shape[1:])
  mnist_dataset = tf.data.Dataset.from_generator(generator, types, shapes)
  # Shard by Horovod rank so that each process sees distinct data.
  mnist_dataset = mnist_dataset.shard(hvd.size(), hvd.rank())
  mnist_dataset = mnist_dataset.shuffle(len(y_train)) \
      .cache().batch(BATCH_SIZE, drop_remainder=True).repeat()
  return mnist_dataset


def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation="relu"),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation="relu"),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=mode == tf.estimator.ModeKeys.TRAIN)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {"logits": logits}
    # EstimatorSpec requires the mode and has no `labels` argument.
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  optimizer = tf.compat.v1.train.AdamOptimizer()
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels,
                                                                      logits)
  # Divide by the local batch size only (see section 7.2).
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)

  variables = model.trainable_variables

  def host_model_fn(*host_gradients):
    # This will allreduce the gradients and update the weights on the host.
    return optimizer.apply_gradients(zip(host_gradients, variables))

  train_op = tf.identity(loss)
  grads_and_vars = optimizer.compute_gradients(loss, var_list=variables)
  gradients = [g for (g, _) in grads_and_vars]
  host_call = (host_model_fn, gradients)

  return ipu.ipu_estimator.IPUEstimatorSpec(mode=mode,
                                            loss=loss,
                                            train_op=train_op,
                                            host_call=host_call)


# Initialise the Horovod runtime.
hvd.init()

# Create a Horovod strategy that places variables on the host.
strategy = ipu_horovod_strategy.IPUHorovodStrategy(variables_on_host=True)

ipu_options = ipu.utils.create_ipu_config()
ipu.utils.auto_select_ipus(ipu_options, num_ipus=1)
ipu_run_config = ipu.ipu_run_config.IPURunConfig(ipu_options=ipu_options)

config = ipu.ipu_run_config.RunConfig(
    session_config=tf.ConfigProto(allow_soft_placement=False),
    ipu_run_config=ipu_run_config,
    train_distribute=strategy,
)

parser = argparse.ArgumentParser()
parser.add_argument("--num-steps", type=int, default=10000)
parser.add_argument("--model-dir")
args = parser.parse_args()

classifier = ipu.ipu_estimator.IPUEstimator(
    config=config,
    model_fn=model_fn,
    model_dir=args.model_dir,
)

# Training progress is logged as INFO, so enable that logging level.
tf.logging.set_verbosity(tf.logging.INFO)
classifier.train(input_fn=input_fn, max_steps=args.num_steps)