6. Training a model
TensorFlow XLA and Poplar provide the ability to combine an entire training graph into a single operation in the TensorFlow graph. This accelerates training by removing the need to make calls to the IPU hardware for each operation in the graph.
However, if the Python code with the training pass is called multiple times, once for each batch in the training data set, then there is still the overhead of calling the hardware for each batch.
The Graphcore IPU support for TensorFlow provides three mechanisms to improve the training performance: training loops, data set feeds, and replicated graphs.
6.1. Training loops, data sets and feed queues
By placing the training operations inside a loop, they can be executed multiple times without returning control to the host. It is possible to use a standard TensorFlow while_loop operation to wrap the training operation, but the IPU library provides a convenient and feature-rich version.
Normally when TensorFlow runs, operations which are not inside a loop will be executed once, and those operations will return one or more tensors with fixed values. However, when a training operation is placed into a loop, the inputs to that training operation need to provide a stream of values. Standard TensorFlow Python feed dictionaries cannot provide data in this form, so when training in a loop, data must be fed from a TensorFlow DataSet.
You can find more information about the DataSet class and its use in normal operation on the TensorFlow Better performance with the tf.data API web page. TensorFlow also provides many pre-configured DataSets for use in training models; see TensorFlow Datasets for more information.
To construct a system that will train in a loop, you will need to do the following:
- Wrap your optimiser training operation in a loop.
- Create an IPUInfeedQueue to feed data to that loop.
- Create an IPUOutfeedQueue to take results out of that loop.
- Create a TensorFlow DataSet to provide data to the input queue.
The following example shows how to construct a trivial DataSet, attach it to a model using an IPUInfeedQueue, feed results into an IPUOutfeedQueue, and construct a loop.
from tensorflow.python.ipu import ipu_infeed_queue
from tensorflow.python.ipu import ipu_outfeed_queue
from tensorflow.python.ipu import loops
from tensorflow.python.ipu import ipu_strategy
from tensorflow.python.ipu.config import IPUConfig
import tensorflow as tf

# The dataset for feeding the graphs
ds = tf.data.Dataset.from_tensors(tf.constant(1.0, shape=[800]))
ds = ds.map(lambda x: [x, x])
ds = ds.repeat()

# The host side queues
infeed_queue = ipu_infeed_queue.IPUInfeedQueue(ds)
outfeed_queue = ipu_outfeed_queue.IPUOutfeedQueue()


# The device side main
def body(counter, x1, x2):
  d1 = x1 + x2
  d2 = x1 - x2
  counter += 1
  outfeed_queue.enqueue({'d1': d1, 'd2': d2})
  return counter


@tf.function(jit_compile=True)
def my_net():
  count = 0
  count = loops.repeat(10, body, [count], infeed_queue)
  return count


# Configure the hardware.
config = IPUConfig()
config.auto_select_ipus = 1
config.configure_ipu_system()

# Initialize the IPU default strategy.
strategy = ipu_strategy.IPUStrategyV1()

with strategy.scope():
  infeed_queue.initializer
  count_out = strategy.run(my_net)
  print("counter", count_out)

  # The outfeed dequeue has to happen after the outfeed enqueue op has been executed.
  result = outfeed_queue.dequeue()

  print("outfeed result", result)
In this case the DataSet is a trivial one. The program constructs it from a single TensorFlow constant, and then maps its output into a pair of tensors. It then arranges for the DataSet to be repeated indefinitely.
After the DataSet is created, the two data feed queues are constructed. The IPUInfeedQueue takes the DataSet as a parameter. The IPUOutfeedQueue has extra options to control how it collects and outputs the data sent to it. None of these are used in this example.
Now that we have the DataSet and the queues for getting data in and out of the device-side code, we can construct the device-side part of the model. In this example, the body function constructs a very simple model, which does not even have an optimiser. It takes the two data samples which will be provided by the DataSet, performs some simple maths on them, and inserts the results into the output queue.
Typically, in this function, the full ML model would be constructed and a TensorFlow Optimizer would be used to generate a backward pass and variable update operations. The returned data would typically be a loss value, or perhaps nothing at all if all we do is call the training operation.
The my_net function is where the loops.repeat function is called. This wraps the body function in a loop. It takes as the first parameter the number of times to execute the operation, in this case 10. It also takes the function that generates the body of the loop, in this case the function body, a list of extra parameters to pass to the body, in this case the loop counter, and finally the infeed queue which will feed data into the loop.
Next we create a strategy scope at the top level and call strategy.run. This takes the my_net function as an argument and creates the training loop in the main graph.
Finally, we create an operation which can be used to fetch results from the outfeed queue. Note that it isn’t necessary to use an outfeed queue if you do not wish to receive any per-sample output from the training loop. If all you require is the final value of a tensor, then it can be output normally without the need for a queue.
If you run this example then you will find that the result is a Python dictionary containing two numpy arrays. The first is the d1 array and will contain x1 + x2 for each iteration in the loop. The second is the d2 array and will contain x1 - x2 for each iteration in the loop.
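To make the data flow easier to follow, here is a plain-Python analogue of the loop above that runs without any IPU library. It is only a sketch: make_dataset, the repeat helper and the list used as an outfeed are stand-ins invented for this illustration, not the behaviour of loops.repeat or the IPU queues.

```python
import itertools


def make_dataset():
    """Endless stream of (x1, x2) pairs, standing in for the repeated DataSet."""
    x = [1.0] * 800
    return itertools.repeat((x, x))


def body(counter, x1, x2, outfeed):
    """One loop iteration: compute d1 and d2, then push them to the outfeed."""
    d1 = [a + b for a, b in zip(x1, x2)]
    d2 = [a - b for a, b in zip(x1, x2)]
    outfeed.append({"d1": d1, "d2": d2})
    return counter + 1


def repeat(n, body_fn, counter, infeed, outfeed):
    """Hypothetical host-side stand-in for a device loop fed by an infeed."""
    for _ in range(n):
        x1, x2 = next(infeed)  # the infeed supplies fresh data on every iteration
        counter = body_fn(counter, x1, x2, outfeed)
    return counter


outfeed = []
count = repeat(10, body, 0, make_dataset(), outfeed)

# Group the per-iteration results by key, one entry per loop iteration.
result = {key: [item[key] for item in outfeed] for key in ("d1", "d2")}
print("counter", count)
print("d1 entries:", len(result["d1"]), "each of length", len(result["d1"][0]))
```

Each key in result holds one entry per iteration, which mirrors the per-iteration layout of the dictionary described above.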
See entries in the TensorFlow Python API for more details.
For a more practical example, the Graphcore tutorials repository contains a detailed tutorial about using infeeds and outfeeds with TensorFlow.
6.2. Optional simplification of infeeds and outfeeds
You do not need to manually create the IPUInfeedQueue. Instead, provided you are inside an IPUStrategy, you can simply wrap your Dataset in a Python iterator, like so:
dataset = create_dataset()
iterator = iter(dataset)
This will then allow you to use next() within the on-device training loop to retrieve the next item of data:
def model_fn(...):
  for _ in tf.range(steps_per_execution):
    # Get the next input.
    features, labels = next(iterator)
Outfeeds are iterable objects. This means that you can use a for loop to dequeue elements, or use built-ins like sum() to aggregate the outputs.
import tensorflow as tf
from tensorflow.python import ipu

# create_model(), create_dataset() and training_loop() are assumed to be
# defined elsewhere in the program.
strategy = ipu.ipu_strategy.IPUStrategy()
with strategy.scope():
  # Create a Keras model.
  train_model = create_model()

  # Create an optimizer.
  opt = tf.keras.optimizers.SGD(0.01)

  # Create an iterator for the dataset.
  train_iterator = iter(create_dataset())

  # Create an IPUOutfeedQueue to collect results from each sample.
  outfeed_queue = ipu.ipu_outfeed_queue.IPUOutfeedQueue()

  # Total number of steps (batches) to run.
  total_steps = 1000

  # How many steps (batches) to execute each time the device executes.
  steps_per_execution = 10

  for begin_step in range(0, total_steps, steps_per_execution):
    # Run the training loop.
    strategy.run(training_loop,
                 args=(train_iterator, steps_per_execution, outfeed_queue,
                       train_model, opt))
    # Calculate the mean loss.
    total = 0.
    for training_loss in outfeed_queue:
      total += training_loss
    print(f"Loss: {total / steps_per_execution}")
6.3. Accessing outfeed queue results during execution
An IPUOutfeedQueue allows a program to fetch results continuously during the execution of a model. You can use this feature to monitor the performance of the network, for example to check that the loss is decreasing, or to stream predictions for an inference model to achieve minimal latency for each sample.
from threading import Thread

from tensorflow.python import ipu
import tensorflow as tf
from tensorflow.python.keras.datasets import mnist

NUM_ITERATIONS = 100

#
# Configure the IPU system
#
cfg = ipu.config.IPUConfig()
cfg.auto_select_ipus = 1
cfg.configure_ipu_system()


#
# The input data and labels
#
def create_dataset():
  (x_train, y_train), (_, _) = mnist.load_data()
  x_train = x_train / 255.0

  train_ds = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(10000)
  train_ds = train_ds.map(lambda d, l:
                          (tf.cast(d, tf.float32), tf.cast(l, tf.int32)))
  train_ds = train_ds.batch(32, drop_remainder=True)
  return train_ds.repeat()


#
# The host side queue
#
outfeed_queue = ipu.ipu_outfeed_queue.IPUOutfeedQueue()


#
# A custom training loop
#
@tf.function(jit_compile=True)
def training_step(num_iterations, iterator, in_model, optimizer):

  for _ in tf.range(num_iterations):
    features, labels = next(iterator)
    with tf.GradientTape() as tape:
      predictions = in_model(features, training=True)
      prediction_loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, predictions)
      loss = tf.reduce_mean(prediction_loss)
    grads = tape.gradient(loss, in_model.trainable_variables)
    optimizer.apply_gradients(zip(grads, in_model.trainable_variables))

    outfeed_queue.enqueue(loss)


#
# Execute the graph
#
strategy = ipu.ipu_strategy.IPUStrategyV1()
with strategy.scope():
  # Create the Keras model and optimizer.
  model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])
  opt = tf.keras.optimizers.SGD(0.01)

  # Create an iterator for the dataset.
  train_iterator = iter(create_dataset())

  # Function to continuously dequeue the outfeed until NUM_ITERATIONS examples
  # are seen.
  def dequeue_thread_fn():
    counter = 0
    while counter != NUM_ITERATIONS:
      for loss in outfeed_queue:
        print("Step", counter, "loss = ", loss.numpy())
        counter += 1

  # Start the dequeuing thread.
  dequeue_thread = Thread(target=dequeue_thread_fn, args=[])
  dequeue_thread.start()

  # Run the custom training loop over the dataset.
  strategy.run(training_step,
               args=[NUM_ITERATIONS, train_iterator, model, opt])
  dequeue_thread.join()
6.4. Replicated graphs
To improve performance, multiple IPUs can be configured to run in a data parallel mode. The graph is said to be replicated across multiple IPUs. See the Poplar and PopLibs User Guide for more background about replicated graphs.
6.4.1. Selecting the number of replicas
During system configuration, you specify the number of IPUs for the TensorFlow device using the auto_select_ipus or select_ipus options on an IPUConfig instance.
A graph can be sharded across multiple IPUs (model parallelism), and then replicated across IPUs (data parallelism). When specifying the number of IPUs in the system, you must specify a multiple of the number of shards used by the graph.
For instance, if a graph is sharded over two IPUs, and you set the auto_select_ipus option to eight IPUs, then the graph will be replicated four times.
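The arithmetic behind this rule can be written as a small check. The helper name replication_factor is made up for this sketch and is not part of the IPU API.

```python
def replication_factor(num_ipus, num_shards):
    """Number of replicas obtained when a graph sharded over `num_shards`
    IPUs is placed on a device made of `num_ipus` IPUs (hypothetical helper)."""
    if num_shards <= 0:
        raise ValueError("num_shards must be positive")
    if num_ipus % num_shards != 0:
        raise ValueError("the number of IPUs must be a multiple of the "
                         "number of shards")
    return num_ipus // num_shards


# A graph sharded over two IPUs on an eight-IPU device gives four replicas.
print(replication_factor(num_ipus=8, num_shards=2))
```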
6.4.2. Performing parameter updates
Each replica maintains its own copy of the graph, but during training it is important to ensure that the graph parameters are updated so that they are in sync across replicas.
A wrapper for standard TensorFlow optimisers is used to add extra operations to the parameter update nodes in the graph to average updates across replicas. See tensorflow.python.ipu.optimizers.CrossReplicaOptimizer.
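The effect of averaging updates across replicas can be illustrated in plain Python. This is a conceptual sketch only: cross_replica_mean and the toy numbers are assumptions made for the illustration, and the real wrapper operates on graph operations rather than Python lists.

```python
def cross_replica_mean(per_replica_grads):
    """Average each gradient element over all replicas."""
    num_replicas = len(per_replica_grads)
    return [sum(values) / num_replicas for values in zip(*per_replica_grads)]


# Every replica starts from the same weights but sees different data,
# so each one computes different gradients.
weights = [0.5, -1.0]
per_replica_grads = [[0.2, 0.4], [0.4, 0.0], [0.0, 0.2], [0.2, 0.2]]

mean_grads = cross_replica_mean(per_replica_grads)
learning_rate = 0.1

# Applying the same averaged gradient on every replica keeps the copies in sync.
replica_weights = [
    [w - learning_rate * g for w, g in zip(weights, mean_grads)]
    for _ in per_replica_grads
]
print(replica_weights[0])
```

Because every replica applies the identical averaged update, the weight copies stay equal after each step.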
6.5. Pipelined training
The IPU pipeline API creates a series of computational stages, where the outputs of one stage are the inputs to the next one. These stages are then executed in parallel across multiple IPUs. This approach can be used to split the model so that different layers are executed on different IPUs.
This improves utilisation of the hardware when a model is too large to fit into a single IPU and must be sharded across multiple IPUs.
Each of the stages is a set of operations, and is described using a Python function, in much the same way as ipu.compile takes a function that describes the graph to compile onto the IPU.
See tensorflow.python.ipu.pipelining_ops.pipeline() for details of the pipeline operator.
The pipeline API requires data inputs to be provided by a tf.data.Dataset source connected via an infeed operation. If you would like per-sample output, for instance the loss, then this will have to be provided by an outfeed operation.
The computational stages can be interleaved on the devices in three different ways as described by the pipeline_schedule parameter. By default the API will use the PipelineSchedule.Grouped mode (Fig. 6.1), where the forward passes are grouped together, and the backward passes are grouped together. The main alternative is the PipelineSchedule.Interleaved mode (Fig. 6.2), where the forward and backward passes are interleaved, so that fewer activations need to be stored. Additionally, the PipelineSchedule.Sequential mode (Fig. 6.3), where the pipeline is scheduled in the same way as if it were a sharded model, may be useful when debugging your model.
For more information on pipelining, refer to the technical note on Model parallelism with TensorFlow: sharding and pipelining.
6.5.1. Grouped scheduling
6.5.2. Interleaved scheduling
6.5.3. Sequential scheduling
6.5.4. Pipeline stage inputs and outputs
The first pipeline stage needs to have inputs which are a combination of the tensors from the DataSet, and the tensors given as arguments to the pipeline operation. Any data which changes for every sample or minibatch of the input should be included in the DataSet, while data which can vary only on each run of the pipeline should be passed as arguments to the pipeline operation. Parameters like the learning rate would fit into this latter case.
Every subsequent pipeline stage must have its inputs as the outputs of the previous stage. Note that things like the learning rate must be threaded through each pipeline stage until they are used.
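The following plain-Python sketch shows what threading a value through the stages means. The stage functions and the run_pipeline helper are hypothetical and only mimic the calling convention described above; they do not use the pipeline operator.

```python
def stage1(learning_rate, features, labels):
    """First stage: receives the pipeline argument plus the dataset tensors."""
    hidden = [2.0 * f for f in features]
    # The learning rate is not used here, but it must be returned so that
    # the next stage can receive it.
    return learning_rate, hidden, labels


def stage2(learning_rate, hidden, labels):
    """Last stage: inputs are exactly the outputs of stage1."""
    weight = 0.5
    errors = [weight * h - y for h, y in zip(hidden, labels)]
    grad = sum(e * h for e, h in zip(errors, hidden)) / len(hidden)
    return weight - learning_rate * grad


def run_pipeline(stages, pipeline_args, sample):
    """Hypothetical runner: the first stage gets arguments + sample,
    every later stage gets the previous stage's outputs."""
    outputs = tuple(pipeline_args) + tuple(sample)
    for stage in stages:
        outputs = stage(*outputs)
        if not isinstance(outputs, tuple):
            outputs = (outputs,)
    return outputs


sample = ([1.0, 2.0], [2.0, 4.0])  # (features, labels) from the dataset
(new_weight,) = run_pipeline([stage1, stage2], pipeline_args=(0.1,), sample=sample)
print(new_weight)
```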
6.5.5. Applying an optimiser to the graph
The optimiser must be applied by creating it in a special optimiser function and then returning a handle to it from that function. The function is passed into the optimizer_function argument of the pipeline operation.
When a pipeline is running it will accumulate the gradients from each step of the pipeline and only apply the updates to the graph parameters at the end of each pipeline run, given by the gradient_accumulation_count parameter. Consequently it is important for the system to have more knowledge of the optimiser and so it must be given to the pipeline operator using this function.
6.5.6. Device mapping
By default the pipeline operation will map the pipeline stages onto IPUs in order to minimise the inter-IPU communication lengths. If you need to override this order, then you can use the device_mapping parameter.
6.5.7. Concurrent pipeline stages
When pipelining a model, it’s possible to have stages that have no data dependencies and don’t share weights. These stages can benefit from operating on the same mini-batch concurrently.
These concurrent pipeline stages are defined by providing a list of stages. The corresponding element of the device-mapping should also be a list. The argument list to each concurrent stage must be the same, including any arguments coming from an infeed. The input to the next stage, or outfeed, is the concatenation of concurrent stage outputs.
def stage1a(args...):
  # ... do stuff on IPU 1
  return a0, a1, ...
def stage1b(args...):
  # ... do stuff on IPU 2
  return b0, b1, ...

# stage 2 arguments are a concatenation of the results of the previous
# concurrent stages.
def stage2(a0, a1, ..., b0, b1, ...):
  # ... do stuff on IPU 2

pipeline_op = pipelining_ops.pipeline(
    # Second element of the list is also a list of functions.
    computational_stages=[stage0, [stage1a, stage1b], stage2, stage3],
    # ...
    # Second element of the list is also a list of device IDs.
    device_mapping=[0, [1, 2], 2, 3])
Compared with a pipeline that does not use this feature, where the activations are passed through the stages one after another, the pipeline is shorter. This means that fewer activations are stored over the whole execution of the pipeline, and the expected latency is lower than that of the serialised pipeline.
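The argument-passing rules for concurrent stages can be mimicked with ordinary Python functions. The run_stages helper below is an assumption made for illustration: it runs the stages one after another on the host and only reproduces how inputs and outputs are wired, not the concurrent execution on separate IPUs.

```python
def run_stages(computational_stages, inputs):
    """Hypothetical runner: a nested list denotes a group of concurrent stages."""
    outputs = tuple(inputs)
    for stage in computational_stages:
        if isinstance(stage, list):
            # Every concurrent stage receives the same argument list, and the
            # results are concatenated in the order the stages are listed.
            results = ()
            for fn in stage:
                results += tuple(fn(*outputs))
            outputs = results
        else:
            outputs = tuple(stage(*outputs))
    return outputs


def stage0(x):
    return (x + 1,)


def stage1a(x):
    return (x * 2, x * 3)


def stage1b(x):
    return (x - 1,)


def stage2(a0, a1, b0):
    # Receives the outputs of stage1a followed by the outputs of stage1b.
    return (a0 + a1 + b0,)


result = run_stages([stage0, [stage1a, stage1b], stage2], (1,))
print(result)
```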
6.6. Recomputation
As outlined in the technical note Model Parallelism on the IPU with TensorFlow: Sharding and Pipelining, the recomputation of activations can be enabled for the backward passes. Instead of storing every activation in a FIFO stash on the devices, only a subset are kept while the rest are recomputed when they are required. This can lead to a reduction in activation memory liveness, at the expense of extra computation cycles. It can be a useful optimisation of memory usage, and can be applied to pipelined multi-IPU and non-pipelined single-IPU models.
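The trade-off between stored and recomputed activations can be sketched with a toy chain of layers. Everything here (the layer function, the checkpoint spacing and the helper names) is invented for the illustration and is unrelated to how Poplar schedules recomputation.

```python
def layer(index, x):
    """A toy stateless layer."""
    return x * 0.5 + index


def full_stash(x, num_layers):
    """Store the input activation of every layer (no recomputation)."""
    stash = []
    for i in range(num_layers):
        stash.append(x)
        x = layer(i, x)
    return stash


def forward_with_checkpoints(x, num_layers, every):
    """Run the forward pass, keeping only the input of every `every`-th layer."""
    checkpoints = {}
    for i in range(num_layers):
        if i % every == 0:
            checkpoints[i] = x
        x = layer(i, x)
    return x, checkpoints


def recompute_activation(checkpoints, target, every):
    """Rebuild the input of layer `target` from the nearest earlier checkpoint."""
    start = (target // every) * every
    x = checkpoints[start]
    for i in range(start, target):
        x = layer(i, x)  # extra compute in exchange for less stored data
    return x


num_layers, every = 8, 3
_, checkpoints = forward_with_checkpoints(1.0, num_layers, every)
stash = full_stash(1.0, num_layers)
print("stored activations:", len(checkpoints), "instead of", len(stash))
```

In this toy chain only three of the eight activations are stored, and any of the others can be rebuilt on demand during the backward pass.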
To enable recomputation, use the allow_recompute attribute of an instance of the IPUConfig class when configuring the device.
Recomputation only applies to stateless activations. Stateful activations (such as Dropout) will always be stashed.
With pipelining, each computational stage is an implicit recomputation point. For non-pipelined models, recomputation checkpoints need to be added. Checkpoints can be set in multiple ways:
- A tensor in the graph can be wrapped in a recomputation_checkpoint() op.
- In Keras, the RecomputationCheckpoint layer can be used.
When using explicit recomputation checkpoints, the Grouped pipeline schedule (Fig. 6.1) must be used for multi-IPU models. For non-pipelined models the Sequential schedule (Fig. 6.3) must be used.
For more information on recomputation and recomputation checkpoints, refer to the following:
The Recomputation section in the IPU Programmer’s Guide.
The Activation recomputations section in the Memory and Performance Optimisation on the IPU.
The feature example on Using Recomputation Checkpoints for a code example.
6.7. Gradient accumulation
Gradient accumulation is a technique for increasing the effective batch size by processing multiple batches of training examples before updating the model. The gradients from each batch are accumulated and these accumulated gradients are used to compute the weight update. When gradient accumulation is used, the effective batch size of the model is the number of mini-batches for which the gradients are accumulated multiplied by the mini-batch size.
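As a minimal sketch of the idea, the following plain-Python example accumulates the gradients of a one-parameter model over three mini-batches and applies a single weight update. It assumes that the accumulated gradients are averaged and that all mini-batches have the same size; a real optimizer may sum rather than average.

```python
def grad_mse(w, batch):
    """Gradient with respect to w of mean((w * x - y) ** 2) over one mini-batch."""
    return sum(2.0 * (w * x - y) * x for x, y in batch) / len(batch)


mini_batches = [
    [(1.0, 2.0), (2.0, 4.0)],
    [(3.0, 6.0), (4.0, 8.0)],
    [(5.0, 10.0), (6.0, 12.0)],
]
w = 0.0
learning_rate = 0.01

# Accumulate the gradients of each mini-batch without touching the weight.
accumulated = 0.0
for batch in mini_batches:
    accumulated += grad_mse(w, batch)

# One weight update from the averaged accumulated gradient.
w_accumulated = w - learning_rate * accumulated / len(mini_batches)

# The same update computed from one large batch holding all the samples.
all_samples = [sample for batch in mini_batches for sample in batch]
w_large_batch = w - learning_rate * grad_mse(w, all_samples)

print(w_accumulated, w_large_batch)
```

The two results agree up to floating-point rounding, which is why accumulating over several mini-batches behaves like training with a larger batch.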
Gradient accumulation is a useful optimisation technique for replicated graphs as it reduces the number of times the gradients are exchanged between replicas by a factor of the number of mini-batches. This is because the gradients only need to be exchanged between replicas when the weight update is computed. When gradient accumulation is used with replication, the effective batch size of the model is the number of mini-batches for which the gradients are accumulated multiplied by the mini-batch size multiplied by the replication factor.
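The effective batch size described in the two paragraphs above is a simple product. The function name is made up for this sketch.

```python
def effective_batch_size(mini_batch_size, gradient_accumulation_count,
                         replication_factor=1):
    """Samples contributing to one weight update (hypothetical helper)."""
    return mini_batch_size * gradient_accumulation_count * replication_factor


# 8 mini-batches of 32 samples on a single replica.
print(effective_batch_size(32, 8))
# The same configuration replicated four times.
print(effective_batch_size(32, 8, replication_factor=4))
```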
The choice of gradient accumulation count impacts the time spent in the ramp-up and ramp-down stages of pipeline execution. As the gradient accumulation count increases, the proportion of cycles spent on the ramp-up/ramp-down phases decreases, with respect to the total number of cycles required to process the batch. As weight updates are performed after the ramp-down phase, a higher gradient accumulation count also results in a smaller proportion of cycles spent on weight updates.
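A deliberately simplified model shows the trend. Assume that each mini-batch occupies one stage per time slot, so that a run of N mini-batches over S stages takes N + S - 1 slots and every stage is idle for S - 1 of them. This is a toy model chosen for illustration, not the IPU's actual cycle accounting, and it ignores the backward pass and the weight update.

```python
def ramp_fraction(num_stages, gradient_accumulation_count):
    """Fraction of stage time slots lost to ramp-up and ramp-down in the
    simplified model described above."""
    total_slots = gradient_accumulation_count + num_stages - 1
    return (num_stages - 1) / total_slots


for count in (8, 32, 128):
    print(count, round(ramp_fraction(4, count), 3))
```

In this model the fraction shrinks as the gradient accumulation count grows, which matches the qualitative statement above.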
There are multiple convenient ways to use gradient accumulation with minimal modifications to your model.
6.7.1. Optimizers
Gradient accumulation optimizers provide an easy way to add gradient accumulation to your model:
- GradientAccumulationOptimizerV2 is a general purpose optimizer which can be used to wrap any other TensorFlow optimizer. It supports optimizer state offloading (see the Optimizer state offloading section).
- GradientAccumulationOptimizer is an optimizer which can be used to wrap tf.train.GradientDescentOptimizer and tf.train.MomentumOptimizer only. Note that this optimizer does not support optimizer state offloading.
The cross-replica versions of these optimizers can be used with replicated graphs, see CrossReplicaGradientAccumulationOptimizerV2 and CrossReplicaGradientAccumulationOptimizer.
Note
These optimizers need to be used inside of a training loop generated by repeat().
6.7.2. Pipelining
All pipelined training graphs automatically apply gradient accumulation to the model such that the weight update is only computed once all the mini-batches have gone through the whole model pipeline, where the number of mini-batches is the gradient_accumulation_count.
Note
Since the pipelined models always implement gradient accumulation, no gradient accumulation optimizer should be used in combination with pipelining.
6.7.3. Accumulation data type
When accumulating gradients over a large number of mini-batches, it can be beneficial to perform the accumulation in a data type with higher precision (and dynamic range) than that of the gradients. By default, the accumulation is performed using the same data type as the corresponding variable, but this can be overridden by passing gradient_accumulation_dtype (or just dtype to the GradientAccumulationOptimizerV2).
This argument can be one of these three options:
- None: Use an accumulator of the same type as the variable type.
- A DType: Use this type for all the accumulators. For example tf.float32.
- A callable that takes the variable and returns a DType: Allows specifying the accumulator type on a per-variable basis. For example, passing lambda var: var.dtype would have the same effect as passing None.
Note that when accumulating the gradients using a different data type than that of the variable, a standard optimizer will not work, since there will be a data type mismatch between the accumulated gradient and the variable when doing the weight update. You can use a custom optimizer to cast the final accumulated gradient to the data type of the variable before performing the weight update, for example like this with a Keras SGD optimizer:
class CastGradientsSGD(tf.keras.optimizers.SGD):
  def apply_gradients(self, grads_and_vars, name=None):
    cast_grads_and_vars = [(tf.cast(g, v.dtype), v)
                           for (g, v) in grads_and_vars]
    return super().apply_gradients(cast_grads_and_vars, name)
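To see why a wider accumulator can matter, the sketch below adds the same small gradient many times, once with the running total rounded to half precision after every addition and once in a wider type. It uses only the Python standard library (the struct module's half-precision format) to emulate float16, so it illustrates the numerical effect rather than anything specific to the IPU. The gradient value and the number of mini-batches are arbitrary choices for the demonstration.

```python
import struct


def to_float16(value):
    """Round a Python float to the nearest IEEE half-precision value."""
    return struct.unpack("e", struct.pack("e", value))[0]


gradient = to_float16(1e-3)
num_mini_batches = 10000

acc_half = 0.0
acc_wide = 0.0
for _ in range(num_mini_batches):
    # Half-precision accumulator: the total is rounded after every addition.
    acc_half = to_float16(acc_half + gradient)
    # Wider accumulator: a Python float (64-bit) stands in for float32 here.
    acc_wide += gradient

print("float16 accumulator:", acc_half)
print("wide accumulator:   ", acc_wide)
```

Once the half-precision total is large enough that the gradient is smaller than half the spacing between representable values, further additions are rounded away and the total stops growing, whereas the wide accumulator stays close to the true sum.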
6.8. Optimizer state offloading
Some optimizers have an optimizer state which is only accessed and modified during the weight update. For example the tf.train.MomentumOptimizer optimizer has accumulator variables which are only accessed and modified during the weight update.
This means that when gradient accumulation is used, whether through the use of pipelining or the GradientAccumulationOptimizerV2 optimizer, the optimizer state variables do not need to be stored in the device memory during the forward and backward propagation of the model. These variables are only required during the weight update and so they are streamed onto the device during the weight update and then streamed back to remote memory after they have been updated.
This feature is enabled by default for both pipelining and when GradientAccumulationOptimizerV2 is used.
It can be disabled by setting the offload_weight_update_variables argument of pipeline() or GradientAccumulationOptimizerV2 to False. In Keras, this is done as a keyword argument to set_pipelining_options() and set_gradient_accumulation_options() respectively.
This feature requires the machine to be configured with support for Poplar remote buffers. If the machine does not support them, the feature is disabled.
Offloading variables into remote memory can reduce maximum memory liveness, but it can also increase the computation time of the weight update as more time is spent communicating with the host.
6.9. Replicated tensor sharding
Replicated tensor sharding (RTS) attempts to reduce memory usage in replicated models by partitioning applicable tensors and distributing them across replicas. RTS identifies tensors that share the same value across replicas and partitions each tensor into evenly sized shards such that each replica stores one shard. If an operation requires the full tensor, the shards can be broadcast to all replicas.
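The partition-and-gather idea can be sketched in plain Python. The helpers shard and all_gather are hypothetical, and the zero-padding used to make the shards equal in size is an assumption of this sketch, not a description of the Poplar implementation.

```python
def shard(tensor, num_replicas):
    """Split a flat list into `num_replicas` equally sized shards,
    zero-padding the tail so that every shard has the same length."""
    shard_size = -(-len(tensor) // num_replicas)  # ceiling division
    padded = tensor + [0.0] * (shard_size * num_replicas - len(tensor))
    return [padded[i * shard_size:(i + 1) * shard_size]
            for i in range(num_replicas)]


def all_gather(shards, original_length):
    """Rebuild the full tensor from the shards held by each replica."""
    full = [value for replica_shard in shards for value in replica_shard]
    return full[:original_length]


# A tensor whose value is identical on all replicas, such as optimizer state.
optimizer_state = [float(i) for i in range(10)]
shards = shard(optimizer_state, num_replicas=4)

print("elements stored per replica:", len(shards[0]), "instead of",
      len(optimizer_state))
print(all_gather(shards, len(optimizer_state)) == optimizer_state)
```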
RTS is used to save memory when using stateful optimizers, such as Adam or LAMB, with replicas. In Keras, it can be enabled by setting the replicated_optimizer_state_sharding argument to True in the set_gradient_accumulation_options() method for non-pipelined models and the set_pipelining_options() method for pipelined models.
6.10. Dataset benchmarking
In order to fully utilise the potential of the IPU, the tf.data.Dataset used by the IPUInfeedQueue needs to be optimised so that the IPU is not constantly waiting for more data to become available.
To benchmark your tf.data.Dataset, you can make use of the ipu.dataset_benchmark tool. See tensorflow.python.ipu.dataset_benchmark for details of the functions that you can use to obtain the maximum throughput of your tf.data.Dataset.
If the throughput of your tf.data.Dataset is the bottleneck, you can try to optimise it using the information on the TensorFlow website.
6.10.1. Accessing the JSON data
The functions in ipu.dataset_benchmark return a JSON string which can be loaded into a JSON object using the native JSON library, for example:
import tensorflow as tf
from tensorflow.python import ipu
import json

# Create your tf.data.Dataset
dataset = ...

benchmark_op = ipu.dataset_benchmark.dataset_benchmark(dataset, 10, 512)

with tf.Session() as sess:
  json_string = sess.run(benchmark_op)
  json_object = json.loads(json_string[0])