5. Distributed training with Horovod

To scale out training with PopART across multiple machines, we use Horovod to set up and run collective operations. The Broadcast and AllReduce collective operations are supported. The Broadcast operation is typically run at the start of training to initialise the weights with the same values across all instances. Gradients produced during the backward pass are aggregated and averaged across the instances by running the AllReduce operation. This ensures that each rank applies the same gradients to its weights during the weight update step.
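
As a conceptual sketch (plain NumPy, not the Horovod or PopART API), AllReduce with averaging leaves every rank holding the same mean gradient, which is what keeps the replicas' weights in step:

import numpy as np

# Hypothetical gradients computed for the same weight tensor on two ranks
grad_rank0 = np.array([0.2, -0.4], dtype=np.float32)
grad_rank1 = np.array([0.6, 0.0], dtype=np.float32)

# AllReduce with averaging: every rank receives the identical mean gradient,
# so every rank applies the same update to its copy of the weights
mean_grad = np.mean([grad_rank0, grad_rank1], axis=0)
print(mean_grad)  # [ 0.4 -0.2]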

5.1. How to modify a PopART program for distributed training

Import the Horovod PopART extension:

import horovod.popart as hvd

Enable the hostAllReduce PopART session option:

userOpts = popart.SessionOptions()

# Enable host side AllReduce operations in the graph
userOpts.hostAllReduce = True

Initialise the Horovod runtime:

hvd.init()

Initialise the Horovod DistributedOptimizer object. The constructor takes the PopART optimiser, training session and session options objects as arguments. The DistributedOptimizer object will add operations to copy gradients into and out of the IPU and run the Horovod AllReduce operation:

distributed_optimizer = hvd.DistributedOptimizer(optimizer, training.session, userOpts)

Insert the host AllReduce operation:

distributed_optimizer.insert_host_allreduce()

Broadcast the initial weights from the rank zero process to the other PopART instances:

hvd.broadcast_weights(training.session, root_rank=0)

5.2. Install

The Horovod PopART extension Python wheel can be found in the Poplar SDK, which can be downloaded from https://downloads.graphcore.ai/. The system prerequisites for installing the Horovod PopART extension are listed in the Horovod install guide.
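
As an illustrative sketch (the SDK path and wheel filename below are placeholders and depend on the SDK version you downloaded), installing the wheel into an active Python environment looks like this:

$ pip install <poplar-sdk-directory>/horovod-*.whl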

5.3. Configuring and running distributed training

Distributed training with the Horovod PopART extension is launched in the same way as with other Horovod-enabled frameworks. For instance, the following command runs distributed training across two processes on the same machine:

$ horovodrun -np 2 -H localhost:2 python train.py

Alternatively, we can use the Gloo backend for the collective operations, as shown below:

$ horovodrun --gloo -np 2 -H localhost:2 python train.py

Additional documentation on the flags that can be passed to horovodrun can be found in the Horovod documentation.
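
For example, the same script could be launched across two machines with two processes each (the hostnames here are placeholders):

$ horovodrun -np 4 -H host1:2,host2:2 python train.py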

5.4. Full distributed training example

A small example illustrating how to use Horovod with PopART:

import numpy as np
import os
from collections import namedtuple

# import the PopART Horovod extension
import horovod.popart as hvd
import popart

Session = namedtuple('Session', ['session', 'anchors'])
batch_size = 1
IN_SHAPE = 784
OUT_SHAPE = 10


def create_model():
    builder = popart.Builder()
    dtype = np.float32

    np.random.seed(42)
    input_shape = popart.TensorInfo(dtype, [batch_size, IN_SHAPE])
    x = builder.addInputTensor(input_shape)
    init_weights = np.random.normal(0, 1, [IN_SHAPE, OUT_SHAPE]).astype(dtype)
    w = builder.addInitializedInputTensor(init_weights)
    init_biases = np.random.normal(0, 1, [OUT_SHAPE]).astype(dtype)
    b = builder.addInitializedInputTensor(init_biases)
    h = builder.aiOnnx.matmul([x, w])
    a = builder.aiOnnx.add([h, b])

    output = a
    probs = builder.aiOnnx.softmax([output])
    label_shape = popart.TensorInfo("INT32", [batch_size])
    label = builder.addInputTensor(label_shape)
    nll = builder.aiGraphcore.nllloss([probs, label])

    proto = builder.getModelProto()

    return builder, proto, x, label, output, nll


def get_device(simulation=True):
    num_ipus = 1
    deviceManager = popart.DeviceManager()
    if simulation:
        print("Creating ipu sim")
        ipu_options = {
            "compileIPUCode": True,
            'numIPUs': num_ipus,
            "tilesPerIPU": 1216
        }
        device = deviceManager.createIpuModelDevice(ipu_options)
        if device is None:
            raise OSError("Failed to acquire IPU.")
    else:
        print("Acquiring IPU")
        device = deviceManager.acquireAvailableDevice(num_ipus)
        if device is None:
            raise OSError("Failed to acquire IPU.")
        else:
            print("Acquired IPU: {}".format(device))

    return device


def init_session(proto, loss, dataFlow, userOpts, device):
    # Create a session to compile and execute the graph
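    # SGD hyperparameters are given as (value, isConst) pairs; False means the
    # learning rate may be updated later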
    optimizer = popart.SGD({"defaultLearningRate": (0.1, False)})
    session = popart.TrainingSession(fnModel=proto,
                                     loss=loss,
                                     deviceInfo=device,
                                     optimizer=optimizer,
                                     dataFlow=dataFlow,
                                     userOptions=userOpts)

    session.prepareDevice()
    session.setRandomSeed(42)

    # Create buffers to receive results from the execution
    anchors = session.initAnchorArrays()

    return Session(session, anchors), optimizer


def train():
    builder, proto, data_in, labels_in, output, loss = create_model()

    batches_per_step = 32
    anchor_desc = {
        output: popart.AnchorReturnType("All"),
        loss: popart.AnchorReturnType("All")
    }
    dataFlow = popart.DataFlow(batches_per_step, anchor_desc)

    userOpts = popart.SessionOptions()
    device = get_device()

    # Enable host side AllReduce operations in the graph
    userOpts.hostAllReduce = True
    training, optimizer = init_session(proto, loss, dataFlow, userOpts, device)
    if userOpts.hostAllReduce:
        hvd.init()

        distributed_optimizer = hvd.DistributedOptimizer(
            optimizer, training.session, userOpts)
        distributed_optimizer.insert_host_allreduce()

        # Broadcast weights to all the other processes
        hvd.broadcast_weights(training.session, root_rank=0)

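    # Copy the initial weights (broadcast from rank 0 when hostAllReduce is enabled)
    # from the host to the IPU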
    training.session.weightsFromHost()

    # Synthetic data
    data = np.random.normal(size=(batches_per_step, batch_size, IN_SHAPE)).astype(
        np.float32)
    labels = np.zeros((batches_per_step, batch_size, 1)).astype(np.int32)

    num_training_steps = 10

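    # Each run() call executes batches_per_step batches; with hostAllReduce enabled,
    # gradients are averaged across Horovod ranks before each weight update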
    for _ in range(num_training_steps):
        stepio = popart.PyStepIO({
            data_in: data,
            labels_in: labels
        }, training.anchors)
        training.session.run(stepio)


train()

A more comprehensive example, using a real dataset, can be found at https://github.com/graphcore/tutorials/tree/sdk-release-2.2/feature_examples/popart/distributed_training/horovod.