多GPU训Estimators,tf.keras和 tf.data

2020-3-30 21:47| 查看: 1325

摘要: TensorFlow的Estimators API可用于在具有多个GPU的分布式环境中训练机器学习模型。在这里,我们将通过训练tf.keras为小型Fashion-MNIST数据集编写的自定义估算器来呈现此工作流程,然后在最后展示更实用的用例。TL;

TensorFlow的Estimators API可用于在具有多个GPU的分布式环境中训练机器学习模型。在这里,我们将通过训练tf.keras为小型Fashion-MNIST数据集编写的自定义估算器来呈现此工作流程,然后在最后展示更实用的用例。

TL; DR:基本上我们想要记住的是,tf.keras.Model可以通过tf.estimatorAPI将其转换为tf.estimator.Estimator通过该tf.keras.estimator.model_to_estimator方法的对象来训练。转换后,我们可以应用Estimators提供的机制来训练不同的硬件配置。

导入Python库

importimport osos

importimport timetime

#!pip install -q -U tensorflow-gpu#!pip i

import tensorflow as tf

import numpy as np

导入Fashion-MNIST数据集

我们将使用Fashion-MNIST数据集(https://github.com/zalandoresearch/fashion-mnist),这是MNIST的直接替代品,其中包含数千张Zalando时尚文章的灰度图像。获取训练和测试数据非常简单:

(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()

我们想把这些图像的像素值从0到255转换成0到1到c之间的值,将数据集转换为[B,H,W,C]格式,其中B是批处理的图像数,H和W是高度和宽度,C是我们数据集的通道数(灰度为1),Python代码如下:

TRAINING_SIZE = len(train_images)

TEST_SIZE = len(test_images)

train_images = np.asarray(train_images, dtype=np.float32) / 255

# Convert the train images and add channels

train_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1))

test_images = np.asarray(test_images, dtype=np.float32) / 255

# Convert the test images and add channels

test_images = test_images.reshape((TEST_SIZE, 28, 28, 1))

接下来,我们想要将标签从整数id(例如,2或Pullover)转换为one hot编码(例如,0,0,1,0,0,0,0,0,0,0)。为此,我们将使用该tf.keras.utils.to_categorical 函数:

# How many categories we are predicting from (0-9)

LABEL_DIMENSIONS = 10

train_labels = tf.keras.utils.to_categorical(train_labels,LABEL_DIMENSIONS)

test_labels = tf.keras.utils.to_categorical(test_labels,LABEL_DIMENSIONS)

# Cast the labels to floats, needed later

train_labels = train_labels.astype(np.float32)

test_labels = test_labels.astype(np.float32)

建立一个tf.keras 模型

我们将使用Keras Functional API创建我们的神经网络。Keras是一个用于构建和训练深度学习模型的高级API,用户友好,模块化且易于扩展。tf.keras是TensorFlow的这个API的实现,它支持诸如Eager Execution,tf.data管道和Estimators之类的东西。

就架构而言,我们将使用ConvNets。对于每个训练示例,它们都使用一个3d张量(height, width, channels),对于灰度图像,这个张量从channels=1开始,然后返回一个3d张量。

因此,在ConvNet部分之后,我们将需要Flatten张量并添加Dense图层,其中最后一个返回LABEL_DIMENSIONS带有tf.nn.softmax激活的大小向量,Python代码如下:

inputs = tf.keras.Input(shape=(28,28,1)) # Returns a placeholder tensor

x = tf.keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation=tf.nn.relu)(inputs)

x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)

x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x)

x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)

x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x)

x = tf.keras.layers.Flatten()(x)

x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x)

predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS, activation=tf.nn.softmax)(x)

我们现在可以定义我们的机器学习模型,选择优化器(我们从TensorFlow中选择一个而不是使用一个tf.keras.optimizers)并编译它:

model = tf.keras.Model(inputs=inputs, outputs=predictions)

optimizer = tf.train.AdamOptimizer(learning_rate=0.001)

model.compile(loss='categorical_crossentropy',

optimizer=optimizer,

metrics=['accuracy'])

创建一个Estimator

要从编译的Keras模型创建Estimator,我们称之为model_to_estimator方法。请注意,Keras模型的初始模型状态将保留在创建的Estimator中。

