4. Keras with IPUs

The Graphcore implementation of TensorFlow includes Keras support for IPUs. Creating a Keras model is no different from creating one for training on other devices. However, to target the Poplar XLA device, the model must be created inside the strategy.scope() of an IPUStrategy.

For a more practical walkthrough, see this tutorial about using Keras on the IPU from the Graphcore tutorials repository.

4.1. Single IPU models

You can train, evaluate or run inference on single-IPU models through the Keras APIs as you would with other accelerators, as long as you create the model inside the scope of an IPUStrategy:

import tensorflow as tf
from tensorflow.python import ipu

# Configure the IPU device.
config = ipu.config.IPUConfig()
config.auto_select_ipus = 1
config.configure_ipu_system()


# Create a simple model.
def create_model():
  return tf.keras.Sequential([
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(256, activation='relu'),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])


# Create a dataset for the model.
def create_dataset():
  mnist = tf.keras.datasets.mnist

  (x_train, y_train), (_, _) = mnist.load_data()
  x_train = x_train / 255.0

  train_ds = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(10000).batch(32, drop_remainder=True)
  train_ds = train_ds.map(lambda d, l:
                          (tf.cast(d, tf.float32), tf.cast(l, tf.int32)))

  return train_ds.repeat().prefetch(16)


dataset = create_dataset()

# Create a strategy for execution on the IPU.
strategy = ipu.ipu_strategy.IPUStrategy()
with strategy.scope():
  # Create a Keras model inside the strategy.
  model = create_model()

  # Compile the model for training.
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      optimizer=tf.keras.optimizers.RMSprop(),
      metrics=["accuracy"],
  )

  model.fit(dataset, epochs=2, steps_per_epoch=100)

4.2. Using steps_per_execution

To reduce Python overhead and maximize the performance of your model, pass the steps_per_execution argument to the compile method. This argument sets the number of batches processed sequentially by one replica in a single execution, which can greatly improve performance because the overhead between steps is removed, increasing IPU utilization.

Ideally, steps_per_execution is equal to the number of steps your model needs to run per replica in order to complete one epoch. Note that it is not possible to fetch intermediate results when steps_per_execution is specified. Model weights are read on the Python host after all steps are executed on the IPU. If you need to access model weights during an epoch (for example for saving a checkpoint), you must set steps_per_execution accordingly.
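For instance, if weights are needed on the host several times per epoch (for example, to save checkpoints from a callback), steps_per_execution can be set to a fraction of the steps per epoch. The sketch below assumes 2000 batches per epoch and a checkpoint every 500 steps; these values, and the checkpoint path, are purely illustrative.

batches_per_epoch = 2000
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer=tf.keras.optimizers.RMSprop(),
    # Return control to the host four times per epoch so that callbacks
    # (such as checkpointing) can run at those points.
    steps_per_execution=batches_per_epoch // 4,
)
model.fit(dataset,
          epochs=2,
          steps_per_epoch=batches_per_epoch,
          callbacks=[
              tf.keras.callbacks.ModelCheckpoint(
                  "ckpt-{epoch}", save_freq=batches_per_epoch // 4)
          ])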

Note

To achieve the best performance, steps_per_execution needs to be set before calling fit(), evaluate() or predict(), even if no training is performed.

See the documentation for the compile method for full details.

The example below highlights the usage of steps_per_execution:

import tensorflow as tf
from tensorflow.python import ipu

# Configure the IPU device.
config = ipu.config.IPUConfig()
config.auto_select_ipus = 1
config.configure_ipu_system()


# Create a simple model.
def create_model():
  return tf.keras.Sequential([
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(256, activation='relu'),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])


# Create a dataset for the model.
def create_dataset():
  mnist = tf.keras.datasets.mnist

  (x_train, y_train), (_, _) = mnist.load_data()
  x_train = x_train / 255.0

  train_ds = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(10000).batch(32, drop_remainder=True)
  train_ds = train_ds.map(lambda d, l:
                          (tf.cast(d, tf.float32), tf.cast(l, tf.int32)))

  return train_ds.prefetch(16)


dataset = create_dataset()

# Create a strategy for execution on the IPU.
strategy = ipu.ipu_strategy.IPUStrategy()
with strategy.scope():
  # Create a Keras model inside the strategy.
  model = create_model()

  # Compile the model for training.
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      optimizer=tf.keras.optimizers.RMSprop(),
      metrics=["accuracy"],
      # Anything between 2 and the length of the dataset would work,
      # but the greater `steps_per_execution` the greater the
      # performance gains.
      steps_per_execution=dataset.cardinality(),
  )

  model.fit(dataset, epochs=2)

4.3. Gradient accumulation

When training, gradient accumulation allows us to simulate bigger batch sizes. This is achieved by accumulating the gradients across multiple batches and then performing a single weight update.

For example, if we have a model where each step is of batch size 16 and we set gradient_accumulation_steps_per_replica to 4 then this simulates an input batch of size 64.

Gradient accumulation can be enabled for Keras models created inside an IPUStrategy scope by calling the following methods:

Functional model

set_gradient_accumulation_options()

Sequential model

set_gradient_accumulation_options()

Model subclass

set_gradient_accumulation_options()

See the respective API documentation for more details.

Note

When using data-parallelism, the steps_per_execution value the model was compiled with must be an integer multiple of gradient_accumulation_steps_per_replica. Data parallelism is discussed in Section 4.5, Automatic data parallelism.

Note

Not all operations are compatible with gradient accumulation.

The example below highlights the usage of set_gradient_accumulation_options:

import tensorflow as tf
from tensorflow.python import ipu

# Configure the IPU device.
config = ipu.config.IPUConfig()
config.auto_select_ipus = 1
config.configure_ipu_system()


# Create a simple model.
def create_model():
  return tf.keras.Sequential([
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(256, activation='relu'),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])


# Create a dataset for the model.
def create_dataset():
  mnist = tf.keras.datasets.mnist

  (x_train, y_train), (_, _) = mnist.load_data()
  x_train = x_train / 255.0

  train_ds = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(10000).batch(32, drop_remainder=True)
  train_ds = train_ds.map(lambda d, l:
                          (tf.cast(d, tf.float32), tf.cast(l, tf.int32)))

  return train_ds.prefetch(16)


dataset = create_dataset()

# Create a strategy for execution on the IPU.
strategy = ipu.ipu_strategy.IPUStrategy()
with strategy.scope():
  # Create a Keras model inside the strategy.
  model = create_model()

  # `steps_per_execution` must be divisible by `gradient_accumulation_steps_per_replica`.
  # Say we want to accumulate 10 steps before doing a weight update, then we would end up
  # with the following values.
  gradient_accumulation_steps_per_replica = 10
  number_of_accumulated_steps = dataset.cardinality(
  ) // gradient_accumulation_steps_per_replica

  # In order to get the proper `steps_per_execution` value, we have to multiply
  # `number_of_accumulated_steps` with `gradient_accumulation_steps_per_replica`.
  steps_per_execution = number_of_accumulated_steps * \
                        gradient_accumulation_steps_per_replica

  # Now we need to truncate the dataset so Keras will not try to take more data
  # from the dataset than is available.
  dataset = dataset.take(steps_per_execution)

  # Compile the model for training.
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      optimizer=tf.keras.optimizers.RMSprop(),
      metrics=["accuracy"],
      steps_per_execution=steps_per_execution,
  )

  model.set_gradient_accumulation_options(
      gradient_accumulation_steps_per_replica=10)

  model.fit(dataset, epochs=2)

4.4. Model parallelism

The models described so far occupy a single IPU device. However, some models might require their layers to be split across multiple IPU devices to achieve high compute efficiency.

One method to achieve model parallelism is called pipelining, where the model layers are assigned to pipeline stages. Each pipeline stage can be assigned to a different device and different devices can execute in parallel.

By default, these pipeline stages will be executed using the grouped schedule (Fig. 4.1), where the forward and backward stages are grouped together on each IPU. All IPUs alternate between executing a forward pass and then a backward pass.


Fig. 4.1 Grouped schedule

Two other schedules are available and can be configured as shown in Section 4.4.4, Pipelining options. When using the interleaved schedule (Fig. 4.2) the forward and backward passes are interleaved (which requires less memory but is likely to be slower). The sequential schedule (Fig. 4.3) executes one stage at a time and may be useful when debugging your model.


Fig. 4.2 Interleaved schedule


Fig. 4.3 Sequential schedule

Note

In Fig. 4.1, Fig. 4.2 and Fig. 4.3, T refers to the number of gradient accumulation steps per replica. See Section 4.4.4, Pipelining options for how to specify this value.

A detailed explanation of pipelining can be found in the technical note on Model parallelism with TensorFlow: sharding and pipelining.

The method to pipeline your model depends on whether your model is a Sequential model, a Functional model, or is subclassed from the Model class.

4.4.1. Sequential model

To enable IPU pipelining for a Sequential model (an instance of tensorflow.keras.Sequential), a list of per-layer pipeline stage assignments should be passed to the set_pipeline_stage_assignment() method of the model.

For example, a simple four layer Sequential model could be assigned to two different pipeline stages as follows:

  model = tf.keras.Sequential([
      tf.keras.layers.Dense(8),  # Pipeline stage 0.
      tf.keras.layers.Dense(16),  # Pipeline stage 0.
      tf.keras.layers.Dense(16),  # Pipeline stage 1.
      tf.keras.layers.Dense(1),  # Pipeline stage 1.
  ])

  model.set_pipeline_stage_assignment([0, 0, 1, 1])

You can confirm which layers are assigned to which stages using the print_pipeline_stage_assignment_summary() method of the model.

4.4.2. Functional model

There are two ways to enable IPU pipelining for a Functional model (an instance of tensorflow.keras.Model), depending on whether you are pipelining a model you are writing yourself or an existing model.

Pipelining a model you are writing yourself

To pipeline a Functional model you are writing yourself, each layer call must happen within the scope of an ipu.keras.PipelineStage context.

For example, a simple four layer Functional model could be assigned to two different pipeline stages as follows:

  input_layer = tf.keras.layers.Input((28, 28))

  with ipu.keras.PipelineStage(0):
    x = tf.keras.layers.Dense(8)(input_layer)
    x = tf.keras.layers.Dense(16)(x)

  with ipu.keras.PipelineStage(1):
    x = tf.keras.layers.Dense(16)(x)
    x = tf.keras.layers.Dense(1)(x)

  model = tf.keras.Model(inputs=input_layer, outputs=x)

Layers constructed within an ipu.keras.PipelineStage context will have that pipeline stage assigned to all invocations of the layer. These assignments are overridden if the layer calls happen within a different ipu.keras.PipelineStage context.
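For instance, in this illustrative sketch (the shared layer and its sizes are made up), a layer constructed under stage 0 keeps that assignment for its first invocation, while a later invocation made inside a different PipelineStage context is reassigned to stage 1:

  input_layer = tf.keras.layers.Input((28, 28))

  with ipu.keras.PipelineStage(0):
    shared_dense = tf.keras.layers.Dense(16)
    x = shared_dense(input_layer)  # This invocation is assigned to stage 0.

  with ipu.keras.PipelineStage(1):
    # The same layer: this invocation overrides the construction-time
    # assignment and is placed in stage 1.
    x = shared_dense(x)
    x = tf.keras.layers.Dense(1)(x)

  model = tf.keras.Model(inputs=input_layer, outputs=x)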

Pipelining an existing functional model

To pipeline an existing Functional model, you can use get_pipeline_stage_assignment(). Each layer invocation in the model has an associated FunctionalLayerPipelineStageAssignment object, which indicates what pipeline stage that invocation is assigned to. get_pipeline_stage_assignment returns a list of these stage assignments, which you can inspect and modify. Note that the list is in post-order, which means the assignments are returned in the order they will be executed.

Once you are done modifying the stage assignments, you should use set_pipeline_stage_assignment() to set them on the model.

For example, a naive way of pipelining ResNet50 would be to assign everything up to and including the “conv4_block2_add” layer invocation to the first stage, and everything after it to the second stage, as follows:

strategy = ipu.ipu_strategy.IPUStrategy()
with strategy.scope():

  from tensorflow.keras.applications.resnet50 import ResNet50
  model = ResNet50(weights='imagenet')

  # Get the individual assignments - note that they are returned in post-order.
  assignments = model.get_pipeline_stage_assignment()

  # Iterate over them and set their pipeline stages.
  stage_id = 0
  for assignment in assignments:
    assignment.pipeline_stage = stage_id
    # Split the model on the `conv4_block2_add` layer.
    if assignment.layer.name.startswith("conv4_block2_add"):
      stage_id = 1

  # Set the assignments to the model.
  model.set_pipeline_stage_assignment(assignments)

  model.print_pipeline_stage_assignment_summary()

Note

You can use print_pipeline_stage_assignment_summary() to print the pipeline stage assignments of the model’s layer invocations.

Note

This method of assigning pipeline stages can also be used with Functional models you are writing yourself, as well as with Sequential models and Model subclasses using the SequentialExtension and ModelExtension equivalents.

4.4.3. Model subclass

Model subclasses are subclasses of tf.keras.Model, which override the call method. There are two ways to enable IPU pipelining for an instance of a Model subclass, depending on whether you are pipelining a model you are writing yourself or an existing model. These are very similar to the methods available for Functional models.

Pipelining a model you are writing yourself

To pipeline a Model subclass you are writing yourself, each layer call must happen within the scope of an ipu.keras.PipelineStage context.

For example, a simple four layer Model subclass could be assigned to four different pipeline stages as follows:

class MyModel(tf.keras.Model):
  def __init__(self):
    super(MyModel, self).__init__()
    self.dense_layer_1 = tf.keras.layers.Dense(8)
    self.dense_layer_2 = tf.keras.layers.Dense(8)
    self.concat_layer = tf.keras.layers.Concatenate()
    self.dense_layer_3 = tf.keras.layers.Dense(1)

  def call(self, inputs):
    # Invoke layers inside PipelineStage scopes to assign the layer invocations
    # to the specified pipeline stage.
    with ipu.keras.PipelineStage(0):
      x = self.dense_layer_1(inputs)
    with ipu.keras.PipelineStage(1):
      x1 = self.dense_layer_2(x)
      x2 = self.dense_layer_2(x)
    with ipu.keras.PipelineStage(2):
      x1 = self.dense_layer_2(x1)
      x2 = self.dense_layer_2(x2)
      x = self.concat_layer([x1, x2])
    with ipu.keras.PipelineStage(3):
      x = self.dense_layer_3(x)

    return x

Layers constructed within an ipu.keras.PipelineStage context will have that pipeline stage assigned to all invocations of the layer. These assignments are overridden if the layer calls happen within a different ipu.keras.PipelineStage context.

Pipelining an existing model

To pipeline an existing Model subclass, you must use get_pipeline_stage_assignment(). Each layer invocation in the model has an associated ModelLayerPipelineStageAssignment object, which indicates what pipeline stage that invocation is assigned to. get_pipeline_stage_assignment returns a list of these stage assignments, which you can inspect and modify. Note that the list is in post-order, which means the assignments are returned in the order they will be executed.

Once you are done modifying the stage assignments, you should use set_pipeline_stage_assignment() to set them on the model.

Before you can get or set pipeline stage assignments, you must first call keras.Model.build() on your model, specifying the input shapes. This traces the model’s call function using the shapes specified. The resulting graph is what will be used for pipelined execution. You can update the graph by calling build again, though this will invalidate existing pipeline stage assignments if the structure of the updated graph is different.

Note

If you need to specify input dtypes when calling keras.Model.build(), you can pass in keras.Input objects instead of plain shapes.
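For instance, a minimal sketch (the shape and dtype are illustrative) that fixes the input dtype when tracing the model:

  # Trace the call function with an explicit input dtype by passing a
  # keras.Input object instead of a plain shape.
  model.build(tf.keras.Input(shape=(28, 28), dtype=tf.float16))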

For example, an existing Model subclass with four layers could be assigned to four different pipeline stages as follows:

  model = ExistingModel()

  # Call build to trace the graph generated by the call function.
  # This step is required before getting or setting pipeline stage assignments.
  model.build((28, 28))

  # Get a blank set of pipeline stage assignments.
  assignments = model.get_pipeline_stage_assignment()

  # Modify the assignments by setting pipeline stages.
  for assignment in assignments:
    if assignment.layer == model.dense_layer_1:
      assignment.pipeline_stage = 0
    elif assignment.layer == model.dense_layer_2 and assignment.node_index < 2:
      assignment.pipeline_stage = 1
    elif assignment.layer == model.dense_layer_2 and assignment.node_index < 4:
      assignment.pipeline_stage = 2
    elif assignment.layer == model.concat_layer:
      assignment.pipeline_stage = 2
    elif assignment.layer == model.dense_layer_3:
      assignment.pipeline_stage = 3

  # Apply the modified assignments back to the model.
  model.set_pipeline_stage_assignment(assignments)

Note

You can use print_pipeline_stage_assignment_summary() to print the pipeline stage assignments of the model’s layer invocations.

Note

This method of assigning pipeline stages can also be used with Model subclasses you are writing yourself, as well as with Functional and Sequential models using the SequentialExtension and FunctionalExtension equivalents.

4.4.4. Pipelining options

Pipelining options can be set with the following methods:

Functional model

set_pipelining_options()

Sequential model

set_pipelining_options()

Model subclass

set_pipelining_options()

See the respective API documentation for more details.

Gradient accumulation is always used when training a pipelined model (unless using the Sequential schedule). This means that you must set the option gradient_accumulation_steps_per_replica using this API when using the Grouped or Interleaved schedule. It is optional when using the Sequential schedule.

The API documentation for set_pipelining_options explains that the additional keyword arguments (pipelining_kwargs) will be forwarded to the tensorflow.python.ipu.pipelining_ops.pipeline() operator (which is used internally - see Section 4.9, Implementation details). Refer to the API documentation for pipeline() for details about these arguments.

The code sample below illustrates how options can be set with the set_pipelining_options API.

  model.set_pipelining_options(
        gradient_accumulation_steps_per_replica=16,
        pipeline_schedule=ipu.ops.pipelining_ops.PipelineSchedule.Interleaved)

4.5. Automatic data parallelism

IPU TensorFlow supports automatic data parallelism when multiple IPU devices are configured with the system. Automatic data parallelism is achieved by model replication across available IPU devices. The number of times the model is replicated is called the replication factor; higher replication factors allow higher data throughput.

When replicating, gradients are reduced across replicas during training, which has implications for gradient accumulation. For a non-replicated model, the effective batch size is the product of the dataset batch size and the number of gradient accumulation steps. With a replication factor greater than one, the effective batch size is additionally scaled by the replication factor, according to the following formula:

effective_batch_size = dataset_batch_size * gradient_accumulation_steps_per_replica * num_replicas
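As an illustrative sketch (the device count, batch size and step count are arbitrary), requesting two IPUs for a model that fits on a single IPU gives a replication factor of two, and the effective batch size scales accordingly:

from tensorflow.python import ipu

config = ipu.config.IPUConfig()
# Request two IPUs; a model that fits on one IPU is then replicated twice.
config.auto_select_ipus = 2
config.configure_ipu_system()

dataset_batch_size = 32
gradient_accumulation_steps_per_replica = 4
num_replicas = 2
# 32 * 4 * 2 = 256
effective_batch_size = (dataset_batch_size *
                        gradient_accumulation_steps_per_replica *
                        num_replicas)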

4.6. Asynchronous callbacks

IPU TensorFlow supports the use of Callback objects with the Keras APIs; however, there is an important difference to note when specifying steps_per_execution. In IPU TensorFlow, if steps_per_execution is specified for your model, then per-batch callback functions will only be invoked every steps_per_execution steps, which can have the effect of delaying access to results.

However, IPU TensorFlow also supports asynchronous callbacks by providing a polling mechanism which allows results to be accessed at the earliest possible point. Asynchronous callbacks can be enabled by passing True to the following methods:

Functional model

set_asynchronous_callbacks()

Sequential model

set_asynchronous_callbacks()

Model subclass

set_asynchronous_callbacks()

See the respective API documentation for more details.
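For example, a minimal sketch (reusing the model and optimizer from the earlier examples) that enables asynchronous callbacks on a model created inside the IPUStrategy scope:

strategy = ipu.ipu_strategy.IPUStrategy()
with strategy.scope():
  model = create_model()
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      optimizer=tf.keras.optimizers.RMSprop(),
      steps_per_execution=100,
  )
  # Invoke per-batch callbacks as results become available, rather than
  # only after all `steps_per_execution` steps have completed.
  model.set_asynchronous_callbacks(True)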

4.7. Configuring Infeeds and Outfeeds

Keras models created inside of an IPUStrategy scope automatically create IPUInfeedQueue and IPUOutfeedQueue data queues for efficiently feeding data to and from the IPU devices when using fit(), evaluate() and predict().

Instances of IPUInfeedQueue and IPUOutfeedQueue can be created with optional arguments which can affect performance of the model.

Use the following methods to configure the IPUInfeedQueue for your Keras model:

Functional model

set_infeed_queue_options()

Sequential model

set_infeed_queue_options()

Model subclass

set_infeed_queue_options()

Use the following methods to configure the IPUOutfeedQueue for your Keras model:

Functional

set_outfeed_queue_options()

Sequential

set_outfeed_queue_options()

Model subclass

set_outfeed_queue_options()

For example, the prefetch_depth parameter of the IPUInfeedQueue and the buffer_depth parameter of the IPUOutfeedQueue can be configured as follows:

import tensorflow as tf
from tensorflow.python import ipu

# Configure the IPU device.
config = ipu.config.IPUConfig()
config.auto_select_ipus = 1
config.configure_ipu_system()


# Create a simple model.
def create_model():
  return tf.keras.Sequential([
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(256, activation='relu'),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])


# Create a strategy for execution on the IPU.
strategy = ipu.ipu_strategy.IPUStrategy()
with strategy.scope():

  model = create_model()

  # Set the infeed and outfeed options.
  model.set_infeed_queue_options(prefetch_depth=2)
  model.set_outfeed_queue_options(buffer_depth=2)

4.8. Saving and loading Keras models

Saving and loading a Keras model must be done within the IPUStrategy scope in order to save/load IPU-specific information.

When saving and loading Model subclasses, make sure to save and restore class members, such as layers, via the config. This can be done by overriding the get_config and from_config methods. Re-creating members from scratch can cause errors, as the original members may be restored as part of the IPU-specific internal state.
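For example, a minimal sketch of a Model subclass (the layer sizes and the hidden_size argument are illustrative) that stores its constructor arguments in the config, so its members are restored through from_config rather than re-created from scratch:

class MyConfigModel(tf.keras.Model):
  def __init__(self, hidden_size=8):
    super(MyConfigModel, self).__init__()
    self.hidden_size = hidden_size
    self.dense_layer = tf.keras.layers.Dense(hidden_size)
    self.output_layer = tf.keras.layers.Dense(1)

  def call(self, inputs):
    return self.output_layer(self.dense_layer(inputs))

  def get_config(self):
    # Store constructor arguments so members are restored via the config.
    return {"hidden_size": self.hidden_size}

  @classmethod
  def from_config(cls, config):
    return cls(**config)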

Note

The arguments pipelining_kwargs from set_pipelining_options() and gradient_accumulation_optimizer_kwargs from set_gradient_accumulation_options() are not serializable, which means that when the model is being saved, their values are not saved. When restoring/loading a model, call set_pipelining_options() or set_gradient_accumulation_options() again.
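A minimal sketch of the save/restore flow (the directory name and option values are illustrative): save and load within the strategy scope, then re-apply the non-serializable options after loading:

with strategy.scope():
  # Saving within the IPUStrategy scope keeps the IPU-specific information.
  model.save("saved_model_dir")

with strategy.scope():
  restored_model = tf.keras.models.load_model("saved_model_dir")
  # `pipelining_kwargs` and `gradient_accumulation_optimizer_kwargs` are not
  # serialized, so set these options again after loading.
  restored_model.set_pipelining_options(
      gradient_accumulation_steps_per_replica=16)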

4.9. Implementation details

When instantiating a standard TensorFlow Keras model inside the scope of an IPUStrategy instance, it is dynamically injected with additional IPU-specific functions. This is done through the relevant IPU Keras extension classes:

Functional model

FunctionalExtension()

Sequential model

SequentialExtension()

Model subclass

ModelExtension()