使用Kubeflow构建机器学习流水线

在此前的文章中,我已经向你介绍了Kubeflow,这是一个为团队设置的机器学习平台,需要构建机器学习流水线。

在本文中,我们将了解如何采用现有的机器学习详细并将其变成Kubeflow的机器学习流水线,进而可以部署在Kubernetes上。在进行本次练习的时候,请考虑你该如何将现有的机器学习项目转换到Kubeflow上。

我将使用Fashion MNIST作为例子,因为在本次练习中模型的复杂性并不是我们需要解决的主要目标。对于这一简单的例子,我将流水线分为3个阶段:

Git clone代码库

下载并重新处理训练和测试数据

训练评估

当然,你可以根据自己的用例将流水线以任意形式拆分,并且可以随意扩展流水线。

获取代码

你可以从Github上获取代码:

% git clone https://github.com/benjamintanweihao/kubeflow-mnist.git

以下是我们用来创建流水线的完整清单。实际上,你的代码很可能跨多个库和文件。在我们的例子中,我们将代码分为两个脚本,preprocessing.py和train.py。

from tensorflow import keras import argparse import os import pickle def preprocess(data_dir: str): fashion_mnist = keras.datasets.fashion_mnist (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data() train_images = train_images / 255.0 test_images = test_images / 255.0 os.makedirs(data_dir, exist_ok=True) with open(os.path.join(data_dir, 'train_images.pickle'), 'wb') as f: pickle.dump(train_images, f) with open(os.path.join(data_dir, 'train_labels.pickle'), 'wb') as f: pickle.dump(train_labels, f) with open(os.path.join(data_dir, 'test_images.pickle'), 'wb') as f: pickle.dump(test_images, f) with open(os.path.join(data_dir, 'test_labels.pickle'), 'wb') as f: pickle.dump(test_labels, f) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Kubeflow MNIST training script') parser.add_argument('--data_dir', help='path to images and labels.') args = parser.parse_args() preprocess(data_dir=args.data_dir)

处理脚本采用单个参数data_dir。它下载并预处理数据,并将pickled版本保存在data_dir中。在生产代码中,这可能是TFRecords的存储目录。

train.py

import calendar import os import time import tensorflow as tf import pickle import argparse from tensorflow import keras from constants import PROJECT_ROOT def train(data_dir: str): # Training model = keras.Sequential([ keras.layers.Flatten(input_shape=(28, 28)), keras.layers.Dense(128, activation='relu'), keras.layers.Dense(10)]) model.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy']) with open(os.path.join(data_dir, 'train_images.pickle'), 'rb') as f: train_images = pickle.load(f) with open(os.path.join(data_dir, 'train_labels.pickle'), 'rb') as f: train_labels = pickle.load(f) model.fit(train_images, train_labels, epochs=10) with open(os.path.join(data_dir, 'test_images.pickle'), 'rb') as f: test_images = pickle.load(f) with open(os.path.join(data_dir, 'test_labels.pickle'), 'rb') as f: test_labels = pickle.load(f) # Evaluation test_loss, test_acc = model.evaluate(test_images, test_labels, verbose=2) print(f'Test Loss: {test_loss}') print(f'Test Acc: {test_acc}') # Save model ts = calendar.timegm(time.gmtime()) model_path = os.path.join(PROJECT_ROOT, f'mnist-{ts}.h5') tf.saved_model.save(model, model_path) with open(os.path.join(PROJECT_ROOT, 'output.txt'), 'w') as f: f.write(model_path) print(f'Model written to: {model_path}') if __name__ == '__main__': parser = argparse.ArgumentParser(description='Kubeflow FMNIST training script') parser.add_argument('--data_dir', help='path to images and labels.') args = parser.parse_args() train(data_dir=args.data_dir)

在train.py中,将建立模型,并使用data_dir指定训练和测试数据的位置。模型训练完毕并且开始执行评估后,将模型写入带有时间戳的路径。请注意,该路径也已写入output.txt。稍后将对此进行引用。

开发Kubeflow流水线

为了开始创建Kubeflow流水线,我们需要拉取一些依赖项。我准备了一个environment.yml,其中包括了kfp 0.5.0、tensorflow以及其他所需的依赖项。

你需要安装Conda,然后执行以下步骤:

% conda env create -f environment.yml % source activate kubeflow-mnist % python preprocessing.py --data_dir=http://www.likecs.com/path/to/data % python train.py --data_dir=http://www.likecs.com/path/to/data

现在我们来回顾一下我们流水线中的几个步骤:

Git clone代码库

下载并预处理训练和测试数据

训练并进行评估

在我们开始写代码之前,需要从宏观上了解Kubeflow流水线。

流水线由连接组件构成。一个组件的输出成为另一个组件的输入,每个组件实际上都在容器中执行(在本例中为Docker)。将发生的情况是,我们会执行一个我们稍后将要指定的Docker镜像,它包含了我们运行preprocessing.py和train.py所需的一切。当然,这两个阶段会有它们的组件。

我们还需要额外的一个镜像以git clone项目。我们需要将项目bake到Docker镜像,但在实际项目中,这可能会导致Docker镜像的大小膨胀。

说到Docker镜像,我们应该先创建一个。

Step0:创建一个Docker镜像

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwsff.html