5. Example using IPUEstimator

This example shows how to use the IPUEstimator to train a simple CNN on the CIFAR-10 dataset. The IPUEstimator performs the XLA compilation itself, so the model_fn must not be compiled manually with ipu_compiler.
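The IPU-specific part of the setup is small: create an IPU configuration, wrap it in an IPURunConfig and a RunConfig, and pass the uncompiled model_fn to the IPUEstimator. As a minimal sketch of that pattern (using the same API calls as the full listing below, where model_fn is defined):

from tensorflow.python import ipu

ipu_options = ipu.utils.create_ipu_config()
ipu.utils.auto_select_ipus(ipu_options, num_ipus=1)
run_config = ipu.ipu_run_config.RunConfig(
    ipu_run_config=ipu.ipu_run_config.IPURunConfig(ipu_options=ipu_options))
estimator = ipu.ipu_estimator.IPUEstimator(config=run_config, model_fn=model_fn)

The complete example follows: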

import argparse
import time

import tensorflow.compat.v1 as tf

from tensorflow.keras import Sequential
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
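# The IPU API from Graphcore's port of TensorFlow.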
from tensorflow.python import ipu

NUM_CLASSES = 10


def model_fn(features, labels, mode, params):
  """A simple CNN based on https://keras.io/examples/cifar10_cnn/"""

  model = Sequential()
  model.add(Conv2D(32, (3, 3), padding="same"))
  model.add(Activation("relu"))
  model.add(Conv2D(32, (3, 3)))
  model.add(Activation("relu"))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.25))

  model.add(Conv2D(64, (3, 3), padding="same"))
  model.add(Activation("relu"))
  model.add(Conv2D(64, (3, 3)))
  model.add(Activation("relu"))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.25))

  model.add(Flatten())
  model.add(Dense(512))
  model.add(Activation("relu"))
  model.add(Dropout(0.5))
  model.add(Dense(NUM_CLASSES))

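  # The `training` argument enables the Dropout layers only during training.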
  logits = model(features, training=mode == tf.estimator.ModeKeys.TRAIN)

  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

  if mode == tf.estimator.ModeKeys.EVAL:
    predictions = tf.argmax(input=logits, axis=-1)
    eval_metric_ops = {
        "accuracy": tf.metrics.accuracy(labels=labels,
                                        predictions=predictions),
    }
    return tf.estimator.EstimatorSpec(mode,
                                      loss=loss,
                                      eval_metric_ops=eval_metric_ops)

  if mode == tf.estimator.ModeKeys.TRAIN:
    optimizer = tf.train.GradientDescentOptimizer(params["learning_rate"])
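    # With replication, the gradients must be reduced across all replicas
    # before they are applied.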
    if params["replicas"] > 1:
      optimizer = ipu.cross_replica_optimizer.CrossReplicaOptimizer(optimizer)
    train_op = optimizer.minimize(loss=loss)
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

  raise NotImplementedError(mode)


def parse_args():
  parser = argparse.ArgumentParser()

  parser.add_argument(
      "--test-only",
      action="store_true",
      help="Skip training and test using latest checkpoint from model_dir.")

  parser.add_argument("--batch-size",
                      type=int,
                      default=32,
                      help="The batch size.")

  parser.add_argument(
      "--iterations-per-loop",
      type=int,
      default=100,
      help="The number of iterations (batches) per loop on IPU.")

  parser.add_argument("--log-interval",
                      type=int,
                      default=10,
                      help="Interval at which to log progress.")

  parser.add_argument("--summary-interval",
                      type=int,
                      default=1,
                      help="Interval at which to write summaries.")

  parser.add_argument("--training-steps",
                      type=int,
                      default=200000,
                      help="Total number of training steps.")

  parser.add_argument(
      "--learning-rate",
      type=float,
      default=0.01,
      help="The learning rate used with stochastic gradient descent.")

  parser.add_argument(
      "--replicas",
      type=int,
      default=1,
      help="The replication factor. Increases the number of IPUs "
      "used and the effective batch size by this factor.")

  parser.add_argument(
      "--model-dir",
      help="Directory where checkpoints and summaries are stored.")

  return parser.parse_args()


def create_ipu_estimator(args):
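  """Create an IPUEstimator that automatically selects one IPU per replica."""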
  ipu_options = ipu.utils.create_ipu_config(
      profiling=False,
      use_poplar_text_report=False,
  )

  ipu.utils.auto_select_ipus(ipu_options, num_ipus=args.replicas)

  ipu_run_config = ipu.ipu_run_config.IPURunConfig(
      iterations_per_loop=args.iterations_per_loop,
      num_replicas=args.replicas,
      ipu_options=ipu_options,
  )

  config = ipu.ipu_run_config.RunConfig(
      ipu_run_config=ipu_run_config,
      log_step_count_steps=args.log_interval,
      save_summary_steps=args.summary_interval,
      model_dir=args.model_dir,
  )

  return ipu.ipu_estimator.IPUEstimator(
      config=config,
      model_fn=model_fn,
      params={
          "learning_rate": args.learning_rate,
          "replicas": args.replicas
      },
  )


def train(ipu_estimator, args, x_train, y_train):
  """Train a model on IPU and save checkpoints to the given `args.model_dir`."""
  def input_fn():
    # If using Dataset.from_tensor_slices(), the data will be embedded
    # into the graph as constants, which makes the training graph very
    # large and impractical. So use Dataset.from_generator() here instead,
    # but add prefetching and caching to improve performance.

    def generator():
      return zip(x_train, y_train)

    types = (x_train.dtype, y_train.dtype)
    shapes = (x_train.shape[1:], y_train.shape[1:])

    dataset = tf.data.Dataset.from_generator(generator, types, shapes)
    dataset = dataset.prefetch(len(x_train)).cache()
    dataset = dataset.repeat()
    dataset = dataset.shuffle(len(x_train))
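    # XLA requires static tensor shapes, so drop any incomplete final batch.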
    dataset = dataset.batch(args.batch_size, drop_remainder=True)

    return dataset

  # Training progress is logged as INFO, so enable that logging level
  tf.logging.set_verbosity(tf.logging.INFO)

  t0 = time.time()
  ipu_estimator.train(input_fn=input_fn, steps=args.training_steps)
  t1 = time.time()

  duration_seconds = t1 - t0
  images_per_step = args.batch_size * args.replicas
  images_per_second = args.training_steps * images_per_step / duration_seconds
  print("Took {:.2f} minutes, i.e. {:.0f} images per second".format(
      duration_seconds / 60, images_per_second))


def calc_batch_size(num_examples, batches_per_loop, batch_size):
  """Reduce the batch size if needed to cover all examples without a remainder."""
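  # For example, calc_batch_size(10000, 100, 32) returns 25, since 25 * 100
  # evenly divides 10000 while no larger batch size up to 32 does.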
  assert batch_size > 0
  assert num_examples % batches_per_loop == 0
  while num_examples % (batch_size * batches_per_loop) != 0:
    batch_size -= 1
  return batch_size


def test(ipu_estimator, args, x_test, y_test):
  """Test the model on IPU by loading weights from the final checkpoint in the
  given `args.model_dir`."""

  num_test_examples = len(x_test)

  batches_per_loop = args.replicas * args.iterations_per_loop
  test_batch_size = calc_batch_size(num_test_examples, batches_per_loop,
                                    args.batch_size)

  if test_batch_size != args.batch_size:
    print("Test batch size changed to {}.".format(test_batch_size))

  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    dataset = dataset.batch(test_batch_size, drop_remainder=True)
    return dataset

  num_steps = num_test_examples // (test_batch_size * args.replicas)
  metrics = ipu_estimator.evaluate(input_fn=input_fn, steps=num_steps)
  test_loss = metrics["loss"]
  test_accuracy = metrics["accuracy"]

  print("Test loss: {:g}".format(test_loss))
  print("Test accuracy: {:.2f}%".format(100 * test_accuracy))


def main():
  args = parse_args()
  train_data, test_data = cifar10.load_data()

  num_test_examples = len(test_data[0])
  batches_per_loop = args.replicas * args.iterations_per_loop
  if num_test_examples % batches_per_loop != 0:
    raise ValueError(("replicas * iterations_per_loop ({} * {}) must evenly " +
                      "divide the number of test examples ({})").format(
                          args.replicas, args.iterations_per_loop,
                          num_test_examples))

  ipu_estimator = create_ipu_estimator(args)

  def normalise(x, y):
    return x.astype("float32") / 255.0, y.astype("int32")

  if not args.test_only:
    print("Training...")
    x_train, y_train = normalise(*train_data)
    train(ipu_estimator, args, x_train, y_train)

  print("Testing...")
  x_test, y_test = normalise(*test_data)
  test(ipu_estimator, args, x_test, y_test)


if __name__ == "__main__":
  main()
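
Assuming the listing above is saved as ipu_estimator_example.py (a file name chosen here for illustration), a short training run followed by evaluation might look like this, with the flags corresponding to the arguments defined in parse_args():

python ipu_estimator_example.py --model-dir /tmp/cifar10 --training-steps 1000
python ipu_estimator_example.py --model-dir /tmp/cifar10 --test-only

Passing --replicas 2 would use two IPUs and double the effective batch size; as main() checks, replicas * iterations_per_loop must still evenly divide the 10,000 CIFAR-10 test examples.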