5. Efficient data batching

By default, PopTorch will process batches of the size you provided as batch_size to the poptorch.DataLoader.

When using the other options below, the actual number of samples used per step varies to allow the IPU(s) to process data more efficiently.

However, the effective (mini-)batch size for operations which depend on it (such as batch normalization) will not change. All that changes is how much data is actually sent for a single step.

Note

Failure to use poptorch.DataLoader may result in accidentally changing the effective batch size for operations which depend on it, such as batch normalization.

You can find a detailed tutorial regarding efficient data loading, batching and tuning relevant hyperparameters in PopTorch on Graphcore’s tutorials repository: PopTorch tutorial: Efficient data loading.

5.1. poptorch.DataLoader

PopTorch provides a thin wrapper around the traditional torch.utils.data.DataLoader to abstract away some of the batch size calculations. If poptorch.DataLoader is used in a distributed execution environment, it will ensure that each process uses a different subset of the dataset.

If you set the DataLoader batch_size to more than 1, then each operation in the model will process that number of elements at any given time.

See below for a usage example.
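The number of samples the DataLoader must supply for a single step is the product of the mini-batch size and the options described in the rest of this chapter. This can be sketched with a small helper (the function is ours, for illustration only; the model still sees mini-batches of batch_size):

```python
def samples_per_step(batch_size, device_iterations=1,
                     replication_factor=1, gradient_accumulation=1):
    """Number of samples the DataLoader supplies for one model call.

    The effective mini-batch size seen by each operation stays at
    `batch_size`; the other factors only change how much data is
    shipped to the IPU(s) per step.
    """
    return (batch_size * device_iterations
            * replication_factor * gradient_accumulation)

# For example, a mini-batch size of 2 with deviceIterations(100),
# replicationFactor(4) and gradientAccumulation(8) consumes
# 2 * 100 * 4 * 8 = 6400 samples per step.
print(samples_per_step(2, device_iterations=100,
                       replication_factor=4, gradient_accumulation=8))
```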

5.2. poptorch.AsynchronousDataAccessor

To reduce host overhead you can offload the data loading process to a separate thread by specifying mode=poptorch.DataLoaderMode.Async in the DataLoader constructor. Internally this uses an AsynchronousDataAccessor. Doing this allows you to reduce the host/IPU communication overhead by using the time that the IPU is running to load the next batch on the CPU. This means that when the IPU is finished executing and returns to host the data will be ready for the IPU to pull in again.

Listing 5.1 Use of AsynchronousDataAccessor
    opts = poptorch.Options()
    opts.deviceIterations(device_iterations)
    opts.replicationFactor(replication_factor)

    loader = poptorch.DataLoader(opts,
                                 ExampleDataset(shape=shape,
                                                length=num_tensors),
                                 batch_size=batch_size,
                                 num_workers=num_workers,
                                 mode=poptorch.DataLoaderMode.Async)

    poptorch_model = poptorch.inferenceModel(model, opts)

    for it, (data, _) in enumerate(loader):
        out = poptorch_model(data)

Warning

AsynchronousDataAccessor makes use of the Python multiprocessing module’s spawn start method. Consequently, the entry point of a program that uses it must be guarded by a if __name__ == '__main__': block to avoid endless recursion. The dataset used must also be picklable. For more information, please see https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods.

Warning

Tensors being iterated over using an AsynchronousDataAccessor use shared memory. You must clone the tensors at each iteration if you wish to keep references to them beyond that iteration.

Consider the following example:

predictions, labels = [], []

for data, label in dataloader:
    predictions += poptorch_model(data)
    labels += label

The predictions list will be correct because it’s producing a new tensor from the inputs. However, the labels list will contain identical references. The line appending labels would need to be replaced with the following:

labels += label.detach().clone()

5.2.1. Rebatching iterable datasets

There are two types of datasets in PyTorch: map-style datasets and iterable datasets.

As explained in the notes of PyTorch’s Data Loading Order and Sampler for IterableDataset: “When fetching from iterable-style datasets with multi-processing, the drop_last argument drops the last non-full batch of each worker’s dataset replica.”

This means that if the number of elements is naively divided among the number of workers (which is the default behaviour), then potentially a significant number of elements will be dropped.

For example:

num_tensors = 100
num_workers = 7
batch_size = 4

per_worker_tensors = ceil(100 / num_workers) = 15
last_worker_tensors = 100 - (num_workers - 1) * per_worker_tensors = 10

num_tensors_used = batch_size * (floor(per_worker_tensors / batch_size) * (num_workers - 1) + floor(last_worker_tensors / batch_size))
                 = 80

This means that, in this particular case, 20% of the dataset will never be used. In general, the larger the number of workers and the batch size, the more data will end up being unused.

To work around this issue, PopTorch provides mode=poptorch.DataLoaderMode.AsyncRebatched. In this mode, PopTorch will set the batch_size in the underlying PyTorch Dataset and DataLoader to 1 and will instead create the batched tensors in its worker process.

The shape of the tensors returned by the DataLoader will be the same as before, but the number of used tensors from the dataset will increase to floor(num_tensors / batch_size) * batch_size (which means all the tensors would be used in the example above).
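The two counts above can be verified in plain Python (this is a sketch of the counting only, not of PopTorch’s actual implementation):

```python
from math import ceil, floor

num_tensors, num_workers, batch_size = 100, 7, 4

# Default behaviour: each worker receives a contiguous slice of the
# dataset and drops its own final partial batch.
per_worker = ceil(num_tensors / num_workers)                 # 15
last_worker = num_tensors - (num_workers - 1) * per_worker   # 10
used_default = batch_size * ((num_workers - 1) * floor(per_worker / batch_size)
                             + floor(last_worker / batch_size))

# AsyncRebatched: batching happens in the DataLoader's worker process,
# so only the final partial batch of the whole dataset can be dropped.
used_rebatched = floor(num_tensors / batch_size) * batch_size

print(used_default, used_rebatched)  # 80 100
```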

Note

This mode is not enabled by default because its behaviour differs from that of the upstream DataLoader.

5.3. poptorch.Options.deviceIterations

If you set deviceIterations() to more than 1 then you are telling PopART to execute that many batches in sequence.

Essentially, it is the equivalent of launching the IPU in a loop over that number of batches. This is efficient because that loop runs on the IPU directly.

Note that the returned output dimensions depend on poptorch.Options.anchorMode. The default value for trainingModel is Final, since you will often not need to receive all or any of the output tensors and it is more efficient not to receive them. Therefore, only the last batch of data will be returned to the host under this setting. You can change this behaviour by setting the value of poptorch.Options.anchorMode to All. This returns the result of each batch to the host. See poptorch.Options.anchorMode() for more information.
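The effect of the anchor mode on the amount of data returned per step can be sketched as follows (a hypothetical helper using plain strings in place of the poptorch.AnchorMode values):

```python
def samples_returned_per_step(batch_size, device_iterations, anchor_mode):
    """Illustrative count of output samples returned to the host per step."""
    if anchor_mode == "All":
        # One result per device iteration is sent back to the host.
        return batch_size * device_iterations
    if anchor_mode == "Final":
        # Only the last batch of the on-device loop is sent back.
        return batch_size
    raise ValueError(f"unknown anchor mode: {anchor_mode}")

# With batch_size=2 and deviceIterations(100):
print(samples_returned_per_step(2, 100, "Final"))  # 2
print(samples_returned_per_step(2, 100, "All"))    # 200
```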

Listing 5.2 Use of device iterations and batch size
from functools import reduce
from operator import mul

import torch
import poptorch


class ExampleModelWithLoss(torch.nn.Module):
    def __init__(self, data_shape, num_classes):
        super().__init__()

        self.fc = torch.nn.Linear(reduce(mul, data_shape), num_classes)
        self.loss = torch.nn.CrossEntropyLoss()

    def forward(self, x, target=None):
        reshaped = x.reshape([x.shape[0], -1])
        fc = self.fc(reshaped)

        if target is not None:
            return fc, self.loss(fc, target)
        return fc


class ExampleDataset(torch.utils.data.Dataset):
    def __init__(self, shape, length):
        super().__init__()
        self._shape = shape
        self._length = length

        self._all_data = []
        self._all_labels = []

        torch.manual_seed(0)
        for _ in range(length):
            label = 1 if torch.rand(()) > 0.5 else 0
            data = torch.rand(self._shape) + label
            data[0] = -data[0]
            self._all_data.append(data)
            self._all_labels.append(label)

    def __len__(self):
        return self._length

    def __getitem__(self, index):
        return self._all_data[index], self._all_labels[index]


def device_iterations_example():
    # Set the number of samples for which activations/gradients are computed
    # in parallel on a single IPU
    model_batch_size = 2

    # Create a poptorch.Options instance to override default options
    opts = poptorch.Options()

    # Run a 100 iteration loop on the IPU, fetching a new batch each time
    opts.deviceIterations(100)

    # Set up the DataLoader to load that much data at each iteration
    training_data = poptorch.DataLoader(opts,
                                        dataset=ExampleDataset(shape=[3, 2],
                                                               length=10000),
                                        batch_size=model_batch_size,
                                        shuffle=True,
                                        drop_last=True)

    model = ExampleModelWithLoss(data_shape=[3, 2], num_classes=2)
    # Wrap the model in a PopTorch training wrapper
    poptorch_model = poptorch.trainingModel(model, options=opts)

    # Run over the training data, 100 batches at a time (specified in
    # opts.deviceIterations())
    for batch_number, (data, labels) in enumerate(training_data):
        # Execute the device with a 100 iteration loop of batchsize 2.
        # "output" and "loss" will be the respective output and loss of the
        # final batch (the default AnchorMode).

        output, loss = poptorch_model(data, labels)
        print(f"{labels[-1]}, {output}, {loss}")

5.4. poptorch.Options.replicationFactor

replicationFactor() will replicate the model over multiple IPUs to allow automatic data parallelism across many IPUs.

Listing 5.3 Use of replication factor
def replication_factor_example():
    # Set the number of samples for which activations/gradients are computed
    # in parallel on a single IPU
    model_batch_size = 2
    # replication_start
    # Create a poptorch.Options instance to override default options
    opts = poptorch.Options()

    # Run a 100 iteration loop on the IPU, fetching a new batch each time
    opts.deviceIterations(100)

    # Duplicate the model over 4 replicas.
    opts.replicationFactor(4)

    training_data = poptorch.DataLoader(opts,
                                        dataset=ExampleDataset(shape=[3, 2],
                                                               length=100000),
                                        batch_size=model_batch_size,
                                        shuffle=True,
                                        drop_last=True)

    model = ExampleModelWithLoss(data_shape=[3, 2], num_classes=2)
    # Wrap the model in a PopTorch training wrapper
    poptorch_model = poptorch.trainingModel(model, options=opts)

    # Run over the training data, 100 batches at a time (specified in
    # opts.deviceIterations())
    for batch_number, (data, labels) in enumerate(training_data):
        # Execute the device with a 100 iteration loop of model batchsize 2
        # across 4 IPUs (global batchsize = 2 * 4 = 8). "output" and "loss"
        # will be the respective output and loss of the final batch of each
        # replica (the default AnchorMode).
        output, loss = poptorch_model(data, labels)
        print(f"{labels[-1]}, {output}, {loss}")

5.5. poptorch.Options.Training.gradientAccumulation

You need to use gradientAccumulation() when training with pipelined models because the weights are shared across pipeline batches, so gradients would otherwise be updated and used by subsequent batches out of order. Note that gradientAccumulation() is only needed by poptorch.PipelinedExecution.

See also poptorch.Block.

Listing 5.4 Use of gradient accumulation
def gradient_accumulation_example():
    # Set the number of samples for which activations/gradients are computed
    # in parallel on a single IPU
    model_batch_size = 2
    # Create a poptorch.Options instance to override default options
    opts = poptorch.Options()

    # Run a 400 iteration loop on the IPU, fetching a new batch each time
    opts.deviceIterations(400)

    # Accumulate the gradient 8 times before applying it.
    opts.Training.gradientAccumulation(8)

    training_data = poptorch.DataLoader(opts,
                                        dataset=ExampleDataset(shape=[3, 2],
                                                               length=100000),
                                        batch_size=model_batch_size,
                                        shuffle=True,
                                        drop_last=True)

    model = ExampleModelWithLoss(data_shape=[3, 2], num_classes=2)
    # Wrap the model in a PopTorch training wrapper
    poptorch_model = poptorch.trainingModel(model, options=opts)

    # Run over the training data, 400 batches at a time (specified in
    # opts.deviceIterations())
    for batch_number, (data, labels) in enumerate(training_data):
        # Execute the device with a 400 iteration loop of model batchsize 2
        # with gradient updates every 8 iterations (global batchsize = 2 * 8 = 16).
        # "output" and "loss" will be the respective output and loss of the
        # final batch (the default AnchorMode).
        output, loss = poptorch_model(data, labels)
        print(f"{labels[-1]}, {output}, {loss}")

In the code example below, poptorch.Block (introduced in Section 4.4, Multi-IPU execution strategies) is used to divide a different model into disjoint subsets of layers. These blocks can be shared among multiple parallel execution strategies.

Listing 5.5 A training model making use of poptorch.Block
class Network(nn.Module):
    def __init__(self):
        super(Network, self).__init__()
        self.layer1 = nn.Linear(784, 784)
        self.layer2 = nn.Linear(784, 784)
        self.layer3 = nn.Linear(784, 128)
        self.layer4 = nn.Linear(128, 10)
        self.softmax = nn.Softmax(1)

    def forward(self, x):
        x = x.view(-1, 784)
        with poptorch.Block("B1"):
            x = self.layer1(x)
        with poptorch.Block("B2"):
            x = self.layer2(x)
        with poptorch.Block("B3"):
            x = self.layer3(x)
        with poptorch.Block("B4"):
            x = self.layer4(x)
            x = self.softmax(x)
        return x


class TrainingModelWithLoss(torch.nn.Module):
    def __init__(self, model):
        super().__init__()
        self.model = model
        self.loss = torch.nn.CrossEntropyLoss()

    def forward(self, args, loss_inputs=None):
        output = self.model(args)
        if loss_inputs is None:
            return output
        with poptorch.Block("B4"):
            loss = self.loss(output, loss_inputs)
        return output, loss


You can see code examples for poptorch.SerialPhasedExecution, poptorch.PipelinedExecution and poptorch.ShardedExecution below.

An instance of class poptorch.PipelinedExecution defines an execution strategy that assigns layers to multiple IPUs as a pipeline. Gradient accumulation is used to push multiple batches through the pipeline allowing IPUs to run in parallel.

Listing 5.6 An example of different parallel execution strategies
    training_data, test_data = get_mnist_data(opts)
    model = Network()
    model_with_loss = TrainingModelWithLoss(model)
    model_opts = poptorch.Options().deviceIterations(1)
    if opts.strategy == "phased":
        strategy = poptorch.SerialPhasedExecution("B1", "B2", "B3", "B4")
        strategy.stage("B1").ipu(0)
        strategy.stage("B2").ipu(0)
        strategy.stage("B3").ipu(0)
        strategy.stage("B4").ipu(0)
        model_opts.setExecutionStrategy(strategy)
    elif opts.strategy == "pipelined":
        strategy = poptorch.PipelinedExecution("B1", "B2", "B3", "B4")
        strategy.stage("B1").ipu(0)
        strategy.stage("B2").ipu(1)
        strategy.stage("B3").ipu(2)
        strategy.stage("B4").ipu(3)
        model_opts.setExecutionStrategy(strategy)
        model_opts.Training.gradientAccumulation(opts.batches_per_step)
    else:
        strategy = poptorch.ShardedExecution("B1", "B2", "B3", "B4")
        strategy.stage("B1").ipu(0)
        strategy.stage("B2").ipu(0)
        strategy.stage("B3").ipu(0)
        strategy.stage("B4").ipu(0)
        model_opts.setExecutionStrategy(strategy)

    if opts.offload_opt:
        model_opts.TensorLocations.setActivationLocation(
            poptorch.TensorLocationSettings().useOnChipStorage(True))
        model_opts.TensorLocations.setWeightLocation(
            poptorch.TensorLocationSettings().useOnChipStorage(True))
        model_opts.TensorLocations.setAccumulatorLocation(
            poptorch.TensorLocationSettings().useOnChipStorage(True))
        model_opts.TensorLocations.setOptimizerLocation(
            poptorch.TensorLocationSettings().useOnChipStorage(False))

    training_model = poptorch.trainingModel(
        model_with_loss,
        model_opts,
        optimizer=optim.AdamW(model.parameters(), lr=opts.lr))

    # run training, on IPU
    train(training_model, training_data, opts)

Fig. 5.1 shows the pipeline execution for multiple batches on IPUs. There are four pipeline stages running on four IPUs, one stage per IPU. Gradient accumulation enables us to keep the same number of pipeline stages but run a wider pipeline. This helps hide the latency (the total time for one item to go through the whole system), as highlighted in the figure.


Fig. 5.1 Pipeline execution with gradient accumulation
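The benefit of a wider pipeline can be sketched with simple pipeline arithmetic (an idealised model with equal-length stages, ignoring communication costs):

```python
def pipeline_utilisation(num_stages, num_batches):
    """Idealised fraction of stage-steps doing useful work.

    Each batch occupies one stage per time step; the pipeline needs
    num_stages + num_batches - 1 steps to fill, run and drain.
    """
    total_steps = num_stages + num_batches - 1
    busy_steps = num_stages * num_batches  # stage-steps doing useful work
    return busy_steps / (num_stages * total_steps)

# 4 stages with 8 gradient accumulation batches: pushing more batches
# through amortises the fill and drain phases of the pipeline.
print(f"{pipeline_utilisation(4, 8):.2f}")   # ~0.73
print(f"{pipeline_utilisation(4, 64):.2f}")  # higher, approaching 1.0
```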

5.6. poptorch.Options.Training.anchorReturnType

When you use an inferenceModel(), you will usually want to receive all the output tensors. For this reason, PopTorch will return them all to you by default. However, you can change this behaviour using poptorch.Options.anchorMode().

When you use a trainingModel(), you will often not need to receive all or any of the output tensors and it is more efficient not to receive them. For this reason, PopTorch only returns the last batch of tensors by default. As in the case of inferenceModel(), you can change this behaviour using poptorch.Options.anchorMode().

If you want to monitor training using a metric such as loss or accuracy, you may wish to take into account all tensors. To do this with minimal or no overhead, you can use poptorch.AnchorMode.Sum. For example:

Listing 5.7 A model which returns training accuracy as a tensor
class MulticlassPerceptron(torch.nn.Module):
    def __init__(self, vec_length, num_classes):
        super().__init__()
        self.fc = torch.nn.Linear(vec_length, num_classes)
        self.loss = torch.nn.CrossEntropyLoss()

    def forward(self, x, target):
        fc = self.fc(x)

        classification = torch.argmax(fc, dim=-1)
        accuracy = (torch.sum((classification == target).to(torch.float)) /
                    float(classification.numel()))

        if self.training:
            return self.loss(fc, target), accuracy

        return classification, accuracy
Listing 5.8 Efficient calculation of training accuracy across all batches
opts = poptorch.Options()

opts.deviceIterations(5)
opts.Training.gradientAccumulation(10)
opts.anchorMode(poptorch.AnchorMode.Sum)

training_data = poptorch.DataLoader(opts,
                                    dataset=ExampleClassDataset(
                                        NUM_CLASSES, VEC_LENGTH, 2000),
                                    batch_size=5,
                                    shuffle=True,
                                    drop_last=True)


model = MulticlassPerceptron(VEC_LENGTH, NUM_CLASSES)
model.train()

# Wrap the model in a PopTorch training wrapper
poptorch_model = poptorch.trainingModel(model,
                                        options=opts,
                                        optimizer=torch.optim.Adam(
                                            model.parameters()))

# Run over the training data, 5 batches at a time.
for batch_number, (data, labels) in enumerate(training_data):
    # Execute the device with a 5 iteration loop of batchsize 5 with 10
    # gradient accumulations (global batchsize = 5 * 10 = 50). "loss" and
    # "accuracy" will be the sum across all device iterations and gradient
    # accumulations but not across the model batch size.
    _, accuracy = poptorch_model(data, labels)

    # Correct for iterations
    # Do not divide by batch here, as this is already accounted for in the
    # PyTorch Model.
    accuracy /= (opts.device_iterations * opts.Training.gradient_accumulation)
    print(f"Accuracy: {float(accuracy)*100:.2f}%")
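The correction at the end of Listing 5.8 can be factored into a small helper (the function name is ours, for illustration):

```python
def mean_from_sum_anchor(summed_metric, device_iterations,
                         gradient_accumulation):
    """Recover a per-batch mean from a metric returned with AnchorMode.Sum.

    The model already averages over the mini-batch dimension, so only
    the device iteration and gradient accumulation counts need to be
    corrected for.
    """
    return summed_metric / (device_iterations * gradient_accumulation)

# With deviceIterations(5) and gradientAccumulation(10), a summed
# accuracy of 42.5 corresponds to a mean accuracy of 85%.
print(mean_from_sum_anchor(42.5, 5, 10))  # 0.85
```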