8. Example using IPUEstimator

This example shows how to use the IPUEstimator to train a simple CNN on the CIFAR-10 dataset. XLA compilation is handled automatically by the IPUEstimator, so the model_fn should not be compiled manually with ipu_compiler.
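For contrast, when the Estimator API is not used, the compilation step is explicit: the computation is wrapped with ipu_compiler.compile() inside an IPU device scope. The following is a minimal sketch of that manual pattern, for illustration only; basic_graph and the placeholders pa and pb stand in for a real model and are not part of the example that follows.

import numpy as np
import tensorflow.compat.v1 as tf

from tensorflow.python import ipu
from tensorflow.python.ipu.config import IPUConfig


def basic_graph(a, b):
  # Stand-in for a real model; illustrative only.
  return a + b


config = IPUConfig()
config.auto_select_ipus = 1
config.configure_ipu_system()

with tf.device("cpu"):
  pa = tf.placeholder(np.float32, [2], name="a")
  pb = tf.placeholder(np.float32, [2], name="b")

# The manual compilation step. The IPUEstimator performs this for
# model_fn automatically, so it must not be repeated inside model_fn.
with ipu.scopes.ipu_scope("/device:IPU:0"):
  [result] = ipu.ipu_compiler.compile(basic_graph, inputs=[pa, pb])

with tf.Session() as sess:
  print(sess.run(result, {pa: [1.0, 2.0], pb: [3.0, 4.0]}))

With the IPUEstimator, none of this boilerplate appears in user code. The complete example follows.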

import argparse
import time

import tensorflow.compat.v1 as tf

from tensorflow.keras import Sequential
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
from tensorflow.python import ipu
from tensorflow.python.ipu.config import IPUConfig

NUM_CLASSES = 10


def model_fn(features, labels, mode, params):
  """A simple CNN based on https://keras.io/examples/cifar10_cnn/"""

  model = Sequential()
  model.add(Conv2D(16, (3, 3), padding="same"))
  model.add(Activation("relu"))
  model.add(Conv2D(16, (3, 3)))
  model.add(Activation("relu"))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.25))

  model.add(Conv2D(32, (3, 3), padding="same"))
  model.add(Activation("relu"))
  model.add(Conv2D(32, (3, 3)))
  model.add(Activation("relu"))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.25))

  model.add(Flatten())
  model.add(Dense(256))
  model.add(Activation("relu"))
  model.add(Dropout(0.5))
  model.add(Dense(NUM_CLASSES))

  logits = model(features, training=mode == tf.estimator.ModeKeys.TRAIN)

  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

  if mode == tf.estimator.ModeKeys.EVAL:
    predictions = tf.argmax(input=logits, axis=-1)
    eval_metric_ops = {
        "accuracy": tf.metrics.accuracy(labels=labels,
                                        predictions=predictions),
    }
    return tf.estimator.EstimatorSpec(mode,
                                      loss=loss,
                                      eval_metric_ops=eval_metric_ops)

  if mode == tf.estimator.ModeKeys.TRAIN:
    optimizer = tf.train.GradientDescentOptimizer(params["learning_rate"])
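    # With more than one replica, each replica computes its own gradients,
    # so wrap the optimizer to combine them across replicas before the update.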
    if params["replicas"] > 1:
      optimizer = ipu.cross_replica_optimizer.CrossReplicaOptimizer(optimizer)
    train_op = optimizer.minimize(loss=loss)
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

  raise NotImplementedError(mode)


def parse_args():
  parser = argparse.ArgumentParser()

  parser.add_argument(
      "--test-only",
      action="store_true",
      help="Skip training and test using latest checkpoint from model_dir.")

  parser.add_argument("--batch-size",
                      type=int,
                      default=32,
                      help="The batch size.")

  parser.add_argument(
      "--iterations-per-loop",
      type=int,
      default=100,
      help="The number of iterations (batches) per loop on IPU.")

  parser.add_argument("--log-interval",
                      type=int,
                      default=10,
                      help="Interval at which to log progress.")

  parser.add_argument("--summary-interval",
                      type=int,
                      default=1,
                      help="Interval at which to write summaries.")

  parser.add_argument("--training-steps",
                      type=int,
                      default=200000,
                      help="Total number of training steps.")

  parser.add_argument(
      "--learning-rate",
      type=float,
      default=0.01,
      help="The learning rate used with stochastic gradient descent.")

  parser.add_argument(
      "--replicas",
      type=int,
      default=1,
      help="The replication factor. Increases the number of IPUs "
      "used and the effective batch size by this factor.")

  parser.add_argument(
      "--model-dir",
      help="Directory where checkpoints and summaries are stored.")

  return parser.parse_args()


def create_ipu_estimator(args):
  ipu_options = IPUConfig()
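  # Acquire one IPU per replica; the specific devices are chosen automatically.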
  ipu_options.auto_select_ipus = args.replicas

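  # The device runs iterations_per_loop batches in an on-device loop
  # before control returns to the host.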
  ipu_run_config = ipu.ipu_run_config.IPURunConfig(
      iterations_per_loop=args.iterations_per_loop,
      num_replicas=args.replicas,
      ipu_options=ipu_options,
  )

  config = ipu.ipu_run_config.RunConfig(
      ipu_run_config=ipu_run_config,
      log_step_count_steps=args.log_interval,
      save_summary_steps=args.summary_interval,
      model_dir=args.model_dir,
  )

  return ipu.ipu_estimator.IPUEstimator(
      config=config,
      model_fn=model_fn,
      params={
          "learning_rate": args.learning_rate,
          "replicas": args.replicas
      },
  )


def train(ipu_estimator, args, x_train, y_train):
  """Train a model on IPU and save checkpoints to the given `args.model_dir`."""
  def input_fn():
    # If using Dataset.from_tensor_slices(), the data will be embedded
    # into the graph as constants, which makes the training graph very
    # large and impractical. So use Dataset.from_generator() here instead,
    # but add prefetching and caching to improve performance.

    def generator():
      return zip(x_train, y_train)

    types = (x_train.dtype, y_train.dtype)
    shapes = (x_train.shape[1:], y_train.shape[1:])

    dataset = tf.data.Dataset.from_generator(generator, types, shapes)
    dataset = dataset.prefetch(len(x_train)).cache()
    dataset = dataset.repeat()
    dataset = dataset.shuffle(len(x_train))
    dataset = dataset.batch(args.batch_size, drop_remainder=True)

    return dataset

  # Training progress is logged as INFO, so enable that logging level
  tf.logging.set_verbosity(tf.logging.INFO)

  t0 = time.time()
  ipu_estimator.train(input_fn=input_fn, steps=args.training_steps)
  t1 = time.time()

  duration_seconds = t1 - t0
  images_per_step = args.batch_size * args.replicas
  images_per_second = args.training_steps * images_per_step / duration_seconds
  print("Took {:.2f} minutes, i.e. {:.0f} images per second".format(
      duration_seconds / 60, images_per_second))


def calc_batch_size(num_examples, batches_per_loop, batch_size):
  """Reduce the batch size if needed to cover all examples without a remainder."""
  assert batch_size > 0
  assert num_examples % batches_per_loop == 0
  while num_examples % (batch_size * batches_per_loop) != 0:
    batch_size -= 1
  return batch_size


def test(ipu_estimator, args, x_test, y_test):
  """Test the model on IPU by loading weights from the final checkpoint in the
  given `args.model_dir`."""

  num_test_examples = len(x_test)

  batches_per_loop = args.replicas * args.iterations_per_loop
  test_batch_size = calc_batch_size(num_test_examples, batches_per_loop,
                                    args.batch_size)

  if test_batch_size != args.batch_size:
    print("Test batch size changed to {}.".format(test_batch_size))

  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    dataset = dataset.batch(test_batch_size, drop_remainder=True)
    return dataset

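  # Each evaluation step consumes one batch per replica, so scale the
  # number of steps to cover the whole test set exactly once.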
  num_steps = num_test_examples // (test_batch_size * args.replicas)
  metrics = ipu_estimator.evaluate(input_fn=input_fn, steps=num_steps)
  test_loss = metrics["loss"]
  test_accuracy = metrics["accuracy"]

  print("Test loss: {:g}".format(test_loss))
  print("Test accuracy: {:.2f}%".format(100 * test_accuracy))


def main():
  args = parse_args()
  train_data, test_data = cifar10.load_data()

  num_test_examples = len(test_data[0])
  batches_per_loop = args.replicas * args.iterations_per_loop
  if num_test_examples % batches_per_loop != 0:
    raise ValueError(("replicas * iterations_per_loop ({} * {}) must evenly " +
                      "divide the number of test examples ({})").format(
                          args.replicas, args.iterations_per_loop,
                          num_test_examples))

  ipu_estimator = create_ipu_estimator(args)

  def normalise(x, y):
    return x.astype("float32") / 255.0, y.astype("int32")

  if not args.test_only:
    print("Training...")
    x_train, y_train = normalise(*train_data)
    train(ipu_estimator, args, x_train, y_train)

  print("Testing...")
  x_test, y_test = normalise(*test_data)
  test(ipu_estimator, args, x_test, y_test)


if __name__ == "__main__":
  main()
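
The script is driven by the command-line options defined in parse_args() above. For example, assuming the listing is saved as ipu_estimator_example.py (the file name is arbitrary):

  python ipu_estimator_example.py --training-steps 10000 --model-dir ./checkpoints
  python ipu_estimator_example.py --test-only --model-dir ./checkpoints

The first command trains for 10,000 steps and writes checkpoints to ./checkpoints; the second skips training and evaluates the latest checkpoint on the test set. Passing --replicas 2 would run data-parallel training on two IPUs, doubling the effective batch size.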