4. Efficient data batching

By default, PopTorch processes the batch_size that you provided to the poptorch.DataLoader.

When using the other options below, the actual number of samples used per step varies to allow the IPU(s) to process data more efficiently.

However, the effective (mini-)batch size for operations which depend on it (such as batch normalization) will not change. All that changes is how much data is actually sent for a single step.

Note

Failure to use poptorch.DataLoader may result in accidentally changing the effective batch size for operations which depend on it, such as batch normalization.
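The number of samples consumed from the dataset per step is the product of the model batch size and the options described in the rest of this chapter. A minimal sketch of the arithmetic (the option values here are arbitrary, for illustration only):

import poptorch

model_batch_size = 2  # batch size seen by each operation in the model

opts = poptorch.Options()
opts.deviceIterations(10)              # run 10 batches per step on the device
opts.replicationFactor(4)              # 4 data-parallel replicas of the model
opts.Training.gradientAccumulation(8)  # accumulate gradients over 8 batches

# Samples consumed from the dataset per step:
# 2 * 10 * 4 * 8 = 640, even though operations such as batch
# normalization still see a batch dimension of 2.
combined_batch_size = model_batch_size * 10 * 4 * 8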

4.1. poptorch.DataLoader

If you set the DataLoader batch_size to more than 1, then each operation in the model will process that number of elements at any given time.

class poptorch.DataLoader(options, dataset, batch_size=1, shuffle=False, num_workers=0, drop_last=True, persistent_workers=None, **kwargs)

Thin wrapper around the traditional torch.utils.data.DataLoader to abstract away some of the batch size calculations.

If this DataLoader is used in a distributed execution environment, it will ensure that each process uses a different subset of the dataset.

__init__(options, dataset, batch_size=1, shuffle=False, num_workers=0, drop_last=True, persistent_workers=None, **kwargs)
Parameters
  • options (poptorch.Options) – Options that will be used to compile and run the model.

  • dataset – The dataset to get the data from.

  • batch_size (int) – This is the batch size in the conventional sense of being the size that runs through an operation in the model at any given time.

  • shuffle (bool) – Whether or not the dataset should be shuffled.

  • num_workers (int) – Number of worker processes to use to read the data.

  • drop_last (bool) – If True and the number of elements in the dataset is not a multiple of the combined batch size, then the incomplete batch at the end will be dropped.

  • persistent_workers (bool) – Re-use workers between iterations if True. If None (default): enabled if num_workers > 0, disabled otherwise.

  • kwargs – Other options to pass to the constructor of the underlying torch.utils.data.DataLoader.

property combinedBatchSize

Total number of elements consumed from the dataset for a single execution of the model.

property options

A reference to the options that were used to initialise this DataLoader.
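As a usage sketch (reusing the ExampleDataset defined in the examples below), the DataLoader is constructed from an options object, and combinedBatchSize reports how many samples each iteration of the loader yields:

import poptorch

opts = poptorch.Options()
opts.deviceIterations(100)

training_data = poptorch.DataLoader(opts,
                                    dataset=ExampleDataset(shape=[3, 2],
                                                           length=10000),
                                    batch_size=2,
                                    shuffle=True,
                                    drop_last=True)

# batch_size * device iterations (* replication factor
# * gradient accumulation, when those are set)
print(training_data.combinedBatchSize)  # 2 * 100 = 200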

4.2. poptorch.AsynchronousDataAccessor

To reduce host overhead, you can offload the data loading to a separate process using an AsynchronousDataAccessor. This reduces the host/IPU communication overhead by using the time that the IPU is running to load the next batch on the CPU, so that when the IPU finishes executing and returns to the host, the data is already there waiting for the IPU to pull in again.

class poptorch.AsynchronousDataAccessor(dataset, buffer_size=3, miss_sleep_time_in_ms=0.1, load_indefinitely=True)

A data loader which launches the data loading on a separate process to allow the data to be preprocessed asynchronously on the CPU, minimizing CPU/IPU transfer time.

This works by loading the data into a ring buffer of shared memory. When the IPU needs another batch, it uses the data ready in the ring buffer. The memory is shared, so it is used in place and won't be freed until the next batch is requested. Behind the scenes, the worker process fills the unready elements of the ring buffer.

Important

In order to avoid hanging issues related to OpenMP and fork(), the AsynchronousDataAccessor uses the spawn start method, which means your dataset must be serializable by pickle. For more information see https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
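In practice this means defining the dataset class at module scope, so that pickle can import it by name, and avoiding unpicklable state such as lambdas or open file handles. A minimal sketch (PicklableDataset is a hypothetical name):

import torch

# Defined at module scope so the spawned worker can unpickle it.
class PicklableDataset(torch.utils.data.Dataset):
    def __init__(self, length):
        self._length = length

    def __len__(self):
        return self._length

    def __getitem__(self, index):
        # Generate data on the fly instead of holding open resources.
        return torch.full((3, 2), float(index)), index % 2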

__init__(dataset, buffer_size=3, miss_sleep_time_in_ms=0.1, load_indefinitely=True)
Parameters
  • dataset – The dataset to pull data from; this can be any Python iterable.

  • buffer_size – The size of the ring buffer.

  • miss_sleep_time_in_ms – How long, in milliseconds, the worker should sleep before checking again when the buffer is full.

  • load_indefinitely – If True, the worker loops back to the start of the dataset when it reaches the end.

terminate()

Use this method to shut down the worker process manually.
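Because load_indefinitely defaults to True, the worker keeps looping over the dataset, so you may want to terminate it explicitly if you stop consuming batches early. A sketch, where data, poptorch_model and max_batches are assumed from the surrounding examples:

loader = poptorch.AsynchronousDataAccessor(data)
try:
    for it, (tensors, _) in enumerate(loader):
        out = poptorch_model(tensors)
        if it == max_batches:  # hypothetical stopping condition
            break
finally:
    # Shut the worker down even if the loop exits early or raises.
    loader.terminate()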

4.2.1. Example

Listing 4.1 Use of AsynchronousDataAccessor
opts = poptorch.Options()
opts.deviceIterations(device_iterations)
opts.replicationFactor(replication_factor)

data = poptorch.DataLoader(opts,
                           ExampleDataset(shape=shape, length=num_tensors),
                           batch_size=batch_size,
                           num_workers=num_workers)

loader = poptorch.AsynchronousDataAccessor(data)

poptorch_model = poptorch.inferenceModel(model, opts)

for it, (data, _) in enumerate(loader):
    out = poptorch_model(data)

4.3. poptorch.Options.deviceIterations

If you set deviceIterations() to more than 1, then you are telling PopART to execute that many batches in sequence.

Essentially, it is the equivalent of launching the IPU in a loop over that number of batches. This is efficient because that loop runs on the IPU directly.
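As a rough mental model only (not real PopTorch code), one step with deviceIterations(4) behaves like the host-side loop below, except that the loop runs on the device without returning to the host between batches:

# Pseudo-equivalent of a single step with opts.deviceIterations(4):
# the combined batch is split into model-sized batches and the model
# runs once per batch before control returns to the host.
for micro_batch in combined_batch.split(model_batch_size):
    output = model(micro_batch)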

4.3.1. Example

Listing 4.2 Use of device iterations and batch size
from functools import reduce
from operator import mul

import sys
import torch
import poptorch

if not poptorch.ipuHardwareIsAvailable():
    print("Replicated top level graphs are not supported on the IPU model")
    sys.exit(0)


class ExampleModelWithLoss(torch.nn.Module):
    def __init__(self, data_shape, num_classes):
        super().__init__()

        self.fc = torch.nn.Linear(reduce(mul, data_shape), num_classes)
        self.loss = torch.nn.CrossEntropyLoss()

    def forward(self, x, target=None):
        reshaped = x.reshape([x.shape[0], -1])
        fc = self.fc(reshaped)

        if target is not None:
            return fc, self.loss(fc, target)
        return fc


class ExampleDataset(torch.utils.data.Dataset):
    def __init__(self, shape, length):
        self._shape = shape
        self._length = length

        self._all_data = []
        self._all_labels = []

        torch.manual_seed(0)
        for _ in range(length):
            label = 1 if torch.rand(()) > 0.5 else 0
            data = torch.rand(self._shape) + label
            data[0] = -data[0]
            self._all_data.append(data)
            self._all_labels.append(label)

    def __len__(self):
        return self._length

    def __getitem__(self, index):
        return self._all_data[index], self._all_labels[index]


# Set the batch size in the conventional sense of being the size that
# runs through an operation in the model at any given time
model_batch_size = 2

# Create a poptorch.Options instance to override default options
opts = poptorch.Options()

# Run a 100 iteration loop on the IPU, fetching a new batch each time
opts.deviceIterations(100)

# Set up the DataLoader to load that much data at each iteration
training_data = poptorch.DataLoader(opts,
                                    dataset=ExampleDataset(shape=[3, 2],
                                                           length=10000),
                                    batch_size=model_batch_size,
                                    shuffle=True,
                                    drop_last=True)

model = ExampleModelWithLoss(data_shape=[3, 2], num_classes=2)
# Wrap the model in a PopTorch training wrapper
poptorch_model = poptorch.trainingModel(model, options=opts)

# Run over the training data, 200 samples at a time
# (batch_size * device iterations).
for batch_number, (data, labels) in enumerate(training_data):
    # Execute the device with a 100 iteration loop of batch size 2.
    # "output" and "loss" will be the output and loss of the final
    # batch (the default AnchorMode).
    output, loss = poptorch_model(data, labels)
    print(f"{labels[-1]}, {output}, {loss}")

4.4. poptorch.Options.replicationFactor

replicationFactor() replicates the model over N IPUs to provide automatic data parallelism.

Listing 4.3 Use of replication factor
# Create a poptorch.Options instance to override default options
opts = poptorch.Options()

# Run a 100 iteration loop on the IPU, fetching a new batch each time
opts.deviceIterations(100)

# Duplicate the model over 4 replicas.
opts.replicationFactor(4)

training_data = poptorch.DataLoader(opts,
                                    dataset=ExampleDataset(shape=[3, 2],
                                                           length=100000),
                                    batch_size=model_batch_size,
                                    shuffle=True,
                                    drop_last=True)

model = ExampleModelWithLoss(data_shape=[3, 2], num_classes=2)
# Wrap the model in a PopTorch training wrapper
poptorch_model = poptorch.trainingModel(model, options=opts)

# Run over the training data, 800 samples at a time
# (batch_size * device iterations * replication factor).
for batch_number, (data, labels) in enumerate(training_data):
    # Execute the device with a 100 iteration loop of batch size 2 across
    # 4 IPUs. "output" and "loss" will be the output and loss of the
    # final batch of each replica (the default AnchorMode).
    output, loss = poptorch_model(data, labels)
    print(f"{labels[-1]}, {output}, {loss}")

4.5. poptorch.Options.Training.gradientAccumulation

You need to use gradientAccumulation() when training with pipelined models, because the weights are shared across pipeline stages, so gradients would otherwise be updated by, and used for, subsequent batches out of order. Note that gradientAccumulation() is only needed by poptorch.PipelinedExecution.

See also

poptorch.Block

Listing 4.4 Use of gradient accumulation
# Create a poptorch.Options instance to override default options
opts = poptorch.Options()

# Run a 400 iteration loop on the IPU, fetching a new batch each time
opts.deviceIterations(400)

# Accumulate the gradient 8 times before applying it.
opts.Training.gradientAccumulation(8)

training_data = poptorch.DataLoader(opts,
                                    dataset=ExampleDataset(shape=[3, 2],
                                                           length=100000),
                                    batch_size=model_batch_size,
                                    shuffle=True,
                                    drop_last=True)

# Wrap the model in a PopTorch training wrapper
poptorch_model = poptorch.trainingModel(model, options=opts)

# Run over the training data, 6400 samples at a time
# (batch_size * device iterations * gradient accumulation).
for batch_number, (data, labels) in enumerate(training_data):
    # Execute the device with a 400 iteration loop of batch size 2,
    # accumulating the gradient over groups of 8 batches. "output" and
    # "loss" will be those of the final batch (the default AnchorMode).
    output, loss = poptorch_model(data, labels)
    print(f"{labels[-1]}, {output}, {loss}")

4.5.1. Example with parallel execution

In the code example below, poptorch.Block (introduced in Parallel execution) is used to divide a model into disjoint subsets of layers. These blocks can be shared among multiple parallel execution strategies.

Listing 4.5 A training model making use of poptorch.Block
class Network(nn.Module):
    def __init__(self):
        super(Network, self).__init__()
        self.layer1 = nn.Linear(784, 784)
        self.layer2 = nn.Linear(784, 784)
        self.layer3 = nn.Linear(784, 128)
        self.layer4 = nn.Linear(128, 10)
        self.softmax = nn.Softmax(1)

    def forward(self, x):
        x = x.view(-1, 784)
        with poptorch.Block("B1"):
            x = self.layer1(x)
        with poptorch.Block("B2"):
            x = self.layer2(x)
        with poptorch.Block("B3"):
            x = self.layer3(x)
        with poptorch.Block("B4"):
            x = self.layer4(x)
            x = self.softmax(x)
        return x


class TrainingModelWithLoss(torch.nn.Module):
    def __init__(self, model):
        super().__init__()
        self.model = model
        self.loss = torch.nn.CrossEntropyLoss()

    def forward(self, args, loss_inputs=None):
        output = self.model(args)
        if loss_inputs is None:
            return output
        with poptorch.Block("B4"):
            loss = self.loss(output, loss_inputs)
        return output, loss

You can see code examples of poptorch.SerialPhasedExecution, poptorch.PipelinedExecution, and poptorch.ShardedExecution below. In particular, the poptorch.PipelinedExecution strategy assigns layers to multiple IPUs as a pipeline. Gradient accumulation is used to push multiple batches through the pipeline, allowing the IPUs to run in parallel.

Listing 4.6 An example of different parallel execution strategies
training_data, test_data = get_mnist_data(opts)
model = Network()
model_with_loss = TrainingModelWithLoss(model)
model_opts = poptorch.Options().deviceIterations(1)
if opts.strategy == "phased":
    strategy = poptorch.SerialPhasedExecution("B1", "B2", "B3", "B4")
    strategy.stage("B1").ipu(0)
    strategy.stage("B2").ipu(0)
    strategy.stage("B3").ipu(0)
    strategy.stage("B4").ipu(0)
    model_opts.setExecutionStrategy(strategy)
elif opts.strategy == "pipelined":
    strategy = poptorch.PipelinedExecution("B1", "B2", "B3", "B4")
    strategy.stage("B1").ipu(0)
    strategy.stage("B2").ipu(1)
    strategy.stage("B3").ipu(2)
    strategy.stage("B4").ipu(3)
    model_opts.setExecutionStrategy(strategy)
    model_opts.Training.gradientAccumulation(opts.batches_per_step)
else:
    strategy = poptorch.ShardedExecution("B1", "B2", "B3", "B4")
    strategy.stage("B1").ipu(0)
    strategy.stage("B2").ipu(0)
    strategy.stage("B3").ipu(0)
    strategy.stage("B4").ipu(0)
    model_opts.setExecutionStrategy(strategy)

if opts.offload_opt:
    model_opts.TensorLocations.setActivationLocation(
        poptorch.TensorLocationSettings().useOnChipStorage(True))
    model_opts.TensorLocations.setWeightLocation(
        poptorch.TensorLocationSettings().useOnChipStorage(True))
    model_opts.TensorLocations.setAccumulatorLocation(
        poptorch.TensorLocationSettings().useOnChipStorage(True))
    model_opts.TensorLocations.setOptimizerLocation(
        poptorch.TensorLocationSettings().useOnChipStorage(False))

if opts.profile:
    model_opts.Popart.set("engineOptions", {
        "autoReport.all": "true",
        "autoReport.directory": opts.profile
    })

training_model = poptorch.trainingModel(
    model_with_loss,
    model_opts,
    optimizer=optim.AdamW(model.parameters(), lr=opts.lr))

# run training, on IPU
train(training_model, training_data, opts)

Fig. 4.1 shows the pipelined execution of multiple batches on IPUs. There are 4 pipeline stages, each running on its own IPU. Gradient accumulation enables us to keep the same number of pipeline stages but run a wider pipeline, which helps hide the latency (the total time for one sample to go through the whole system), as highlighted in the figure.

_images/IPU-pipeline.jpg

Fig. 4.1 Pipeline execution with gradient accumulation