TensorFlow的Estimators API可用于在具有多个GPU的分布式环境中训练机器学习模型。在这里,我们将通过训练tf.keras为小型Fashion-MNIST数据集编写的自定义估算器来呈现此工作流程,然后在最后展示更实用的用例。 TL; DR:基本上我们想要记住的是,tf.keras.Model可以通过tf.estimatorAPI将其转换为tf.estimator.Estimator通过该tf.keras.estimator.model_to_estimator方法的对象来训练。转换后,我们可以应用Estimators提供的机制来训练不同的硬件配置。 导入Python库importimport osos importimport timetime #!pip install -q -U tensorflow-gpu#!pip i import tensorflow as tf import numpy as np
导入Fashion-MNIST数据集我们将使用Fashion-MNIST数据集(https://github.com/zalandoresearch/fashion-mnist),这是MNIST的直接替代品,其中包含数千张Zalando时尚文章的灰度图像。获取训练和测试数据非常简单: (train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()
我们想把这些图像的像素值从0到255转换成0到1到c之间的值,将数据集转换为[B,H,W,C]格式,其中B是批处理的图像数,H和W是高度和宽度,C是我们数据集的通道数(灰度为1),Python代码如下: TRAINING_SIZE = len(train_images) TEST_SIZE = len(test_images) train_images = np.asarray(train_images, dtype=np.float32) / 255 # Convert the train images and add channels train_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1)) test_images = np.asarray(test_images, dtype=np.float32) / 255 # Convert the test images and add channels test_images = test_images.reshape((TEST_SIZE, 28, 28, 1))
接下来,我们想要将标签从整数id(例如,2或Pullover)转换为one hot编码(例如,0,0,1,0,0,0,0,0,0,0)。为此,我们将使用该tf.keras.utils.to_categorical 函数: # How many categories we are predicting from (0-9) LABEL_DIMENSIONS = 10 train_labels = tf.keras.utils.to_categorical(train_labels,LABEL_DIMENSIONS) test_labels = tf.keras.utils.to_categorical(test_labels,LABEL_DIMENSIONS) # Cast the labels to floats, needed later train_labels = train_labels.astype(np.float32) test_labels = test_labels.astype(np.float32)
建立一个tf.keras 模型我们将使用Keras Functional API创建我们的神经网络。Keras是一个用于构建和训练深度学习模型的高级API,用户友好,模块化且易于扩展。tf.keras是TensorFlow的这个API的实现,它支持诸如Eager Execution,tf.data管道和Estimators之类的东西。 就架构而言,我们将使用ConvNets。对于每个训练示例,它们都使用一个3d张量(height, width, channels),对于灰度图像,这个张量从channels=1开始,然后返回一个3d张量。 因此,在ConvNet部分之后,我们将需要Flatten张量并添加Dense图层,其中最后一个返回LABEL_DIMENSIONS带有tf.nn.softmax激活的大小向量,Python代码如下: inputs = tf.keras.Input(shape=(28,28,1)) # Returns a placeholder tensor x = tf.keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation=tf.nn.relu)(inputs) x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x) x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x) x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x) x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x) x = tf.keras.layers.Flatten()(x) x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x) predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS, activation=tf.nn.softmax)(x)
我们现在可以定义我们的机器学习模型,选择优化器(我们从TensorFlow中选择一个而不是使用一个tf.keras.optimizers)并编译它: model = tf.keras.Model(inputs=inputs, outputs=predictions) optimizer = tf.train.AdamOptimizer(learning_rate=0.001) model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
创建一个Estimator要从编译的Keras模型创建Estimator,我们称之为model_to_estimator方法。请注意,Keras模型的初始模型状态将保留在创建的Estimator中。 那么Estimators有什么好处呢? - 您可以在本地主机或分布式多GPU环境中运行基于Estimator的机器学习模型,而无需更改模型;
- Estimators简化了机器学习模型开发人员之间的共享实现;
- Estimators为您构建图,因此有点像Eager Execution,没有明确的会话。
那么我们如何开始训练我们的简单tf.keras模型来使用多GPU呢?我们可以使用tf.contrib.distribute.MirroredStrategy通过同步训练进行图内复制的范例。 基本上每个worker GPU都有一个网络副本,并获取数据的子集,在该数据上计算本地梯度,然后等待所有workers以同步方式完成。然后,workers通过Ring All-reduce操作将他们的本地梯度相互通信,这通常被优化以减少网络带宽并增加吞吐量。一旦所有梯度到达,每个worker将它们平均并更新其参数,然后开始下一步。这在通过某些高速互连连接的单个节点上有多个GPU的情况下非常理想。 要使用此策略,我们首先从编译的tf.keras模型中创建一个Estimator,然后MirroredStrategy通过RunConfigconfig 为其配置。默认情况下,此配置将使用所有GPU,但您也可以num_gpus选择使用特定数量的GPU,Python示例如下: NUM_GPUS = 2 strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS) config = tf.estimator.RunConfig(train_distribute=strategy) estimator = tf.keras.estimator.model_to_estimator(model, config=config)
创建一个Estimator输入函数要将数据传输到Estimators,我们需要定义一个数据导入函数,该函数返回批量数据的数据tf.data集(images,labels)。下面的函数接受numpy数组并通过ETL过程返回数据集。 请注意,最后我们还调用了这样的prefetch方法,它将在训练时将数据缓冲到GPU,以便下一批准备就绪并等待GPU,而不是让GPU在每次迭代时等待数据。GPU可能仍然没有得到充分利用,为了改善这一点,我们可以使用fused版本的转换操作,比如shuffle_and_repeat,而不是两个单独的操作。 def input_fn(images, labels, epochs, batch_size): # Convert the inputs to a Dataset. (E) ds = tf.data.Dataset.from_tensor_slices((images, labels)) # Shuffle, repeat, and batch the examples. (T) SHUFFLE_SIZE = 5000 ds = ds.shuffle(SHUFFLE_SIZE).repeat(epochs).batch(batch_size) ds = ds.prefetch(2) # Return the dataset. (L) return ds
训练Estimator让我们首先定义一个SessionRunHook类来记录随机梯度下降的每次迭代的次数: class TimeHistory(tf.train.SessionRunHook): def begin(self): self.times = [] def before_run(self, run_context): self.iter_time_start = time.time() def after_run(self, run_context, run_values): self.times.append(time.time() - self.iter_time_start)
我们可以在我们的Estimator 上调用train函数,赋予它我们定义的input_fn(带有批大小和我们想要训练的epoch的数量)和一个TimeHistory实例,通过它的hooks参数: time_hist = TimeHistory() BATCH_SIZE = 512 EPOCHS = 5 estimator.train(lambda:input_fn(train_images, train_labels, epochs=EPOCHS, batch_size=BATCH_SIZE), hooks=[time_hist])
性能们现在可以用它来计算训练的总时间以及平均每秒训练的图像数量(平均输入量): total_time = sum(time_hist.times) print(f"total time with {NUM_GPUS} GPU(s): {total_time} seconds") avg_time_per_batch = np.mean(time_hist.times) print(f"{BATCH_SIZE*NUM_GPUS/avg_time_per_batch} images/second with {NUM_GPUS} GPU(s)")
评估Estimator为了检查我们的模型的性能,我们调用评估方法对我们的Estimator: estimator.evaluate(lambda:input_fn(test_images, test_labels, epochs=1, batch_size=BATCH_SIZE))
Retinal OCT(光学相干断层扫描)图像示例为了测试更大数据集的缩放性能,我们使用Retinal OCT图像数据集(https://www.kaggle.com/paultimothymooney/kermany2018),这是Kaggle的众多优秀数据集之一。该数据集由活人视网膜的横截面X射线图像组成,分为四类:NORMAL,CNV,DME和DRUSEN: 数据集通常共有84,495个X射线JPEG图像,512x496可以通过kaggle CLI下载: #!pip install kaggle #!kaggle datasets download -d paultimothymooney/kermany2018
下载后,训练和测试集图像类位于各自的文件夹中,因此我们可以将模式定义为: labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL'] train_folder = os.path.join('OCT2017', 'train', '**', '*.jpeg') test_folder = os.path.join('OCT2017', 'test', '**', '*.jpeg')
接下来,我们编写Estimator的输入函数,它可以获取任何文件模式,并返回调整大小的图像和one-hot编码标签,作为tf.data.Dataset。如果prefetch的buffer_size为None,则TensorFlow将自动使用最优的prefetch缓冲区大小,Python实现如下: def input_fn(file_pattern, labels, image_size=(224,224), shuffle=False, batch_size=64, num_epochs=None, buffer_size=4096, prefetch_buffer_size=None): table = tf.contrib.lookup.index_table_from_tensor(mapping=tf.constant(labels)) num_classes = len(labels) def _map_func(filename): label = tf.string_split([filename], delimiter=os.sep).values[-2] image = tf.image.decode_jpeg(tf.read_file(filename), channels=3) image = tf.image.convert_image_dtype(image, dtype=tf.float32) image = tf.image.resize_images(image, size=image_size) return (image, tf.one_hot(table.lookup(label), num_classes)) dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle) if num_epochs is not None and shuffle: dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(buffer_size, num_epochs)) elif shuffle: dataset = dataset.shuffle(buffer_size) elif num_epochs is not None: dataset = dataset.repeat(num_epochs) dataset = dataset.apply( tf.contrib.data.map_and_batch(map_func=_map_func, batch_size=batch_size, num_parallel_calls=os.cpu_count())) dataset = dataset.prefetch(buffer_size=prefetch_buffer_size) return dataset
这次要训练这个机器学习模型,我们将使用一个预先训练过的VGG16,并重新训练它的最后5层: keras_vgg16 = tf.keras.applications.VGG16(input_shape=(224,224,3), include_top=False) output = keras_vgg16.output output = tf.keras.layers.Flatten()(output) prediction = tf.keras.layers.Dense(len(labels), activation=tf.nn.softmax)(output) model = tf.keras.Model(inputs=keras_vgg16.input, outputs=prediction) for layer in keras_vgg16.layers[:-4]: layer.trainable = False
可以按照上面的步骤进行操作,并使用NUM_GPUSGPU 在几分钟内训练我们的机器学习模型: model.compile(loss='categorical_crossentropy', optimizer=tf.train.AdamOptimizer(), metrics=['accuracy']) NUM_GPUS = 2 strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS) config = tf.estimator.RunConfig(train_distribute=strategy) estimator = tf.keras.estimator.model_to_estimator(model, config=config) BATCH_SIZE = 64 EPOCHS = 1 estimator.train(input_fn=lambda:input_fn(train_folder, labels, shuffle=True, batch_size=BATCH_SIZE, buffer_size=2048, num_epochs=EPOCHS, prefetch_buffer_size=4), hooks=[time_hist])
经过训练,我们可以评估测试集的准确度,该准确度应该在95%左右 estimator.evaluate(input_fn=lambda:input_fn(test_folder, labels, shuffle=False, batch_size=BATCH_SIZE, buffer_size=1024, num_epochs=1))
最后我们在上面展示了使用Estimators API在多个GPU上训练深度学习Keras模型是,和如何编写一个遵循最佳实践的输入管道来充分利用我们的资源(线性缩放)以及如何计时通过钩子训练吞吐量。 随着num_gpu的增加,测试集的准确性会降低。其中一个原因可能是,当我们使用更多GPU时,MirroredStrategy有效训练批量大小BATCH_SIZE*NUM_GPUS可能需要调整BATCH_SIZE或学习率。在这里NUM_GPUS,为了制作图形,我保持所有其他超参数不变,但实际上需要调整它们。 数据集的大小以及机器学习模型大小也会影响这些方案的扩展程度。GPU在读取或写入小数据时带宽较差,对于像K80这样的旧GPU尤其如此,并且可以解释上面的Fashion-MNIST图。 |