5. Distributed training with Horovod
To scale out PopART training across multiple machines, we use Horovod to set up and run collective operations. The Broadcast and AllReduce collective operations are supported. The Broadcast operation is typically run at the start of training to initialise the weights to the same values on every instance. Gradients produced during the backwards pass are aggregated and averaged across the instances by running the AllReduce operation. This ensures that each rank applies the same gradients to its weights during the weight update step.
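As a rough illustration of the averaging step (plain NumPy, not the Horovod API): if several instances each compute a gradient for the same weight tensor, an averaging AllReduce leaves every instance holding the mean of those gradients, so the subsequent weight updates are identical everywhere.
import numpy as np

# Conceptual sketch only: gradients computed independently on four instances
rank_gradients = [np.array([0.1, -0.2]), np.array([0.3, 0.0]),
                  np.array([-0.1, 0.4]), np.array([0.1, 0.2])]

# After an averaging AllReduce, every rank holds the same averaged tensor
averaged = sum(rank_gradients) / len(rank_gradients)
print(averaged)  # [0.1 0.1]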
5.1. How to modify a PopART program for distributed training
Import the Horovod PopART extension:
import horovod.popart as hvd
Enable the hostAllReduce PopART session option:
userOpts = popart.SessionOptions()
# Enable host side AllReduce operations in the graph
userOpts.hostAllReduce = True
Initialise the Horovod runtime:
hvd.init()
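Once the runtime is initialised, the usual Horovod topology queries can be used, for example to give each instance its own shard of the training data. The short sketch below assumes that horovod.popart exposes the standard hvd.rank() and hvd.size() calls, as the other Horovod framework extensions do; the dataset array is invented for illustration:
import numpy as np
import horovod.popart as hvd

hvd.init()

# Assumed API: standard Horovod topology queries (rank and size)
print("This is rank {} of {}".format(hvd.rank(), hvd.size()))

# Illustrative sharding pattern: each instance takes every size()-th sample
train_samples = np.random.normal(size=(1024, 784)).astype(np.float32)  # placeholder data
shard = train_samples[hvd.rank()::hvd.size()]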
Initialise the Horovod DistributedOptimizer object. The constructor takes the PopART optimiser, training session and session options objects as arguments. The DistributedOptimizer object will add operations to copy gradients into and out of the IPU and run the Horovod AllReduce operation:
distributed_optimizer = hvd.DistributedOptimizer(optimizer, training.session, userOpts)
Insert the host AllReduce operation:
distributed_optimizer.insert_host_allreduce()
Broadcast the initial weights from the rank zero process to the other PopART instances:
hvd.broadcast_weights(training.session, root_rank=0)
5.2. Install
The Horovod PopART extension Python wheel is included in the Poplar SDK, which can be downloaded from https://downloads.graphcore.ai/. The system prerequisites for installing the Horovod PopART extension are listed in the Horovod install documentation.
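Installing the wheel is a standard pip install. The path and filename below are placeholders; the exact wheel name depends on the SDK version you have downloaded:
$ python -m pip install <poplar-sdk-directory>/horovod-<version>.whl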
5.3. Configuring and running distributed training
Distributed training with the Horovod PopART extension is run in the same way as with other Horovod-enabled frameworks. For example, to run distributed training across two processes on the same machine, use the following command:
$ horovodrun -np 2 -H localhost:2 python train.py
Alternatively, we can use the Gloo backend for the collective operations, as shown below:
$ horovodrun --gloo -np 2 -H localhost:2 python train.py
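The same launcher scales out across machines by passing a comma-separated list of hosts and slot counts to -H. For example, the following command (with hypothetical host names) runs four processes split across two machines, two per machine:
$ horovodrun -np 4 -H server1:2,server2:2 python train.py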
Additional documentation on the flags that can be passed to horovodrun is available in the Horovod documentation.
5.4. Full distributed training example
A small example illustrating how to use Horovod with PopART:
import numpy as np
import os
from collections import namedtuple

# Import the PopART Horovod extension
import horovod.popart as hvd
import popart

Session = namedtuple('Session', ['session', 'anchors'])

batch_size = 1
IN_SHAPE = 784
OUT_SHAPE = 10


def create_model():
    builder = popart.Builder()
    dtype = np.float32
    np.random.seed(42)

    input_shape = popart.TensorInfo(dtype, [batch_size, IN_SHAPE])
    x = builder.addInputTensor(input_shape)

    init_weights = np.random.normal(0, 1, [IN_SHAPE, OUT_SHAPE]).astype(dtype)
    w = builder.addInitializedInputTensor(init_weights)

    init_biases = np.random.normal(0, 1, [OUT_SHAPE]).astype(dtype)
    b = builder.addInitializedInputTensor(init_biases)

    h = builder.aiOnnx.matmul([x, w])
    a = builder.aiOnnx.add([h, b])
    output = a
    probs = builder.aiOnnx.softmax([output])

    label_shape = popart.TensorInfo("INT32", [batch_size])
    label = builder.addInputTensor(label_shape)

    nll = builder.aiGraphcore.nllloss([output, label])

    proto = builder.getModelProto()

    return builder, proto, x, label, output, nll


def get_device(simulation=True):
    num_ipus = 1
    deviceManager = popart.DeviceManager()
    if simulation:
        print("Creating ipu sim")
        ipu_options = {
            "compileIPUCode": True,
            'numIPUs': num_ipus,
            "tilesPerIPU": 1216
        }
        device = deviceManager.createIpuModelDevice(ipu_options)
        if device is None:
            raise OSError("Failed to acquire IPU.")
    else:
        print("Acquiring IPU")
        device = deviceManager.acquireAvailableDevice(num_ipus)
        if device is None:
            raise OSError("Failed to acquire IPU.")
        else:
            print("Acquired IPU: {}".format(device))
    return device


def init_session(proto, loss, dataFlow, userOpts, device):
    # Create a session to compile and execute the graph
    optimizer = popart.SGD({"defaultLearningRate": (0.1, False)})
    session = popart.TrainingSession(fnModel=proto,
                                     loss=loss,
                                     deviceInfo=device,
                                     optimizer=optimizer,
                                     dataFlow=dataFlow,
                                     userOptions=userOpts)

    session.prepareDevice()
    session.setRandomSeed(42)

    # Create buffers to receive results from the execution
    anchors = session.initAnchorArrays()

    return Session(session, anchors), optimizer


def train():
    builder, proto, data_in, labels_in, output, loss = create_model()

    batches_per_step = 32
    anchor_desc = {
        output: popart.AnchorReturnType("All"),
        loss: popart.AnchorReturnType("All")
    }
    dataFlow = popart.DataFlow(batches_per_step, anchor_desc)

    userOpts = popart.SessionOptions()
    device = get_device()

    # Enable host side AllReduce operations in the graph
    userOpts.hostAllReduce = True
    training, optimizer = init_session(proto, loss, dataFlow, userOpts, device)

    if userOpts.hostAllReduce:
        hvd.init()

        distributed_optimizer = hvd.DistributedOptimizer(
            optimizer, training.session, userOpts)
        distributed_optimizer.insert_host_allreduce()

        # Broadcast weights to all the other processes
        hvd.broadcast_weights(training.session, root_rank=0)

    training.session.weightsFromHost()

    # Synthetic data
    data = np.random.normal(size=(batches_per_step, batch_size, 784)).astype(
        np.float32)
    labels = np.zeros((batches_per_step, batch_size, 1)).astype(np.int32)

    num_training_steps = 10

    for _ in range(num_training_steps):
        stepio = popart.PyStepIO({
            data_in: data,
            labels_in: labels
        }, training.anchors)
        training.session.run(stepio)


train()
A more comprehensive example using a real dataset can be found at https://github.com/graphcore/tutorials/tree/sdk-release-2.2/feature_examples/popart/distributed_training/horovod.