那么Estimators有什么好处呢?

  • 您可以在本地主机或分布式多GPU环境中运行基于Estimator的机器学习模型,而无需更改模型;
  • Estimators简化了机器学习模型开发人员之间的共享实现;
  • Estimators为您构建图,因此有点像Eager Execution,没有明确的会话。

那么我们如何开始训练我们的简单tf.keras模型来使用多GPU呢?我们可以使用tf.contrib.distribute.MirroredStrategy通过同步训练进行图内复制的范例。

基本上每个worker GPU都有一个网络副本,并获取数据的子集,在该数据上计算本地梯度,然后等待所有workers以同步方式完成。然后,workers通过Ring All-reduce操作将他们的本地梯度相互通信,这通常被优化以减少网络带宽并增加吞吐量。一旦所有梯度到达,每个worker将它们平均并更新其参数,然后开始下一步。这在通过某些高速互连连接的单个节点上有多个GPU的情况下非常理想。

要使用此策略,我们首先从编译的tf.keras模型中创建一个Estimator,然后MirroredStrategy通过RunConfigconfig 为其配置。默认情况下,此配置将使用所有GPU,但您也可以num_gpus选择使用特定数量的GPU,Python示例如下:

NUM_GPUS = 2

strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)

config = tf.estimator.RunConfig(train_distribute=strategy)

estimator = tf.keras.estimator.model_to_estimator(model,

config=config)

创建一个Estimator输入函数

要将数据传输到Estimators,我们需要定义一个数据导入函数,该函数返回批量数据的数据tf.data集(images,labels)。下面的函数接受numpy数组并通过ETL过程返回数据集。

请注意,最后我们还调用了这样的prefetch方法,它将在训练时将数据缓冲到GPU,以便下一批准备就绪并等待GPU,而不是让GPU在每次迭代时等待数据。GPU可能仍然没有得到充分利用,为了改善这一点,我们可以使用fused版本的转换操作,比如shuffle_and_repeat,而不是两个单独的操作。

def input_fn(images, labels, epochs, batch_size):

# Convert the inputs to a Dataset. (E)

ds = tf.data.Dataset.from_tensor_slices((images, labels))

# Shuffle, repeat, and batch the examples. (T)

SHUFFLE_SIZE = 5000

ds = ds.shuffle(SHUFFLE_SIZE).repeat(epochs).batch(batch_size)

ds = ds.prefetch(2)

# Return the dataset. (L)

return ds

训练Estimator

让我们首先定义一个SessionRunHook类来记录随机梯度下降的每次迭代的次数:

class TimeHistory(tf.train.SessionRunHook):

def begin(self):

self.times = []

def before_run(self, run_context):

self.iter_time_start = time.time()

def after_run(self, run_context, run_values):

self.times.append(time.time() - self.iter_time_start)

我们可以在我们的Estimator 上调用train函数,赋予它我们定义的input_fn(带有批大小和我们想要训练的epoch的数量)和一个TimeHistory实例,通过它的hooks参数:

time_hist = TimeHistory()

BATCH_SIZE = 512

EPOCHS = 5

estimator.train(lambda:input_fn(train_images,

train_labels,

epochs=EPOCHS,

batch_size=BATCH_SIZE),

hooks=[time_hist])

性能

们现在可以用它来计算训练的总时间以及平均每秒训练的图像数量(平均输入量):

total_time = sum(time_hist.times)

print(f"total time with {NUM_GPUS} GPU(s): {total_time} seconds")

avg_time_per_batch = np.mean(time_hist.times)

