9. Example using IPUPipelineEstimator
This example shows how to use the IPUPipelineEstimator
to train a simple CNN on the CIFAR-10 dataset. It can be
compared to the example using the IPUEstimator
(Section 8, Example using IPUEstimator) to see the
changes required to add pipelined execution to a model.
  1import argparse
  2import time
  3
  4import tensorflow.compat.v1 as tf
  5
  6from tensorflow.keras.datasets import cifar10
  7from tensorflow.keras.layers import Conv2D, MaxPooling2D
  8from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
  9from tensorflow.python import ipu
 10
 11NUM_CLASSES = 10
 12
 13
 14def model_fn(mode, params):
 15  """A simple CNN based on https://keras.io/examples/cifar10_cnn/ split
 16  into two pipeline stages placed on different IPUs."""
 17
 18  # Tell the dropout layers whether we are training to avoid a placeholder.
 19  is_training = mode == tf.estimator.ModeKeys.TRAIN
 20
 21  def stage1(features, labels):
 22    partial = Conv2D(16, (3, 3), padding="same")(features)
 23    partial = Activation("relu")(partial)
 24    partial = Conv2D(16, (3, 3))(partial)
 25    partial = Activation("relu")(partial)
 26    partial = MaxPooling2D(pool_size=(2, 2))(partial)
 27    partial = Dropout(0.25)(partial, training=is_training)
 28
 29    return partial, labels
 30
 31  def stage2(partial, labels):
 32    partial = Conv2D(32, (3, 3), padding="same")(partial)
 33    partial = Activation("relu")(partial)
 34    partial = Conv2D(32, (3, 3))(partial)
 35    partial = Activation("relu")(partial)
 36    partial = MaxPooling2D(pool_size=(2, 2))(partial)
 37    partial = Dropout(0.25)(partial, training=is_training)
 38
 39    partial = Flatten()(partial)
 40    partial = Dense(256)(partial)
 41    partial = Activation("relu")(partial)
 42    partial = Dropout(0.5)(partial, training=is_training)
 43    logits = Dense(NUM_CLASSES)(partial)
 44
 45    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)
 46
 47    if mode == tf.estimator.ModeKeys.TRAIN:
 48      # This return value is passed to the `optimizer_function`.
 49      return loss
 50
 51    if mode == tf.estimator.ModeKeys.EVAL:
 52      predictions = tf.argmax(input=logits, axis=1, output_type=tf.int32)
 53      # These return values are passed to the `eval_metrics_fn`.
 54      return loss, predictions, labels
 55
 56    raise NotImplementedError(mode)
 57
 58  def optimizer_function(loss):
 59    optimizer = tf.train.GradientDescentOptimizer(params["learning_rate"])
 60    return ipu.pipelining_ops.OptimizerFunctionOutput(optimizer, loss)
 61
 62  def eval_metrics_fn(loss, predictions, labels):
 63    # This is executed on the host.
 64    return {
 65        "loss": loss,
 66        "accuracy": tf.metrics.accuracy(predictions=predictions,
 67                                        labels=labels),
 68    }
 69
 70  return ipu.ipu_pipeline_estimator.IPUPipelineEstimatorSpec(
 71      mode,
 72      computational_stages=[stage1, stage2],
 73      optimizer_function=optimizer_function,
 74      eval_metrics_fn=eval_metrics_fn,
 75      gradient_accumulation_count=params["gradient_accumulation_count"])
 76
 77
 78def parse_args():
 79  parser = argparse.ArgumentParser()
 80
 81  parser.add_argument(
 82      "--test-only",
 83      action="store_true",
 84      help="Skip training and test using latest checkpoint from model_dir.")
 85
 86  parser.add_argument("--batch-size",
 87                      type=int,
 88                      default=16,
 89                      help="The batch size.")
 90
 91  parser.add_argument(
 92      "--gradient-accumulation-count",
 93      type=int,
 94      default=4,
 95      help="The the number of batches that will be pipelined together.")
 96
 97  parser.add_argument(
 98      "--iterations-per-loop",
 99      type=int,
100      default=100,
101      help="The number of iterations (batches consumed) per loop on IPU.")
102
103  parser.add_argument("--log-interval",
104                      type=int,
105                      default=10,
106                      help="Interval at which to log progress.")
107
108  parser.add_argument("--summary-interval",
109                      type=int,
110                      default=1,
111                      help="Interval at which to write summaries.")
112
113  parser.add_argument("--training-steps",
114                      type=int,
115                      default=100000,
116                      help="Total number of training steps.")
117
118  parser.add_argument(
119      "--learning-rate",
120      type=float,
121      default=0.01,
122      help="The learning rate used with stochastic gradient descent.")
123
124  parser.add_argument(
125      "--model-dir",
126      help="Directory where checkpoints and summaries are stored.")
127
128  return parser.parse_args()
129
130
def create_ipu_estimator(args):
  """Construct the `IPUPipelineEstimator` used for training and testing.

  Args:
    args: Parsed command-line arguments; reads `iterations_per_loop`,
      `log_interval`, `summary_interval`, `model_dir`, `learning_rate` and
      `gradient_accumulation_count`.

  Returns:
    An `IPUPipelineEstimator` wrapping `model_fn`.
  """
  # `model_fn` defines two computational stages, so two IPUs are requested.
  pipeline_ipu_count = 2

  ipu_config = ipu.config.IPUConfig()
  ipu_config.auto_select_ipus = pipeline_ipu_count

  # IPU-specific settings are nested inside the general run configuration.
  run_config = ipu.ipu_run_config.RunConfig(
      ipu_run_config=ipu.ipu_run_config.IPURunConfig(
          num_shards=pipeline_ipu_count,
          iterations_per_loop=args.iterations_per_loop,
          ipu_options=ipu_config,
      ),
      log_step_count_steps=args.log_interval,
      save_summary_steps=args.summary_interval,
      model_dir=args.model_dir,
  )

  # These values are read back by `model_fn` through its `params` argument.
  model_params = {
      "learning_rate": args.learning_rate,
      "gradient_accumulation_count": args.gradient_accumulation_count,
  }

  return ipu.ipu_pipeline_estimator.IPUPipelineEstimator(
      config=run_config,
      model_fn=model_fn,
      params=model_params,
  )
