10. Distributed training

We support distributed training for two different types of systems:

  1. IPU-POD systems: An IPU-M2000-based system where IPUs in a rack are interconnected by IPU-Links, and IPUs in different racks are interconnected by GW-Links. Distributed training on IPU-PODs uses these links to perform collective operations without host involvement. When using multiple instances (host processes), communication over the host network may still be needed, for example to broadcast the initial values of variables from the first instance to the others.

  2. IPU-Server systems: A Mk1 PCIe card-based system with IPUs interconnected by IPU-Links. IPUs in distinct IPU-Servers are not directly interconnected. Distributed training on IPU-Servers therefore uses the host network for communication. A collective operation is typically performed in a hierarchical fashion where the IPU-Links are used first for intra-server communication, and then the host network is used for inter-server communication.
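The hierarchical scheme described for IPU-Server systems can be illustrated with a small pure-Python sketch. It is not the actual implementation: the function name hierarchical_allreduce and the nested-list layout (one inner list per server, one value per IPU) are assumptions made only for illustration, and plain Python sums stand in for the IPU-Link and host-network transfers.

```python
# Sketch of a two-level (hierarchical) sum all-reduce.
# servers[s][i] is the value held by IPU i in server s (illustrative layout).

def hierarchical_allreduce(servers):
    # Stage 1: reduce inside each server (stands in for IPU-Link traffic).
    local_sums = [sum(ipus) for ipus in servers]
    # Stage 2: reduce the per-server results across servers
    # (stands in for host-network traffic).
    global_sum = sum(local_sums)
    # Every IPU ends up holding the same reduced value.
    return [[global_sum for _ in ipus] for ipus in servers]


servers = [[1.0, 2.0], [3.0, 4.0]]
result = hierarchical_allreduce(servers)
print(result)  # [[10.0, 10.0], [10.0, 10.0]]
```

Only one value per server crosses the slower host network in the second stage, which is the motivation for reducing locally first.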

We provide distribution strategies designed for these two types of systems, with different implementations of the host communication.

Their main differences can be summarized like this:

Distribution strategy      System       Host communication
-----------------------    ----------   ------------------
IPUMultiReplicaStrategy    IPU-POD      Horovod (OpenMPI)
IPUHorovodStrategy         IPU-Server   Horovod (OpenMPI)
IPUMultiWorkerStrategy     IPU-Server   gRPC

The strategies have the following in common:

  • They all perform data-parallel synchronous training using multiple host processes. In this sense they are all similar to the MultiWorkerMirroredStrategy provided in standard TensorFlow.

  • They all broadcast the initial values of variables over the host network (using either Horovod or gRPC as described above).

These are the main differences:

  • With the IPUMultiReplicaStrategy, which is designed for IPU-POD systems, a collective operation (performed either explicitly by calling a member function like reduce() or implicitly by using an optimizer under the strategy scope) is performed directly on the IPUs, using compiled communications from the GCL library over the IPU-Links and GW-Links. The IPUMultiReplicaStrategy is designed for use with PopDist and PopRun. Refer to the PopDist and PopRun User Guide for more details.

  • With the two distribution strategies designed for IPU-Server systems, an equivalent collective operation will involve a transfer of the tensor from the IPU to the host for performing the collective communication over the host network (using either Horovod with OpenMPI or gRPC). A local (cross-replica) collective operation can be performed by using the cross_replica_ops.

A distinction should be made between these distribution strategies and the IPUStrategy provided in TensorFlow 2. The IPUStrategy targets a single system with one or more IPUs attached, while the distribution strategies discussed here target distributed systems like those described above (IPU-PODs or multiple IPU-Servers). Also, unlike the IPUStrategy, these distribution strategies do not currently support the Keras Model.fit() family of APIs. The use of ipu_compiler.compile() is still required to ensure that a single XLA graph is compiled, except when using IPUEstimator or IPUPipelineEstimator, which already use it internally.

10.1. Example using IPUMultiWorkerStrategy

This example shows how to use the IPUEstimator with the IPUMultiWorkerStrategyV1 to perform distributed training of a model on the MNIST dataset.

The example is based on the following official tutorial with some modifications for use with the IPU: https://www.tensorflow.org/tutorials/distribute/multi_worker_with_estimator

Below, we highlight the changes needed to convert code that uses IPUEstimator so that it supports distributed training.

10.1.1. The input function

In multi-worker training, it is necessary to shard the dataset such that each worker processes distinct portions of the dataset.

When used in a distributed context, the input function is passed an additional argument input_context that can be used to get the current worker index and the total number of workers. We pass this information to the Dataset.shard() function to perform the sharding.

Note that the batch size provided by the input function is the per-worker batch size. The global batch size will be this multiplied by the number of workers.
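The effect of sharding and the relationship between the two batch sizes can be sketched without TensorFlow. The helper shard below is a hypothetical stand-in that mimics the selection rule of Dataset.shard(num_shards, index), which keeps the elements whose position modulo num_shards equals index; the dataset and worker count are made-up values.

```python
def shard(elements, num_shards, index):
    """Keep every num_shards-th element, starting at position index."""
    return [x for i, x in enumerate(elements) if i % num_shards == index]


dataset = list(range(10))    # stand-in for the full dataset
num_workers = 2              # illustrative cluster size
per_worker_batch_size = 64   # what the input function batches with

# Each worker sees a distinct, non-overlapping part of the data.
shards = [shard(dataset, num_workers, w) for w in range(num_workers)]
print(shards)  # [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]

# One training step consumes one per-worker batch on every worker.
global_batch_size = per_worker_batch_size * num_workers
print(global_batch_size)  # 128
```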

10.1.2. The model function

The optimiser will automatically divide the loss by the number of workers, so in the model function we should only divide the loss by the local batch size.
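A small worked example may help show why dividing by the local batch size is enough. It assumes that the cross-worker step amounts to averaging over workers; the per-example loss values and the cluster size are invented for illustration.

```python
# Per-example losses seen by each worker in one step (made-up numbers).
per_worker_losses = [[1.0, 2.0], [3.0, 5.0]]
local_batch_size = 2
num_workers = len(per_worker_losses)

# What the model function computes: sum of losses / local batch size.
local_losses = [sum(l) / local_batch_size for l in per_worker_losses]

# Assumed cross-worker step: average over the workers.
combined = sum(local_losses) / num_workers

# Reference: mean over every example in the global batch.
all_losses = [x for l in per_worker_losses for x in l]
global_mean = sum(all_losses) / len(all_losses)

print(local_losses)  # [1.5, 4.0]
print(combined)      # 2.75
print(global_mean)   # 2.75
```

Dividing by the global batch size in the model function as well would scale the loss down by the number of workers twice.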

We also change how the weights of the model are updated. Instead of using the high-level Optimizer.minimize() function, we call Optimizer.compute_gradients() and Optimizer.apply_gradients() separately in order to control their placement. The Optimizer.compute_gradients() call (the backward pass) is placed on the IPU, while the Optimizer.apply_gradients() call (the allreduce of gradients and weight updates) is placed on the host. This is done by using the host_call parameter in IPUEstimatorSpec.

In practice, this means that the gradients are streamed from the IPU to the host as soon as they are computed. The workers then start reducing the gradients amongst themselves, allowing the backward pass on the IPUs to overlap with the reductions on the hosts. After a gradient is reduced across the workers, the corresponding weight update is also done on the host.

The reduction is done using a ring-based collectives implementation with gRPC as the cross-host communication layer.
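For readers unfamiliar with ring-based collectives, the following is a single-process simulation of the textbook ring all-reduce (a reduce-scatter phase followed by an all-gather phase). It is a sketch of the general technique only and makes no claim about how the TensorFlow collectives are implemented; the function name ring_allreduce and the one-element-per-chunk layout are assumptions chosen to keep the example short.

```python
def ring_allreduce(buffers):
    """Sum-allreduce equal-length lists, one list per worker, around a ring.

    For simplicity each list has exactly one element per worker, so
    element c plays the role of "chunk c".
    """
    n = len(buffers)
    data = [list(b) for b in buffers]  # work on copies

    # Phase 1 (reduce-scatter): after n - 1 steps, worker r holds the
    # complete sum for chunk (r + 1) % n.
    for step in range(n - 1):
        # Snapshot the outgoing values so all sends happen "at once".
        sent = [(r, (r - step) % n, data[r][(r - step) % n])
                for r in range(n)]
        for r, c, value in sent:
            data[(r + 1) % n][c] += value  # neighbour accumulates

    # Phase 2 (all-gather): pass the completed chunks around the ring.
    for step in range(n - 1):
        sent = [(r, (r + 1 - step) % n, data[r][(r + 1 - step) % n])
                for r in range(n)]
        for r, c, value in sent:
            data[(r + 1) % n][c] = value  # neighbour overwrites

    return data


gradients = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]
reduced = ring_allreduce(gradients)
print(reduced)  # [[111, 222, 333], [111, 222, 333], [111, 222, 333]]
```

In each step every worker sends one chunk to its neighbour only, so the amount of data sent per worker does not grow with the number of workers.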

One benefit of this approach is that any additional optimiser state (such as momentum) is only needed in host memory, so there is no additional IPU memory consumption when using stateful optimisers with this approach.

10.1.3. Cluster definition

We use the TFConfigClusterResolver which reads the TF_CONFIG environment variable to determine the cluster definition.

There are two components of TF_CONFIG: cluster and task.

  • cluster provides information about the entire cluster, namely the workers and parameter servers in the cluster.

  • task provides information about the current task.

In this example, the task type is worker and each worker has its own task index, starting from 0. You can run this example with two workers on the same machine (in different terminals) like this:

$ TF_CONFIG='{"cluster":{"worker":["localhost:3737","localhost:3738"]},"task":{"type":"worker","index":0}}' python distributed_training_example.py
$ TF_CONFIG='{"cluster":{"worker":["localhost:3737","localhost:3738"]},"task":{"type":"worker","index":1}}' python distributed_training_example.py
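When there are more than a couple of workers, writing the JSON by hand becomes error-prone. The sketch below generates the TF_CONFIG value for each worker using only the Python standard library; the helper name make_tf_config is hypothetical, and the addresses are the ones used in the commands above.

```python
import json


def make_tf_config(worker_addresses, task_index):
    """Build the TF_CONFIG value for one worker in the cluster."""
    return json.dumps(
        {
            "cluster": {"worker": list(worker_addresses)},
            "task": {"type": "worker", "index": task_index},
        },
        separators=(",", ":"),  # compact form, no spaces
    )


workers = ["localhost:3737", "localhost:3738"]
configs = [make_tf_config(workers, i) for i in range(len(workers))]
for config in configs:
    print(config)
```

Every worker receives the same cluster entry; only the task index differs.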

10.1.4. Complete example

# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
import argparse
from tensorflow.python.ipu.config import IPUConfig
import numpy as np

import tensorflow.compat.v1 as tf

from tensorflow.python import ipu

BATCH_SIZE = 64


def input_fn(mode, input_context=None):  # pylint: disable=unused-argument
  train_data, _ = tf.keras.datasets.mnist.load_data()

  def normalise(image, label):
    image = image.astype(np.float32) / 255.0
    image = np.expand_dims(image, axis=-1)
    label = label.astype(np.int32)
    return image, label

  x_train, y_train = normalise(*train_data)

  def generator():
    return zip(x_train, y_train)

  types = (x_train.dtype, y_train.dtype)
  shapes = (x_train.shape[1:], y_train.shape[1:])
  mnist_dataset = tf.data.Dataset.from_generator(generator, types, shapes)

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)

  mnist_dataset = mnist_dataset.shuffle(len(y_train)) \
      .cache().batch(BATCH_SIZE, drop_remainder=True).repeat()
  return mnist_dataset


def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(8, 3, activation="relu"),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(8, activation="relu"),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=mode == tf.estimator.ModeKeys.TRAIN)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {"logits": logits}
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  optimizer = tf.compat.v1.train.AdamOptimizer()
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels,
                                                                      logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    predictions = tf.argmax(input=logits, axis=-1)
    eval_metric_ops = {
        "accuracy":
        tf.compat.v1.metrics.accuracy(labels=labels, predictions=predictions),
    }
    return tf.estimator.EstimatorSpec(mode,
                                      loss=loss,
                                      eval_metric_ops=eval_metric_ops)

  variables = model.trainable_variables

  def host_model_fn(*host_gradients):
    # This will allreduce the gradients and update the weights on the host.
    return optimizer.apply_gradients(zip(host_gradients, variables))

  train_op = tf.identity(loss)
  grads_and_vars = optimizer.compute_gradients(loss, var_list=variables)
  gradients = [g for (g, _) in grads_and_vars]
  host_call = (host_model_fn, gradients)

  return ipu.ipu_estimator.IPUEstimatorSpec(mode=mode,
                                            loss=loss,
                                            train_op=train_op,
                                            host_call=host_call)


# Get the cluster configuration from the TF_CONFIG environment variable.
cluster = tf.distribute.cluster_resolver.TFConfigClusterResolver()
# Create strategy that places variables (including momentums) on the host.
strategy = ipu.ipu_multi_worker_strategy.IPUMultiWorkerStrategyV1(
    cluster, variables_on_host=True)

ipu_options = IPUConfig()
ipu_options.auto_select_ipus = 1
ipu_run_config = ipu.ipu_run_config.IPURunConfig(ipu_options=ipu_options)

config = ipu.ipu_run_config.RunConfig(
    session_config=tf.ConfigProto(allow_soft_placement=False),
    ipu_run_config=ipu_run_config,
    train_distribute=strategy,
)

parser = argparse.ArgumentParser()
parser.add_argument("--num-steps", type=int, default=10000)
parser.add_argument("--model-dir")
args = parser.parse_args()

classifier = ipu.ipu_estimator.IPUEstimator(
    config=config,
    model_fn=model_fn,
    model_dir=args.model_dir,
)

# Training progress is logged as INFO, so enable that logging level.
tf.logging.set_verbosity(tf.logging.INFO)

tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn,
                                      max_steps=args.num_steps),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))

10.2. Distributed training with Horovod

Distributed training can also be performed using Horovod, which is included in the TensorFlow wheel provided by Graphcore.

The class IPUHorovodStrategyV1 can be used in the same manner as the IPUMultiWorkerStrategyV1.

While the IPUMultiWorkerStrategyV1 uses collective operations over gRPC, the IPUHorovodStrategyV1 uses the collective operations provided by Horovod, based on MPI. Horovod also has built-in cluster discovery, so there is no cluster resolver argument that must be provided like there is for the IPUMultiWorkerStrategyV1, and there is no need for starting a tf.distribute.Server.

Apart from these differences, the API and semantics should be the same for the IPUHorovodStrategyV1 and IPUMultiWorkerStrategyV1. In other words, they both provide data-parallel distributed training that keeps the variables in sync on the different workers. During variable initialisation, the values are broadcast from the root rank to the other ranks, and during training the gradients are all-reduced as part of the Optimizer.apply_gradients() call.
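The way broadcasting and all-reducing keep the variables in sync can be shown with a single-process simulation. This sketch does not call Horovod: the helpers broadcast_from_root and allreduce_mean are hypothetical, the reduction is assumed to be a mean, a plain gradient-descent update is used for simplicity, and all numbers are invented.

```python
def broadcast_from_root(values, root=0):
    """Every rank takes a copy of the root rank's value."""
    return [values[root] for _ in values]


def allreduce_mean(values):
    """Every rank receives the mean of all ranks' values."""
    mean = sum(values) / len(values)
    return [mean for _ in values]


# Each rank starts with a different (for example, random) weight.
weights = [0.5, -1.0, 2.0]
weights = broadcast_from_root(weights)  # now all equal to rank 0's value

# Ranks compute different gradients because they see different shards.
local_gradients = [1.0, 2.0, 3.0]
learning_rate = 0.25

# All ranks apply the same reduced gradient, so weights stay identical.
reduced = allreduce_mean(local_gradients)
weights = [w - learning_rate * g for w, g in zip(weights, reduced)]
print(weights)  # [0.0, 0.0, 0.0]
```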

10.3. Launching Horovod training

The mpirun tool can be used to run the distributed training across a cluster. For instance, running distributed training across two processes on the same machine can be done with the following command:

$ mpirun -np 2 -H localhost:2 python distributed_training_horovod_example.py

10.4. Complete Horovod example

Below is a complete example using Horovod, adapted from the example above.

# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
import argparse
from tensorflow.python.ipu.config import IPUConfig
import numpy as np
import tensorflow.compat.v1 as tf
from tensorflow.python import ipu
from tensorflow.python.ipu import horovod as hvd
from tensorflow.python.ipu.horovod import ipu_horovod_strategy

BATCH_SIZE = 64


def input_fn(mode):  # pylint: disable=unused-argument
  train_data, _ = tf.keras.datasets.mnist.load_data()

  def normalise(image, label):
    image = image.astype(np.float32) / 255.0
    image = np.expand_dims(image, axis=-1)
    label = label.astype(np.int32)
    return image, label

  x_train, y_train = normalise(*train_data)

  def generator():
    return zip(x_train, y_train)

  types = (x_train.dtype, y_train.dtype)
  shapes = (x_train.shape[1:], y_train.shape[1:])
  mnist_dataset = tf.data.Dataset.from_generator(generator, types, shapes)
  mnist_dataset = mnist_dataset.shard(hvd.size(), hvd.rank())
  mnist_dataset = mnist_dataset.shuffle(len(y_train)) \
      .cache().batch(BATCH_SIZE, drop_remainder=True).repeat()
  return mnist_dataset


def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(8, 3, activation="relu"),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(8, activation="relu"),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=mode == tf.estimator.ModeKeys.TRAIN)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {"logits": logits}
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)

  optimizer = tf.compat.v1.train.AdamOptimizer()
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels,
                                                                      logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)

  variables = model.trainable_variables

  def host_model_fn(*host_gradients):
    # This will allreduce the gradients and update the weights on the host.
    return optimizer.apply_gradients(zip(host_gradients, variables))

  train_op = tf.identity(loss)
  grads_and_vars = optimizer.compute_gradients(loss, var_list=variables)
  gradients = [g for (g, _) in grads_and_vars]
  host_call = (host_model_fn, gradients)

  return ipu.ipu_estimator.IPUEstimatorSpec(mode=mode,
                                            loss=loss,
                                            train_op=train_op,
                                            host_call=host_call)


# Initialise the Horovod runtime.
hvd.init()

# Create a Horovod strategy that places variables on the host.
strategy = ipu_horovod_strategy.IPUHorovodStrategyV1(variables_on_host=True)

ipu_options = IPUConfig()
ipu_options.auto_select_ipus = 1
ipu_run_config = ipu.ipu_run_config.IPURunConfig(ipu_options=ipu_options)

config = ipu.ipu_run_config.RunConfig(
    session_config=tf.ConfigProto(allow_soft_placement=False),
    ipu_run_config=ipu_run_config,
    train_distribute=strategy,
)

parser = argparse.ArgumentParser()
parser.add_argument("--num-steps", type=int, default=10000)
parser.add_argument("--model-dir")
args = parser.parse_args()

classifier = ipu.ipu_estimator.IPUEstimator(
    config=config,
    model_fn=model_fn,
    model_dir=args.model_dir,
)

# Training progress is logged as INFO, so enable that logging level.
tf.logging.set_verbosity(tf.logging.INFO)
classifier.train(input_fn=input_fn, max_steps=args.num_steps)