print(f"{BATCH_SIZE*NUM_GPUS/avg_time_per_batch} images/second with

{NUM_GPUS} GPU(s)")

多GPU训Estimators,tf.keras和 tf.data

多GPU训Estimators,tf.keras和 tf.data

评估Estimator

为了检查我们的模型的性能,我们调用评估方法对我们的Estimator:

estimator.evaluate(lambda:input_fn(test_images,

test_labels,

epochs=1,

batch_size=BATCH_SIZE))

Retinal OCT(光学相干断层扫描)图像示例

为了测试更大数据集的缩放性能,我们使用Retinal OCT图像数据集(https://www.kaggle.com/paultimothymooney/kermany2018),这是Kaggle的众多优秀数据集之一。该数据集由活人视网膜的横截面X射线图像组成,分为四类:NORMAL,CNV,DME和DRUSEN:

多GPU训Estimators,tf.keras和 tf.data

数据集通常共有84,495个X射线JPEG图像,512x496可以通过kaggle CLI下载:

#!pip install kaggle

#!kaggle datasets download -d paultimothymooney/kermany2018

下载后,训练和测试集图像类位于各自的文件夹中,因此我们可以将模式定义为:

labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']

train_folder = os.path.join('OCT2017', 'train', '**', '*.jpeg')

test_folder = os.path.join('OCT2017', 'test', '**', '*.jpeg')

接下来,我们编写Estimator的输入函数,它可以获取任何文件模式,并返回调整大小的图像和one-hot编码标签,作为tf.data.Dataset。如果prefetch的buffer_size为None,则TensorFlow将自动使用最优的prefetch缓冲区大小,Python实现如下:

def input_fn(file_pattern, labels,

image_size=(224,224),

shuffle=False,

batch_size=64,

num_epochs=None,

buffer_size=4096,

prefetch_buffer_size=None):

table = tf.contrib.lookup.index_table_from_tensor(mapping=tf.constant(labels))

num_classes = len(labels)

def _map_func(filename):

label = tf.string_split([filename], delimiter=os.sep).values[-2]

image = tf.image.decode_jpeg(tf.read_file(filename), channels=3)

image = tf.image.convert_image_dtype(image, dtype=tf.float32)

image = tf.image.resize_images(image, size=image_size)

return (image, tf.one_hot(table.lookup(label), num_classes))

dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)

if num_epochs is not None and shuffle:

dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(buffer_size,

num_epochs))

elif shuffle:

dataset = dataset.shuffle(buffer_size)

elif num_epochs is not None:

dataset = dataset.repeat(num_epochs)

dataset = dataset.apply(

tf.contrib.data.map_and_batch(map_func=_map_func,

batch_size=batch_size,

num_parallel_calls=os.cpu_count()))

dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)

return dataset

这次要训练这个机器学习模型,我们将使用一个预先训练过的VGG16,并重新训练它的最后5层:

keras_vgg16 = tf.keras.applications.VGG16(input_shape=(224,224,3),

include_top=False)

output = keras_vgg16.output

output = tf.keras.layers.Flatten()(output)

prediction = tf.keras.layers.Dense(len(labels),

activation=tf.nn.softmax)(output)

model = tf.keras.Model(inputs=keras_vgg16.input,

outputs=prediction)

for layer in keras_vgg16.layers[:-4]:

layer.trainable = False

可以按照上面的步骤进行操作,并使用NUM_GPUSGPU 在几分钟内训练我们的机器学习模型:

model.compile(loss='categorical_crossentropy',

optimizer=tf.train.AdamOptimizer(),

metrics=['accuracy'])

NUM_GPUS = 2

strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)

config = tf.estimator.RunConfig(train_distribute=strategy)

estimator = tf.keras.estimator.model_to_estimator(model,

config=config)

BATCH_SIZE = 64

EPOCHS = 1

estimator.train(input_fn=lambda:input_fn(train_folder,

labels,

shuffle=True,

batch_size=BATCH_SIZE,

buffer_size=2048,

num_epochs=EPOCHS,

prefetch_buffer_size=4),

hooks=[time_hist])

经过训练,我们可以评估测试集的准确度,该准确度应该在95%左右

estimator.evaluate(input_fn=lambda:input_fn(test_folder,

labels,

shuffle=False,

batch_size=BATCH_SIZE,

buffer_size=1024,

num_epochs=1))

多GPU训Estimators,tf.keras和 tf.data

多GPU训Estimators,tf.keras和 tf.data

最后

我们在上面展示了使用Estimators API在多个GPU上训练深度学习Keras模型是,和如何编写一个遵循最佳实践的输入管道来充分利用我们的资源(线性缩放)以及如何计时通过钩子训练吞吐量。

随着num_gpu的增加,测试集的准确性会降低。其中一个原因可能是,当我们使用更多GPU时,MirroredStrategy有效训练批量大小BATCH_SIZE*NUM_GPUS可能需要调整BATCH_SIZE或学习率。在这里NUM_GPUS,为了制作图形,我保持所有其他超参数不变,但实际上需要调整它们。

数据集的大小以及机器学习模型大小也会影响这些方案的扩展程度。GPU在读取或写入小数据时带宽较差,对于像K80这样的旧GPU尤其如此,并且可以解释上面的Fashion-MNIST图。


鲜花

握手

雷人

路过

鸡蛋