158
159
def train(ipu_estimator, args, x_train, y_train):
  """Train on IPU, writing checkpoints to `args.model_dir`, and report speed.

  Args:
    ipu_estimator: The estimator whose `train` method is invoked.
    args: Parsed command-line arguments; reads `batch_size` and
      `training_steps`.
    x_train: Training inputs, indexed along the first axis.
    y_train: Training labels, aligned with `x_train`.
  """
  def input_fn():
    # Dataset.from_tensor_slices() would embed the data into the graph as
    # constants, making the training graph very large and impractical.
    # Dataset.from_generator() is used instead, with prefetching and
    # caching added to improve performance.
    num_examples = len(x_train)
    element_types = (x_train.dtype, y_train.dtype)
    element_shapes = (x_train.shape[1:], y_train.shape[1:])

    def example_pairs():
      return zip(x_train, y_train)

    return (tf.data.Dataset.from_generator(example_pairs, element_types,
                                           element_shapes)
            .prefetch(num_examples)
            .cache()
            .repeat()
            .shuffle(num_examples)
            .batch(args.batch_size, drop_remainder=True))

  # Training progress is reported at INFO level, so make that level visible.
  tf.logging.set_verbosity(tf.logging.INFO)

  started = time.time()
  ipu_estimator.train(input_fn=input_fn, steps=args.training_steps)
  finished = time.time()

  elapsed_seconds = finished - started
  throughput = args.training_steps * args.batch_size / elapsed_seconds
  summary = "Took {:.2f} minutes, i.e. {:.0f} images per second".format(
      elapsed_seconds / 60, throughput)
  print(summary)
