8. Example using IPUEstimator

This example shows how to use the IPUEstimator to train a simple CNN on the CIFAR-10 dataset. XLA compilation is handled automatically when using the IPUEstimator, so the model_fn should not be manually compiled with ipu_compiler.
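
For comparison, when a graph is built without the IPUEstimator, the computation is typically compiled and placed on the IPU explicitly. The following is a minimal sketch of that manual pattern, where my_net and x are illustrative placeholder names rather than part of the example below:

    from tensorflow.python import ipu

    # my_net and x are illustrative placeholders for a model function
    # and an input tensor.
    with ipu.scopes.ipu_scope("/device:IPU:0"):
      result = ipu.ipu_compiler.compile(my_net, inputs=[x])

The IPUEstimator performs this compilation and device placement internally, which is why the model_fn in the example below simply returns a plain tf.estimator.EstimatorSpec.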
import argparse
import time

import tensorflow.compat.v1 as tf

from tensorflow.keras import Sequential
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
from tensorflow.python import ipu
from tensorflow.python.ipu.config import IPUConfig

NUM_CLASSES = 10

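# The model_fn below follows the standard tf.estimator contract: it builds
# the graph for the requested mode and returns a tf.estimator.EstimatorSpec.
# The IPUEstimator compiles it with XLA and places it on the IPU for us.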
def model_fn(features, labels, mode, params):
  """A simple CNN based on https://keras.io/examples/cifar10_cnn/"""

  model = Sequential()
  model.add(Conv2D(16, (3, 3), padding="same"))
  model.add(Activation("relu"))
  model.add(Conv2D(16, (3, 3)))
  model.add(Activation("relu"))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.25))

  model.add(Conv2D(32, (3, 3), padding="same"))
  model.add(Activation("relu"))
  model.add(Conv2D(32, (3, 3)))
  model.add(Activation("relu"))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.25))

  model.add(Flatten())
  model.add(Dense(256))
  model.add(Activation("relu"))
  model.add(Dropout(0.5))
  model.add(Dense(NUM_CLASSES))

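  # Dropout behaves differently during training and evaluation, so the
  # mode is passed through the Keras `training` argument.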
  logits = model(features, training=mode == tf.estimator.ModeKeys.TRAIN)

  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

  if mode == tf.estimator.ModeKeys.EVAL:
    predictions = tf.argmax(input=logits, axis=-1)
    eval_metric_ops = {
        "accuracy": tf.metrics.accuracy(labels=labels,
                                        predictions=predictions),
    }
    return tf.estimator.EstimatorSpec(mode,
                                      loss=loss,
                                      eval_metric_ops=eval_metric_ops)

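  # With more than one replica, each replica computes gradients on its own
  # batch of data; the CrossReplicaOptimizer wraps the optimizer so that
  # the gradients are aggregated across all replicas before the update.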
  if mode == tf.estimator.ModeKeys.TRAIN:
    optimizer = tf.train.GradientDescentOptimizer(params["learning_rate"])
    if params["replicas"] > 1:
      optimizer = ipu.cross_replica_optimizer.CrossReplicaOptimizer(optimizer)
    train_op = optimizer.minimize(loss=loss)
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

  raise NotImplementedError(mode)

def parse_args():
  parser = argparse.ArgumentParser()

  parser.add_argument(
      "--test-only",
      action="store_true",
      help="Skip training and test using latest checkpoint from model_dir.")

  parser.add_argument("--batch-size",
                      type=int,
                      default=32,
                      help="The batch size.")

  parser.add_argument(
      "--iterations-per-loop",
      type=int,
      default=100,
      help="The number of iterations (batches) per loop on IPU.")

  parser.add_argument("--log-interval",
                      type=int,
                      default=10,
                      help="Interval at which to log progress.")

  parser.add_argument("--summary-interval",
                      type=int,
                      default=1,
                      help="Interval at which to write summaries.")

  parser.add_argument("--training-steps",
                      type=int,
                      default=200000,
                      help="Total number of training steps.")

  parser.add_argument(
      "--learning-rate",
      type=float,
      default=0.01,
      help="The learning rate used with stochastic gradient descent.")

  parser.add_argument(
      "--replicas",
      type=int,
      default=1,
      help="The replication factor. Increases the number of IPUs "
      "used and the effective batch size by this factor.")

  parser.add_argument(
      "--model-dir",
      help="Directory where checkpoints and summaries are stored.")

  return parser.parse_args()

def create_ipu_estimator(args):
  ipu_options = IPUConfig()
  ipu_options.auto_select_ipus = args.replicas

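  # Each loop executes iterations_per_loop iterations on the device before
  # returning to the host, reducing host/IPU communication overhead.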
  ipu_run_config = ipu.ipu_run_config.IPURunConfig(
      iterations_per_loop=args.iterations_per_loop,
      num_replicas=args.replicas,
      ipu_options=ipu_options,
  )

  config = ipu.ipu_run_config.RunConfig(
      ipu_run_config=ipu_run_config,
      log_step_count_steps=args.log_interval,
      save_summary_steps=args.summary_interval,
      model_dir=args.model_dir,
  )

  return ipu.ipu_estimator.IPUEstimator(
      config=config,
      model_fn=model_fn,
      params={
          "learning_rate": args.learning_rate,
          "replicas": args.replicas
      },
  )

def train(ipu_estimator, args, x_train, y_train):
  """Train a model on IPU and save checkpoints to the given `args.model_dir`."""
  def input_fn():
    # If using Dataset.from_tensor_slices(), the data will be embedded
    # into the graph as constants, which makes the training graph very
    # large and impractical. So use Dataset.from_generator() here instead,
    # but add prefetching and caching to improve performance.

    def generator():
      return zip(x_train, y_train)

    types = (x_train.dtype, y_train.dtype)
    shapes = (x_train.shape[1:], y_train.shape[1:])

    dataset = tf.data.Dataset.from_generator(generator, types, shapes)
    dataset = dataset.prefetch(len(x_train)).cache()
    dataset = dataset.repeat()
    dataset = dataset.shuffle(len(x_train))
    dataset = dataset.batch(args.batch_size, drop_remainder=True)

    return dataset

  # Training progress is logged as INFO, so enable that logging level
  tf.logging.set_verbosity(tf.logging.INFO)

  t0 = time.time()
  ipu_estimator.train(input_fn=input_fn, steps=args.training_steps)
  t1 = time.time()

  duration_seconds = t1 - t0
  images_per_step = args.batch_size * args.replicas
  images_per_second = args.training_steps * images_per_step / duration_seconds
  print("Took {:.2f} minutes, i.e. {:.0f} images per second".format(
      duration_seconds / 60, images_per_second))

def calc_batch_size(num_examples, batches_per_loop, batch_size):
  """Reduce the batch size if needed to cover all examples without a remainder."""
  assert batch_size > 0
  assert num_examples % batches_per_loop == 0
  while num_examples % (batch_size * batches_per_loop) != 0:
    batch_size -= 1
  return batch_size

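# Evaluation also runs whole loops of iterations_per_loop batches on each
# replica, so the test set must divide evenly into complete loops;
# calc_batch_size() above shrinks the batch size until it does.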
def test(ipu_estimator, args, x_test, y_test):
  """Test the model on IPU by loading weights from the final checkpoint in the
  given `args.model_dir`."""

  num_test_examples = len(x_test)

  batches_per_loop = args.replicas * args.iterations_per_loop
  test_batch_size = calc_batch_size(num_test_examples, batches_per_loop,
                                    args.batch_size)

  if test_batch_size != args.batch_size:
    print("Test batch size changed to {}.".format(test_batch_size))

  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    dataset = dataset.batch(test_batch_size, drop_remainder=True)
    return dataset

  num_steps = num_test_examples // (test_batch_size * args.replicas)
  metrics = ipu_estimator.evaluate(input_fn=input_fn, steps=num_steps)
  test_loss = metrics["loss"]
  test_accuracy = metrics["accuracy"]

  print("Test loss: {:g}".format(test_loss))
  print("Test accuracy: {:.2f}%".format(100 * test_accuracy))

def main():
  args = parse_args()
  train_data, test_data = cifar10.load_data()

  num_test_examples = len(test_data[0])
  batches_per_loop = args.replicas * args.iterations_per_loop
  if num_test_examples % batches_per_loop != 0:
    raise ValueError(("replicas * iterations_per_loop ({} * {}) must evenly " +
                      "divide the number of test examples ({})").format(
                          args.replicas, args.iterations_per_loop,
                          num_test_examples))

  ipu_estimator = create_ipu_estimator(args)

  def normalise(x, y):
    return x.astype("float32") / 255.0, y.astype("int32")

  if not args.test_only:
    print("Training...")
    x_train, y_train = normalise(*train_data)
    train(ipu_estimator, args, x_train, y_train)

  print("Testing...")
  x_test, y_test = normalise(*test_data)
  test(ipu_estimator, args, x_test, y_test)


if __name__ == "__main__":
  main()
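
Assuming the script is saved as ipu_estimator_example.py (an illustrative filename), it can be run from the command line. For example, the following would train on two IPUs, with the CrossReplicaOptimizer combining the gradients from both replicas:

    python ipu_estimator_example.py --replicas 2 --model-dir ./checkpoints

Running it again with --test-only evaluates the latest checkpoint from the same --model-dir without retraining.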