3. Pipelining

The pipeline approach is similar to sharding in that the model is split across multiple IPUs, but the stages process different mini-batches concurrently so that all of the IPUs are kept busy.

3.1. Overview

In pipelining, the entire model is partitioned into multiple computing stages, and the output of a stage is the input of the next stage. These stages are executed in parallel on multiple IPUs. Compared to using sharding technology alone, the pipeline approach can maximise the use of all IPUs involved in parallel model processing, which improves processor efficiency as well as throughput and latency performance.

When discussing the use of pipelining, the following nomenclature applies:

  • Mini-batch: A set of data samples to be processed in a single forward pass.

  • Batch: A set of mini-batches to be processed by the pipeline before a weight update. Its total sample count is sometimes referred to as the effective batch size.

Within this context, it suffices to consider a pipeline to be fed mini-batches until a weight update is performed over the set of mini-batches (the batch).

Fig. 3.1 shows how to use pipelining for model parallelism (the dotted-line box indicates the point in the pipeline body where all IPUs are used to the maximum extent). The model consists of four layers, which are divided into four stages, and each stage is assigned to an IPU that computes one layer. When the first IPU has received mini-batch B1 and executed the first stage, its output is passed to the second IPU, which executes the second stage; at the same time, the first IPU receives the next mini-batch B2 and executes the first stage again, and so on. Once the fourth mini-batch B4 has been read in, all four IPUs are busy and utilisation reaches 100%.


Fig. 3.1 Pipeline time sequence during model inference

Pipelining is relatively simple for inference, but more complicated for training, which involves backpropagation. For training, the pipeline needs to accommodate the forward pass, backpropagation and the weight update.

Fig. 3.2 shows a single computational flow of forward pass and backpropagation, and then shows a complete pipeline with parallel overlapping mini-batches.

Each IPU performs not only the forward computation (Ai) of its layer, but also the corresponding gradient computation (AiGi). The dotted-line box shows the main body of the pipeline (it can be any depth; a larger depth increases the size of the batch). Through the use of recomputation (see Section 3.5, Optimising the pipeline), only selected activation inputs are stored; the other activations are recomputed from these stored inputs when the gradients are calculated, which saves valuable on-chip memory while keeping the IPUs used to the maximum extent.


Fig. 3.2 Pipeline time sequence during model training

The GCD mentioned in the image stands for “graph compile domain”, and is a set of IPUs which the Poplar graph compiler will compile binaries for. With a GCD of size 16, for example, we can generate a model-parallel graph that executes on 16 IPUs.

3.2. Pipeline operation

There are three phases to the pipelined execution:

  • Ramp up: this is the period in which the pipeline is being filled with mini-batches until every pipeline stage (including forward and backward passes) is performing computation. The maximum utilisation is 50%.

  • Main execution: the time when all the pipeline stages are performing computation. This is the period when maximum utilisation is made of all the IPUs.

  • Ramp down: the time when the pipeline is being drained until each pipeline stage is no longer performing any computation. The maximum utilisation is again 50%.

After ramp down, the weight updates are performed.

Note

Pipelining must not be combined with sharding.

3.3. Pipelining API

The pipelining API allows you to define a series of computational stages to support parallel execution on multiple IPUs. Refer to the pipelined training section in Targeting the IPU from TensorFlow 2 for more information.

3.3.1. Inputs and outputs

All tensors which are used in the pipeline that are not TensorFlow variables need to be explicitly passed as inputs to the pipeline. If an input does not change value between executions – for example, a hyper-parameter – add it to the inputs argument.

If the input does change value with every execution of a pipeline stage – for example, mini-batches of data – then create an IPUInfeedQueue and pass it to the infeed_queue argument. The inputs list and the infeed_queue are passed as inputs to the first pipeline stage.

After the initial pipeline stage, all the outputs of a pipeline stage N are passed as inputs to the pipeline stage N+1. If an output of a stage N is used by a stage N+M where M > 1, then that output will be passed through the stages in between.

If the last computational stage has any outputs – for example, loss or the prediction – then you will need to create an IPUOutfeedQueue and pass it to the outfeed_queue argument. All the outputs from the final computational stage are passed to the outfeed automatically.
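
To illustrate, the sketch below (a minimal, hypothetical fragment rather than a complete program: the scale constant and the stage bodies are placeholders, and pipelining_ops, layers, infeed_queue and outfeed_queue are assumed to be imported and created as in the listings in Section 3.4) passes a value that never changes through inputs, while mini-batches arrive through the infeed queue and the final outputs go to the outfeed queue:

# Hypothetical sketch: `scale` never changes, so it is passed via `inputs`;
# mini-batches of `x` are delivered by the infeed queue.
def stage1(scale, x):
    # The first stage receives the `inputs` values together with the infeed
    # data (ordering assumed: `inputs` first, then the infeed data); its
    # outputs become the inputs of stage2.
    return scale, layers.Conv2D(128, 1)(x)


def stage2(scale, x):
    # Outputs of the final stage are sent to the outfeed queue automatically.
    return scale * layers.Conv2D(128, 1)(x)


pipeline_op = pipelining_ops.pipeline(
    computational_stages=[stage1, stage2],
    gradient_accumulation_count=16,
    inputs=[1.0],                 # values that do not change per mini-batch
    infeed_queue=infeed_queue,    # per-mini-batch data
    outfeed_queue=outfeed_queue,  # outputs of the last stage
    name="Pipeline",
)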

3.3.2. Device mapping

By default, the pipeline stages will be assigned to IPU devices in an order which should maximise the utilisation of IPU-Links between consecutive pipeline stages.

If your model is not sequential you might want to change the assignment, depending on the communication pattern in your model.

A TensorFlow variable can only be used by pipeline stages that run on the same IPU. You can use the device mapping API to assign pipeline stages that use the same variable to the same IPU, as shown in the sketch below.
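
For example (a hedged sketch: the four stage functions and the queues are assumed to be defined as in the listings in Section 3.4), the device_mapping argument of the pipeline op can be used to place two stages that share a variable on the same IPU:

# Hypothetical sketch: stages 0 and 3 share a TensorFlow variable (for
# example, tied embedding weights), so both are mapped to IPU 0.
pipeline_op = pipelining_ops.pipeline(
    computational_stages=[stage1, stage2, stage3, stage4],
    gradient_accumulation_count=16,
    inputs=[],
    infeed_queue=infeed_queue,
    outfeed_queue=outfeed_queue,
    # One IPU index per pipeline stage, in order.
    device_mapping=[0, 1, 2, 0],
    name="Pipeline",
)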

3.3.3. Pipeline scheduling

You can choose the method used for scheduling the operations in the pipeline. The scheduling methods have different trade-offs in terms of memory use, balancing computation between pipeline stages (and therefore the IPUs), and optimisations that can be applied. They will also have different pipeline depths and therefore different ramp-up and ramp-down times. The differences are most significant when training and you may need to experiment to find which method works best for your model.

There are three pipeline schedules:

  • grouped

  • interleaved

  • sequential

In the grouped schedule (Fig. 3.3) the forward and backward stages are grouped together on each IPU. All IPUs alternate between executing a forward pass and then a backward pass.


Fig. 3.3 Grouped schedule

In the interleaved schedule (Fig. 3.4) each pipeline stage executes a combination of forward and backward passes.


Fig. 3.4 Interleaved schedule

Finally, there is the sequential schedule (Fig. 3.5). This is the same as sharding a model: only one mini-batch is ever “in-flight”. This may be useful when you cannot have a big mini-batch size but want to make use of other pipeline features, such as recomputation.


Fig. 3.5 Sequential schedule
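
With the TensorFlow 1 pipeline op, the schedule is chosen with the pipeline_schedule argument. The fragment below is a hedged sketch assuming that argument and the PipelineSchedule values, with the stages and queues defined as in the listings in Section 3.4:

from tensorflow.python.ipu.ops import pipelining_ops

pipeline_op = pipelining_ops.pipeline(
    computational_stages=[stage1, stage2],
    gradient_accumulation_count=16,
    inputs=[],
    infeed_queue=infeed_queue,
    outfeed_queue=outfeed_queue,
    # One of Grouped, Interleaved or Sequential.
    pipeline_schedule=pipelining_ops.PipelineSchedule.Grouped,
    name="Pipeline",
)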

The grouped and interleaved schedules have different advantages and disadvantages for:

  • memory use

  • execution time

  • ramp up and ramp down time

  • inter-IPU optimisations

Memory use

For a pipeline with N stages:

  • The grouped schedule executes 2N mini-batches at any given time.

  • The interleaved schedule executes N mini-batches at any given time.

  • This means that the interleaved schedule requires less memory for storing the data to be transferred between the forward and backward passes.

Execution time

  • The grouped schedule executes all the forward stages together and all the backward stages together.

  • The interleaved schedule executes the forward stages and backward stages interleaved.

  • Due to the synchronisation required between stages, and the fact that the forward stages tend to use fewer cycles than the backward stages, the grouped schedule is likely to be faster.

Ramp-up and ramp-down time

For a pipeline with N stages:

  • The grouped schedule executes 2N mini-batches in total to perform the ramp up and ramp down.

  • The interleaved schedule executes N mini-batches in total to perform the ramp up and ramp down.

Inter-IPU optimisations

  • Some inter-IPU optimisations are not possible with the interleaved schedule, for example the optimisation which converts variables that are passed through multiple pipeline stages into FIFOs.

3.3.4. Keras API in TensorFlow 2

TensorFlow 2 for the IPU includes the Keras Model and Sequential classes, with IPU-specific options set through separate configuration methods such as set_pipelining_options() and set_pipeline_stage_assignment(). For more details, see the TensorFlow API documentation.

3.4. Code examples

This section contains examples of using the pipelining API for inference and training.

3.4.1. Inference code examples

Listing 3.1 shows an example of using pipelining for inference in TensorFlow 1 (download source code).

Listing 3.1 Example of using pipelining for inference (TensorFlow 1)
 1# Copyright (c) 2022 Graphcore Ltd. All rights reserved.
 2
 3from tensorflow.python.ipu import config
 4from tensorflow.python.ipu import ipu_compiler
 5from tensorflow.python.ipu import ipu_infeed_queue
 6from tensorflow.python.ipu import ipu_outfeed_queue
 7from tensorflow.python.ipu.ops import pipelining_ops
 8from tensorflow.python.data.ops.dataset_ops import Dataset
 9from tensorflow.python.ipu import scopes
10from tensorflow.python.ipu import utils
11from tensorflow.python.framework import ops
12from tensorflow.python.ops import variables
13from tensorflow.keras import layers
14import numpy as np
15import tensorflow.compat.v1 as tf
16
17tf.disable_v2_behavior()
18
19# default data_format is 'channels_last'
20dataset = Dataset.from_tensor_slices(
21    np.random.uniform(size=(2, 128, 128, 3)).astype(np.float32)
22)
23dataset = dataset.batch(batch_size=2, drop_remainder=True)
24dataset = dataset.cache()
25dataset = dataset.repeat()
26dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
27
28# Create the data queues from/to IPU.
29infeed_queue = ipu_infeed_queue.IPUInfeedQueue(dataset)
30outfeed_queue = ipu_outfeed_queue.IPUOutfeedQueue()
31
32
33# Create a pipelined model which is split across two stages.
34def stage1(x):
35    x = layers.Conv2D(128, 1)(x)
36    return x
37
38
39def stage2(x):
40    x = layers.Conv2D(128, 1)(x)
41    return x
42
43
44def my_net():
45    pipeline_op = pipelining_ops.pipeline(
46        computational_stages=[stage1, stage2],
47        gradient_accumulation_count=16,
48        repeat_count=2,
49        inputs=[],
50        infeed_queue=infeed_queue,
51        outfeed_queue=outfeed_queue,
52        name="Pipeline",
53    )
54    return pipeline_op
55
56
57with ops.device("/device:IPU:0"):
58    r = ipu_compiler.compile(my_net, inputs=[])
59
60dequeue_op = outfeed_queue.dequeue()
61
62cfg = config.IPUConfig()
63cfg.auto_select_ipus = 2
64cfg.configure_ipu_system()
65utils.move_variable_initialization_to_cpu()
66
67with tf.Session() as sess:
68    sess.run(variables.global_variables_initializer())
69    sess.run(infeed_queue.initializer)
70    sess.run(r)
71    output = sess.run(dequeue_op)

The code first creates a dataset, together with an infeed_queue and an outfeed_queue for data input and output. The functions stage1() and stage2() define the two computational stages. The most important definitions are in my_net(), which defines the entire behaviour of the pipeline. In particular:

  • computational_stages lists the computational stages, stage1 and stage2, in execution order

  • gradient_accumulation_count=16 means that each pipeline stage is executed 16 times before the weights are updated (in this inference example, it simply determines how many mini-batches are processed in each run of the pipeline)

  • repeat_count=2 means that the whole pipeline is executed twice

The program selects two IPUs to perform this task using auto_select_ipus, and each stage is automatically assigned to a single IPU.

The example in Listing 3.2 uses tf.keras.Model in TensorFlow 2 to define a model (download source code).

Listing 3.2 Example of using pipelining for inference (with tf.keras.Model)
 1# Copyright (c) 2022 Graphcore Ltd. All rights reserved.
 2
 3from tensorflow.python.data.ops.dataset_ops import Dataset
 4from tensorflow.python.ipu import utils
 5from tensorflow.keras import ipu
 6from tensorflow.keras import layers
 7from tensorflow.python.ipu import config
 8from tensorflow.python.ipu import ipu_strategy
 9import numpy as np
10import tensorflow as tf
11
12# default data_format is 'channels_last'
13dataset = Dataset.from_tensor_slices(np.random.uniform(size=(2, 128, 128, 3)).astype(np.float32))
14dataset = dataset.batch(batch_size=2, drop_remainder=True)
15dataset = dataset.cache()
16dataset = dataset.repeat()
17dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
18
19
20# Create a pipelined model which is split across two stages.
21def my_model():
22    input_layer = layers.Input(shape=(128, 128, 3), dtype=tf.float32, batch_size=2)
23
24    with ipu.PipelineStage(0):
25        x = layers.Conv2D(128, 1)(input_layer)
26
27    with ipu.PipelineStage(1):
28        x = layers.Conv2D(128, 1)(x)
29
30    return tf.keras.Model(input_layer, x)
31
32
33cfg = config.IPUConfig()
34cfg.auto_select_ipus = 2
35cfg.configure_ipu_system()
36utils.move_variable_initialization_to_cpu()
37
38
39# Define the model under an IPU strategy scope
40strategy = ipu_strategy.IPUStrategy()
41with strategy.scope():
42    model = my_model()
43    model.set_pipelining_options(gradient_accumulation_steps_per_replica=16)
44
45    model.compile(steps_per_execution=10)
46    model.predict(dataset, steps=2)

When defining a model with tf.keras.Model, the computational stages are defined by placing layers under PipelineStage scopes. In TensorFlow 2, to ensure that the model will be compiled for the IPUs, we enclose it in an IPUStrategy scope.

Note the use of the set_pipelining_options() method, which sets the gradient_accumulation_steps_per_replica parameter (equivalent to gradient_accumulation_count in the TensorFlow 1 example above). After this, the program calls the compile() and predict() methods to run inference on the model.

The steps_per_execution argument helps reduce Python overhead and maximise the performance of your model. For more information, see the instructions on how to use this argument.

Listing 3.3 contains the same model as in Listing 3.2 but is now defined using tf.keras.Sequential (download source code):

Listing 3.3 Example of using pipelining for inference (with tf.keras.Sequential)
 1# Copyright (c) 2022 Graphcore Ltd. All rights reserved.
 2
 3from tensorflow.python.data.ops.dataset_ops import Dataset
 4from tensorflow.python.ipu import utils
 5from tensorflow.keras import layers
 6from tensorflow.python.ipu import config
 7from tensorflow.python.ipu import ipu_strategy
 8import numpy as np
 9import tensorflow as tf
10
11# default data_format is 'channels_last'
12dataset = Dataset.from_tensor_slices(np.random.uniform(size=(2, 128, 128, 3)).astype(np.float32))
13dataset = dataset.batch(batch_size=2, drop_remainder=True)
14dataset = dataset.cache()
15dataset = dataset.repeat()
16dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
17
18
19# Create a pipelined model which is split across two stages.
20def my_model():
21    return tf.keras.Sequential([layers.Conv2D(128, 1), layers.Conv2D(128, 1)])
22
23
24cfg = config.IPUConfig()
25cfg.auto_select_ipus = 2
26cfg.configure_ipu_system()
27utils.move_variable_initialization_to_cpu()
28
29
30# Define the model under an IPU strategy scope
31strategy = ipu_strategy.IPUStrategy()
32with strategy.scope():
33    model = my_model()
34
35    model.set_pipeline_stage_assignment([0, 1])
36    model.set_pipelining_options(gradient_accumulation_steps_per_replica=16)
37
38    model.compile(steps_per_execution=10)
39    model.predict(dataset, steps=2)

The only differences from the tf.keras.Model example are how the model is defined and how the pipeline stages are assigned: using model.set_pipeline_stage_assignment() instead of ipu.PipelineStage().

As shown in Listing 3.4, it is possible to use model.set_pipeline_stage_assignment() with tf.keras.Model or any functional model, which is useful for pipelining already existing models (download source code):

Listing 3.4 Example of using pipelining for inference (setting pipeline stage)
 1# Copyright (c) 2022 Graphcore Ltd. All rights reserved.
 2
 3from tensorflow.python.data.ops.dataset_ops import Dataset
 4from tensorflow.python.ipu import utils
 5from tensorflow.keras import layers
 6from tensorflow.python.ipu import config
 7from tensorflow.python.ipu import ipu_strategy
 8import numpy as np
 9import tensorflow as tf
10
11# default data_format is 'channels_last'
12dataset = Dataset.from_tensor_slices(np.random.uniform(size=(2, 128, 128, 3)).astype(np.float32))
13dataset = dataset.batch(batch_size=2, drop_remainder=True)
14dataset = dataset.cache()
15dataset = dataset.repeat()
16dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
17
18
19# Create a pipelined model which is split across two stages.
20def my_model():
21    input_layer = layers.Input(shape=(128, 128, 3), dtype=tf.float32, batch_size=2)
22
23    x = layers.Conv2D(128, 1)(input_layer)
24
25    x = layers.Conv2D(128, 1, name="split")(x)
26
27    x = layers.Conv2D(128, 1)(x)
28
29    x = layers.Conv2D(128, 1)(x)
30
31    return tf.keras.Model(input_layer, x)
32
33
34cfg = config.IPUConfig()
35cfg.auto_select_ipus = 2
36cfg.configure_ipu_system()
37utils.move_variable_initialization_to_cpu()
38
39
40# Define the model under an IPU strategy scope
41strategy = ipu_strategy.IPUStrategy()
42with strategy.scope():
43    model = my_model()
44
45    # Get the individual assignments - note that they are returned in post-order.
46    assignments = model.get_pipeline_stage_assignment()
47
48    # Iterate over them and set their pipeline stages.
49    stage_id = 0
50    for assignment in assignments:
51        assignment.pipeline_stage = stage_id
52        if assignment.layer.name.startswith("split"):
53            stage_id = 1
54
55    # Set the assignments to the model.
56    model.set_pipeline_stage_assignment(assignments)
57    model.set_pipelining_options(gradient_accumulation_steps_per_replica=16)
58
59    model.compile(steps_per_execution=10)
60    model.predict(dataset, steps=2)

This includes two extra steps:

  • Getting the individual assignments of the layers using model.get_pipeline_stage_assignment()

  • Setting the modified assignments back on the model using model.set_pipeline_stage_assignment().

You can name a layer using its name parameter and check assignment.layer.name.startswith("NAME") inside the assignment loop to decide where to move on to the next pipeline stage.

In Listing 3.4, the first two layers are part of the first pipeline stage and the last two are part of the second pipeline stage.

3.4.2. Training code examples

The example of using pipelining for training in Listing 3.5 (TensorFlow 1) creates a pipeline of four stages with a gradient accumulation count of 8 and a repeat count of 2. Four IPUs are selected for computation. The selection order is ZIGZAG, and recomputation is enabled. The loss function is cross-entropy, and the optimiser is tf.train.GradientDescentOptimizer() (download source code).

Listing 3.5 Example of using pipelining for training (TensorFlow 1)
  1# Copyright (c) 2022 Graphcore Ltd. All rights reserved.
  2
  3from tensorflow.python.ipu import config
  4from tensorflow.python.ipu import ipu_compiler
  5from tensorflow.python.ipu import ipu_infeed_queue
  6from tensorflow.python.ipu import ipu_outfeed_queue
  7from tensorflow.python.ipu.ops import pipelining_ops
  8from tensorflow.python.ops import variable_scope
  9from tensorflow.python.data.ops.dataset_ops import Dataset
 10from tensorflow.python.ipu import utils
 11from tensorflow.python.framework import ops
 12from tensorflow.python.ops import variables
 13from tensorflow.keras import layers
 14import numpy as np
 15import tensorflow.compat.v1 as tf
 16
 17tf.disable_v2_behavior()
 18
 19# default data_format is 'channels_last'
 20dataset = Dataset.from_tensor_slices(
 21    (
 22        tf.random.uniform([2, 128, 128, 3], dtype=tf.float32),
 23        tf.random.uniform([2], maxval=10, dtype=tf.int32),
 24    )
 25)
 26dataset = dataset.batch(batch_size=2, drop_remainder=True)
 27dataset = dataset.shuffle(1000)
 28dataset = dataset.cache()
 29dataset = dataset.repeat()
 30dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
 31
 32# Create the data queues from/to IPU.
 33infeed_queue = ipu_infeed_queue.IPUInfeedQueue(dataset)
 34outfeed_queue = ipu_outfeed_queue.IPUOutfeedQueue()
 35
 36
 37# Create a pipelined model which is split across four stages.
 38def stage1(x, labels):
 39    with variable_scope.variable_scope("stage1", use_resource=True):
 40        with variable_scope.variable_scope("conv", use_resource=True):
 41            x = layers.Conv2D(3, 1)(x)
 42            return x, labels
 43
 44
 45def stage2(x, labels):
 46    with variable_scope.variable_scope("stage2", use_resource=True):
 47        with variable_scope.variable_scope("conv", use_resource=True):
 48            x = layers.Conv2D(3, 1)(x)
 49            return x, labels
 50
 51
 52def stage3(x, labels):
 53    with variable_scope.variable_scope("stage3", use_resource=True):
 54        with variable_scope.variable_scope("conv", use_resource=True):
 55            x = layers.Conv2D(3, 1)(x)
 56            return x, labels
 57
 58
 59def stage4(x, labels):
 60    with variable_scope.variable_scope("stage4", use_resource=True):
 61        with variable_scope.variable_scope("flatten", use_resource=True):
 62            x = layers.Flatten()(x)
 63        with variable_scope.variable_scope("dense", use_resource=True):
 64            logits = layers.Dense(10)(x)
 65        with variable_scope.variable_scope("entropy", use_resource=True):
 66            cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
 67                labels=labels, logits=logits
 68            )
 69        with variable_scope.variable_scope("loss", use_resource=True):
 70            loss = tf.reduce_mean(cross_entropy)
 71        return loss
 72
 73
 74def optimizer_function(loss):
 75    optimizer = tf.train.GradientDescentOptimizer(0.01)
 76    return pipelining_ops.OptimizerFunctionOutput(optimizer, loss)
 77
 78
 79def my_net():
 80    pipeline_op = pipelining_ops.pipeline(
 81        computational_stages=[stage1, stage2, stage3, stage4],
 82        gradient_accumulation_count=8,
 83        repeat_count=2,
 84        inputs=[],
 85        infeed_queue=infeed_queue,
 86        outfeed_queue=outfeed_queue,
 87        optimizer_function=optimizer_function,
 88        name="Pipeline",
 89    )
 90    return pipeline_op
 91
 92
 93with ops.device("/device:IPU:0"):
 94    r = ipu_compiler.compile(my_net, inputs=[])
 95
 96dequeue_op = outfeed_queue.dequeue()
 97
 98cfg = config.IPUConfig()
 99cfg.allow_recompute = True
100cfg.selection_order = config.SelectionOrder.ZIGZAG
101cfg.auto_select_ipus = 4
102cfg.configure_ipu_system()
103utils.move_variable_initialization_to_cpu()
104
105with tf.Session() as sess:
106    sess.run(variables.global_variables_initializer())
107    sess.run(infeed_queue.initializer)
108    sess.run(r)
109    losses = sess.run(dequeue_op)

Here, the optimizer_function returning tf.train.GradientDescentOptimizer() automatically adds a stage to the pipeline for gradient computation, and a stage (gradientDescent) for the weight update. Note that gradient_accumulation_count=8 means that gradientDescent is performed once every eight mini-batches of data, and repeat_count=2 means that the pipeline performs gradientDescent twice; that is, the weight parameters are updated twice.

You can profile the program by running it with the environment variable POPLAR_ENGINE_OPTIONS='{"autoReport.all":"true", "autoReport.directory":"/destination/path/"}'. You can then open the generated report with the PopVision Graph Analyser to see the execution information shown in Fig. 3.6. See also Section 4, PopVision™ Graph Analyser tool, for further information.
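
The environment variable can be set on the command line when launching the program, or (a sketch, assuming the variable is set before the graph is compiled and executed in the same process) from within the script itself:

import os

# Enable Poplar auto-reporting so that a profile is written for the
# PopVision Graph Analyser. "/destination/path/" is a placeholder directory.
os.environ["POPLAR_ENGINE_OPTIONS"] = (
    '{"autoReport.all": "true", "autoReport.directory": "/destination/path/"}'
)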


Fig. 3.6 Training pipeline profile

We can see from Fig. 3.6 that:

  • The pipeline is repeated twice.

  • Each repetition of the pipeline computes eight mini-batches of data.

  • Each mini-batch of data goes through the phases of forward pass and gradient computation (with optional recomputation).

  • Four stages are executed in parallel on four IPUs.

  • After eight gradient computations, a gradient descent step is executed; that is, the weights are updated once for the batch.

Listing 3.6 shows an example program that uses pipelining for training with tf.keras.Model in TensorFlow 2 (download source code).

Listing 3.6 Example of using pipelining for training (with tf.keras.Model)
 1# Copyright (c) 2022 Graphcore Ltd. All rights reserved.
 2
 3from tensorflow.python.data.ops.dataset_ops import Dataset
 4from tensorflow.python.ipu import utils
 5from tensorflow.keras import ipu
 6from tensorflow.keras import layers
 7from tensorflow.keras import optimizers
 8from tensorflow.python.ipu import config
 9from tensorflow.python.ipu import ipu_strategy
10import numpy as np
11import tensorflow as tf
12
13# default data_format is 'channels_last'
14dataset = Dataset.from_tensor_slices(
15    (
16        tf.random.uniform([2, 128, 128, 3], dtype=tf.float32),
17        tf.random.uniform([2], maxval=10, dtype=tf.int32),
18    )
19)
20dataset = dataset.batch(batch_size=2, drop_remainder=True)
21dataset = dataset.shuffle(1000)
22dataset = dataset.cache()
23dataset = dataset.repeat()
24dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
25
26
27# Create a pipelined model which is split across four stages.
28def my_model():
29    input_layer = layers.Input(shape=(128, 128, 3), dtype=tf.float32, batch_size=2)
30
31    with ipu.PipelineStage(0):
32        x = layers.Conv2D(3, 1)(input_layer)
33
34    with ipu.PipelineStage(1):
35        x = layers.Conv2D(3, 1)(x)
36
37    with ipu.PipelineStage(2):
38        x = layers.Conv2D(3, 1)(x)
39
40    with ipu.PipelineStage(3):
41        x = layers.Flatten()(x)
42        logits = layers.Dense(10)(x)
43
44    return tf.keras.Model(input_layer, logits)
45
46
47cfg = config.IPUConfig()
48cfg.allow_recompute = True
49cfg.selection_order = config.SelectionOrder.ZIGZAG
50cfg.auto_select_ipus = 4
51cfg.configure_ipu_system()
52utils.move_variable_initialization_to_cpu()
53
54
55# Define the model under an IPU strategy scope
56strategy = ipu_strategy.IPUStrategy()
57with strategy.scope():
58    model = my_model()
59    model.set_pipelining_options(gradient_accumulation_steps_per_replica=8)
60
61    model.compile(
62        steps_per_execution=128,
63        loss="sparse_categorical_crossentropy",
64        optimizer=optimizers.SGD(0.01),
65    )
66
67    model.fit(dataset, steps_per_epoch=128)

Similar to Listing 3.6, Listing 3.7 uses tf.keras.Sequential to perform training with a pipelined model (download source code).

Listing 3.7 Example of using pipelining for training (with tf.keras.Sequential)
 1# Copyright (c) 2022 Graphcore Ltd. All rights reserved.
 2
 3from tensorflow.python.data.ops.dataset_ops import Dataset
 4from tensorflow.python.ipu import utils
 5from tensorflow.keras import layers
 6from tensorflow.keras import optimizers
 7from tensorflow.python.ipu import config
 8from tensorflow.python.ipu import ipu_strategy
 9import numpy as np
10import tensorflow as tf
11
12# default data_format is 'channels_last'
13dataset = Dataset.from_tensor_slices(
14    (
15        tf.random.uniform([2, 128, 128, 3], dtype=tf.float32),
16        tf.random.uniform([2], maxval=10, dtype=tf.int32),
17    )
18)
19dataset = dataset.batch(batch_size=2, drop_remainder=True)
20dataset = dataset.shuffle(1000)
21dataset = dataset.cache()
22dataset = dataset.repeat()
23dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
24
25
26# Create a pipelined model which is split across four stages.
27def my_model():
28    return tf.keras.Sequential(
29        [
30            layers.Conv2D(3, 1),
31            layers.Conv2D(3, 1),
32            layers.Conv2D(3, 1),
33            layers.Flatten(),
34            layers.Dense(10),
35        ]
36    )
37
38
39cfg = config.IPUConfig()
40cfg.allow_recompute = True
41cfg.selection_order = config.SelectionOrder.ZIGZAG
42cfg.auto_select_ipus = 4
43cfg.configure_ipu_system()
44utils.move_variable_initialization_to_cpu()
45
46
47# Define the model under an IPU strategy scope
48strategy = ipu_strategy.IPUStrategy()
49with strategy.scope():
50    model = my_model()
51
52    model.set_pipelining_options(gradient_accumulation_steps_per_replica=8)
53    model.set_pipeline_stage_assignment([0, 1, 2, 3, 3])
54
55    model.compile(
56        steps_per_execution=128,
57        loss="sparse_categorical_crossentropy",
58        optimizer=optimizers.SGD(0.01),
59    )
60
61    model.fit(dataset, steps_per_epoch=128)

3.5. Optimising the pipeline

3.5.1. Recomputation

Recomputation allows the Poplar SDK to make more efficient use of the valuable In-Processor-Memory by saving only selected activation inputs, trading some extra compute for memory savings. The two figures below demonstrate this, showing how the saved subset of activation inputs can be used to recompute all the activation history needed by the backward pass when calculating the weight updates, thus reducing memory usage. To enable recomputation, set the allow_recompute attribute of an instance of the IPUConfig class when configuring the device.


Fig. 3.7 Normal computation flow


Fig. 3.8 Computation flow after recomputation enabled
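
As in the training listings above, recomputation is enabled when configuring the IPU system; a minimal configuration fragment looks like this:

from tensorflow.python.ipu import config

cfg = config.IPUConfig()
cfg.allow_recompute = True  # recompute activations instead of storing them all
cfg.auto_select_ipus = 4
cfg.configure_ipu_system()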

3.5.2. Variable offloading

When using pipelining to train a model, it is possible to offload certain variables into Streaming Memory. This can save In-Processor-Memory, at the cost of the time spent communicating with the host whenever the offloaded variables are needed on the device. The API supports offloading of the weight update variables and of the activations.

The weight update variables are any tf.Variable objects that are only accessed and modified during the weight update of the pipeline; an example is the accumulator variable of tf.train.MomentumOptimizer. These variables do not need to be stored in device memory during the forward and backward propagation of the model, so when offload_weight_update_variables is enabled they are streamed onto the device during the weight update and then streamed back to Streaming Memory after they have been updated.

When offload_activations is enabled, all the activations for the mini-batches which are not currently being executed by the pipeline stages are stored in Streaming Memory. In an analogous way, when an activation is needed for computation it is streamed onto the device, and then streamed back to Streaming Memory after it has been used.
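
The sketch below is a hedged illustration of how these options might be passed to the TensorFlow 1 pipeline op; the stages, queues and optimizer function are assumed to be defined as in Listing 3.5, and you should check the API documentation for the exact argument values supported:

pipeline_op = pipelining_ops.pipeline(
    computational_stages=[stage1, stage2, stage3, stage4],
    gradient_accumulation_count=8,
    inputs=[],
    infeed_queue=infeed_queue,
    outfeed_queue=outfeed_queue,
    optimizer_function=optimizer_function,
    # Keep optimiser state (e.g. momentum accumulators) in Streaming Memory
    # except during the weight update.
    offload_weight_update_variables=True,
    # Store activations for mini-batches not currently in flight off chip.
    offload_activations=True,
    name="Pipeline",
)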

For more information on variable offloading, see the TensorFlow API documentation: Optimizer state offloading.

3.5.3. Device selection order

Use the selection_order option of the IPU configuration (and, if needed, the device mapping API described in Section 3.3.2) to make sure that the mapping of pipeline stages to devices uses the IPU-Links between consecutive stages as much as possible, as in the fragment below.
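
For example, the training listings above request a ZIGZAG selection order when configuring the IPU system:

from tensorflow.python.ipu import config

cfg = config.IPUConfig()
# Choose the order in which IPUs are selected so that consecutive pipeline
# stages sit on directly linked IPUs (as in Listings 3.5 to 3.7).
cfg.selection_order = config.SelectionOrder.ZIGZAG
cfg.auto_select_ipus = 4
cfg.configure_ipu_system()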

3.5.4. Data parallelism

Pipelining supports replicated graphs. When using the pipeline operator, use the tensorflow.python.ipu.optimizers.CrossReplicaOptimizer in the optimiser function. When using the IPU Keras PipelineModel and PipelineSequential classes in an IPUStrategy, replication is handled automatically whenever the model is placed on a multi-IPU device and so the CrossReplicaOptimizer must not be used.

If the model you are working on uses a mini-batch size of B, a gradient accumulation count of G and a replication factor of R, the effective batch size is B x G x R.

Note that the all-reduce collectives for the gradients are only performed during the weight update.
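
As a hedged sketch (following Listing 3.5; the learning rate is a placeholder), an optimizer_function using the CrossReplicaOptimizer with the TensorFlow 1 pipeline op could look like this:

from tensorflow.python.ipu.ops import pipelining_ops
from tensorflow.python.ipu.optimizers import CrossReplicaOptimizer
import tensorflow.compat.v1 as tf


def optimizer_function(loss):
    # Wrap the optimiser so that gradients are all-reduced across replicas
    # as part of the weight update.
    opt = CrossReplicaOptimizer(tf.train.GradientDescentOptimizer(0.01))
    return pipelining_ops.OptimizerFunctionOutput(opt, loss)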

3.5.5. Increase the gradient accumulation count

The bigger the gradient accumulation count:

  • The smaller the proportion of time spent during a weight update.

  • The smaller the proportion of time spent during ramp up and ramp down.

An increase in the gradient accumulation count yields these reductions by performing the forward and backward passes on a greater number of mini-batches before the weights are updated, resulting in a greater effective batch size. The gradients computed for each mini-batch are aggregated so that the weight update is performed once with the larger, aggregated batch.
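
As a concrete illustration (the numbers are arbitrary), the effective batch size B x G x R described in Section 3.5.4 scales directly with the gradient accumulation count:

mini_batch_size = 2       # samples per forward pass (B)
grad_accum_count = 8      # mini-batches per weight update (G)
replication_factor = 1    # data-parallel replicas (R)

# Effective batch size B x G x R: the weights are updated once per 16 samples.
effective_batch_size = mini_batch_size * grad_accum_count * replication_factor
print(effective_batch_size)  # 16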

In Fig. 3.9, the processing of four mini-batches is shown without gradient accumulation. It can be seen that following the forward and backward passes of each mini-batch is a weight update stage.


Fig. 3.9 Not using gradient accumulation

However, when processing the four mini-batches of Fig. 3.9 with a gradient accumulation count of four, it can be seen in Fig. 3.10 that only a single weight update stage is performed. This is due to the aggregation of the gradients computed in the backward pass for each mini-batch.


Fig. 3.10 Using a gradient accumulation count of 4

Weight updates are performed following the ramp down phase of pipeline execution, and so the use of a higher gradient accumulation count will also reduce the number of ramp up and ramp down cycles between batches. This is because the effective batch size will be larger, as previously outlined. Because ramp up and ramp down fill and clear the pipeline, reducing the number of ramp up and ramp down cycles maximises time spent on compute. A lower gradient accumulation count will incur more ramp up and ramp down cycles, causing more time to be spent filling and clearing the pipeline.

3.5.6. Profiling

When your model is executing correctly, you can try moving layers around. If the model doesn't fit on one or more IPUs, you can try changing the available memory proportion used for temporary memory (for more information, see the technical note on this option).

  • Move layers towards the final computation stage to reduce the amount of recomputation

  • Adjust availableMemoryProportion. For example:

# Set "availableMemoryProportion" flag to "0.5"
from tensorflow.python import ipu

cfg = ipu.config.IPUConfig()
cfg.convolutions.poplar_options["availableMemoryProportion"] = "0.5"
cfg.matmuls.poplar_options["availableMemoryProportion"] = "0.5"
cfg.configure_ipu_system()

  • For more fine-grained control of the available memory proportion, use the following options:

    • forward_propagation_stages_poplar_options: If provided, a list of length equal to the number of computational stages. Each element is a PipelineStageOptions object which allows fine-grained control of the Poplar options for a given forward propagation computational stage.

    • backward_propagation_stages_poplar_options: If provided, a list of length equal to the number of computational stages. Each element is a PipelineStageOptions object which allows fine-grained control of the Poplar options for a given backward propagation computational stage.

    • weight_update_poplar_options: If provided, a PipelineStageOptions object which allows fine-grained control of the Poplar options for the weight update stage.

    These can be useful in certain situations, for example if one stage is almost out of memory then the available memory proportion can be lowered there but not for the rest of the model (see the sketch at the end of this section).

  • Make sure that the tf.Dataset passed to the pipeline is not the bottleneck. See the Dataset benchmarking section in Targeting the IPU from TensorFlow for more information.

  • Experiment with Poplar engine options. For example:

POPLAR_ENGINE_OPTIONS='{"opt.enableSwSyncs": "true"}'
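
Returning to the fine-grained memory proportion options above, the following is a hedged sketch of how they might be combined with the pipeline op. It assumes that the PipelineStageOptions constructor accepts per-operation Poplar option dictionaries (convolution_options and matmul_options), and that stage1, stage2 and the queues are defined as in Listing 3.1; check the TensorFlow API documentation for the exact signatures.

# Hypothetical sketch: lower the available memory proportion for the second
# forward-propagation stage only, leaving the other stages at their defaults.
from tensorflow.python.ipu.ops import pipelining_ops

low_memory = pipelining_ops.PipelineStageOptions(
    convolution_options={"availableMemoryProportion": "0.2"},
    matmul_options={"availableMemoryProportion": "0.2"},
)
default_options = pipelining_ops.PipelineStageOptions()

pipeline_op = pipelining_ops.pipeline(
    computational_stages=[stage1, stage2],
    gradient_accumulation_count=16,
    inputs=[],
    infeed_queue=infeed_queue,
    outfeed_queue=outfeed_queue,
    # One entry per computational stage, in order.
    forward_propagation_stages_poplar_options=[default_options, low_memory],
    name="Pipeline",
)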