193
194
def calc_batch_size(num_examples, batches_per_loop, batch_size):
  """Reduce the batch size if needed to cover all examples without a remainder.

  Args:
    num_examples: Total number of examples to cover.
    batches_per_loop: Number of batches consumed per loop; must be positive
      and must evenly divide `num_examples`.
    batch_size: The preferred (maximum) batch size; must be positive.

  Returns:
    The largest value not exceeding `batch_size` for which
    `batch_size * batches_per_loop` evenly divides `num_examples`.

  Raises:
    ValueError: If `batch_size` or `batches_per_loop` is not positive, or if
      `batches_per_loop` does not evenly divide `num_examples`.
  """
  # Explicit checks rather than `assert`: assertions are removed under
  # `python -O`, after which bad input would end in a division by zero or a
  # loop that never terminates.
  if batch_size <= 0:
    raise ValueError("batch_size must be positive, got {}".format(batch_size))
  if batches_per_loop <= 0:
    raise ValueError(
        "batches_per_loop must be positive, got {}".format(batches_per_loop))
  if num_examples % batches_per_loop != 0:
    raise ValueError(
        "batches_per_loop ({}) must evenly divide num_examples ({})".format(
            batches_per_loop, num_examples))

  # Guaranteed to stop at 1 at the latest, because `batches_per_loop` divides
  # `num_examples`.
  while num_examples % (batch_size * batches_per_loop) != 0:
    batch_size -= 1
  return batch_size
202
203
def test(ipu_estimator, args, x_test, y_test):
  """Evaluate the model on IPU and print the test loss and accuracy.

  The weights are loaded from the final checkpoint in the given
  `args.model_dir`.

  Args:
    ipu_estimator: The estimator whose `evaluate` method is invoked.
    args: Parsed command-line arguments; reads `iterations_per_loop` and
      `batch_size`.
    x_test: Test inputs, indexed along the first axis.
    y_test: Test labels, aligned with `x_test`.
  """
  example_count = len(x_test)

  # The requested batch size may be lowered so that no example is left over.
  eval_batch_size = calc_batch_size(example_count, args.iterations_per_loop,
                                    args.batch_size)
  if eval_batch_size != args.batch_size:
    print("Test batch size changed to {}.".format(eval_batch_size))

  def input_fn():
    slices = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    return slices.batch(eval_batch_size, drop_remainder=True)

  step_count = example_count // eval_batch_size
  results = ipu_estimator.evaluate(input_fn=input_fn, steps=step_count)
  loss_value, accuracy_value = results["loss"], results["accuracy"]

  print("Test loss: {:g}".format(loss_value))
  print("Test accuracy: {:.2f}%".format(100 * accuracy_value))
228
229
def main():
  """Run the example: parse options, optionally train, then test.

  Raises:
    ValueError: If `iterations_per_loop` does not evenly divide the number
      of test examples.
  """
  args = parse_args()
  train_data, test_data = cifar10.load_data()

  # Reject an unusable configuration before the estimator is created.
  test_example_count = len(test_data[0])
  if test_example_count % args.iterations_per_loop:
    message = ("iterations_per_loop ({}) must evenly divide the number of "
               "test examples ({})")
    raise ValueError(
        message.format(args.iterations_per_loop, test_example_count))

  ipu_estimator = create_ipu_estimator(args)

  def to_model_inputs(images, labels):
    # Scale images by 1/255 as float32 and cast labels to int32.
    return images.astype("float32") / 255.0, labels.astype("int32")

  if not args.test_only:
    print("Training...")
    train_images, train_labels = to_model_inputs(*train_data)
    train(ipu_estimator, args, train_images, train_labels)

  print("Testing...")
  test_images, test_labels = to_model_inputs(*test_data)
  test(ipu_estimator, args, test_images, test_labels)
253
254
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
  main()
Download ipu_pipeline_estimator_example.py