TensorFlow on Kubernetes 的架构与实践
技术
作者:王涛
译者:
2018-01-29 08:10


这两年,Kubernetes 在企业中的 DevOps、微服务领域取得了出色的成绩,从 2017 年开始,将 Kubernetes 应用到 HPC、AI 等领域也成了技术热点。这里我给大家分享一下 Kubernetes 在 AI 中的落地经验,内容包括 TensorFlow on Kubernetes 的架构与实践,以及线上经验和坑。


都是玩容器的老司机,都知道 Kubernetes 这两年非常火,截止目前在 GitHub上31K+ stars ,然而相比于 TensorFlow ,也就只能说是一般般了。TensorFlow 才两年多,在 GitHub 上已经有 86K+ stars,这是个什么概念呢?要知道,Linux Kernel 这么多年才积累 54K+ stars ,当然,它们各自都是所在领域的霸主,这种对比只当闲谈。


这两年,Kubernetes 在各个企业中的 DevOps、微服务方向取得了出色的成绩,从2017年开始,越来越多的企业也开始探索将 Kubernetes 应用到 HPC、AI 等领域。随着公司AI业务的迅猛增长,vivo 在 2017 年 9 月也开始基于 Kubernetes 强大的分布式能力,探索与 TensorFlow 等 ML 框架深度整合,提高数据中心资源利用率,加快算法迭代速度。


Kubernetes 在 AI 中的应用与在 DevOps 中部署 App 相比,最大的差别在于容器的规模以及容器生命周期。在我们的实践中,目前集群服务器规模很小的情况下,每天要调度近 10W 的容器,有很多容器可能只运行了十几分钟甚至几分钟,而且计划在 2018 年,集群规模还要翻十倍。在 DevOps 场景,应用发布频率再高,我相信一年下来能调度 10W 容器的企业并不多。下面我将聊一下 TensorFlow on Kubernetes 的架构及在 vivo 的实践。


分布式 TensorFlow


TensorFlow 是一个使用数据流图进行数值计算的开源软件库。图中的节点代表数学运算,而图中的边则代表在这些节点之间传递的多维数组(张量)。这种灵活的架构可让您使用一个API将计算工作部署到桌面设备、服务器或者移动设备中的一个或多个 CPU 或 GPU 。 关于 TensorFlow 的基础概念,我就不多介绍了。


单机 TensorFlow


下面是一个单机式 TensorFlow 训练示意图,通过 Client 提交 Session,定义这个 worker 要用哪个CPU/GPU 做什么事。




分布式 TensorFlow


2016 年 4 月 TensorFlow 发布了 0.8 版本宣布支持分布式计算,我们称之为 Distributed TensorFlow 。这是非常重要的一个特性,因为在 AI 的世界里,训练的数据量和模型参数通常会非常大。比如Google Brain 实验室今年发表的论文《Outrageously Large Neural Networks: The Sparsely-Gated Mixture-of-Experts Layer》中提到一个 680 亿个 Parameters 的模型,如果只能单机训练,那耗时难于接受。通过 Distributed TensorFlow,可以利用大量服务器构建分布式 TensorFlow 集群来提高训练效率,减少训练时间。


通过 TensorFlow Replcation 机制,用户可以将 SubGraph 分布到不同的服务器中进行分布式计算。TensorFlow 的副本机制又分为两种,In-graph 和 Between-graph。


In-graph Replication 简单来讲,就是通过单个 client session 定义这个 TensorFlow 集群的所有 task的工作。




与之相对地,Between-graph Replication 就是每个 worker 都有独立的 client 来定义自己的工作。




下面是抽象出来的分布式 TensorFlow Framework 如下:


我们先来了解里面的几个概念:


Cluster


一个 TensorFlow Cluster 有一个或多个 jobs 组成,每个 job 又由一个或多个 tasks 构成。Cluster 的定义是通过 tf.train.ClusterSpec 来定义的。比如,定义一个由 3 个 worker 和 2 个 PS 的 TensorFlow Cluster 的 ClusterSpec 如下:


Client


Client 用来 build 一个 TensorFlow Graph ,并构建一个 tensorflow::Session 用来与集群通信。一个Client 可以与多个 TensorFlow Server 交互,一个 Server 能服务多个 Client 。

Job


一个 Job 由 tasks list 组成,Job 分 PS 和 Worker 两种类型。PS 即 parameter server,用来存储和更新 variables 的,而 Worker 可以认为是无状态的,用来作为计算任务的。Workers 中,一般都会选择一个 chief worker(通常是 worker0 ),用来做训练状态的 checkpoint ,如果有 worker 故障,那么可以从最新 checkpoint 中 restore 。

Task


每个 Task 对应一个 TensorFlow Server,对应一个单独的进程。一个 Task 属于某个 Job ,通过一个 index 来标记它在对应 Job 的 tasks 中的位置。每个 TensorFlow 均实现了 Master service 和 Worker service。Master service 用来与集群内的 worker services 进行 gRPC 交互。Worker service 则是用local device 来计算 Subgraph。

关于 Distributed TensorFlow 的更多内容,请参考官方内容:

www.tensorflow.org/deplopy/distributed。


分布式 TensorFlow 的缺陷


分布式 TensorFlow 能利用数据中心所有服务器构成的资源池,让大量 PS 和 Worker 能分布在不同的服务器进行参数存储和训练,这无疑是 TensorFlow 能否在企业落地的关键点。然而,这还不够,它还存在一下先天不足:

  • 训练时 TensorFlow 各个 Task 资源无法隔离,很有可能会导致任务间因资源抢占互相影响。
  • 缺乏调度能力,需要用户手动配置和管理任务的计算资源。
  • 集群规模大时,训练任务的管理很麻烦,要跟踪和管理每个任务的状态,需要在上层做大量开发。
  • 用户要查看各个 Task 的训练日志需要找出对应的服务器,并 ssh 过去,非常不方便。
  • TensorFlow 原生支持的后端文件系统只支持:标准 Posix 文件系统(比如 NFS )、HDFS、GCS、memory-mapped-file。大多数企业中数据都是存在大数据平台,因此以 HDFS 为主。然而,HDFS的 Read 性能并不是很好。
  • 当你试着去创建一个大规模 TensorFlow 集群时,发现并不轻松。


TensorFlow on Kubernetes 架构与原理


TensorFlow 的这些不足,正好是 Kubernetes 的强项:

  • 提供 ResourceQuota、LimitRanger 等多种资源管理机制,能做到任务之间很好的资源隔离。
  • 支持任务的计算资源的配置和调度。
  • 训练任务以容器方式运行,Kubernetes 提供全套的容器 PLEG 接口,因此任务状态的管理很方便。
  • 轻松对接EFK/ELK等日志方案,用户能方便的查看任务日志。
  • 支持 Read 性能更优秀的分布式存储( GlusterFS ),但目前我们也还没对接 GlusterFS ,有计划但没人力。
  • 通过声明式文件实现轻松快捷的创建一个大规模 TensorFlow 集群。


TensorFlow on Kubernetes 架构




TensorFlow on Kubernetes 原理


在我们的 TensorFlow on Kubernetes 方案中,主要用到以下的 Kubernetes 对象:


Kubernetes Job


我们用 Kubernetes Job 来部署 TensorFlow Worker,Worker 训练正常完成退出,就不会再重启容器了。注意 Job 中的 Pod Template restartPolicy 只能为 Never 或者 OnFailure ,不能为 Always ,这里我们设定 restartPolicy 为 OnFailure ,worker 一旦异常退出,都会自动重启。但是要注意,要保证worker 重启后训练能从 checkpoint restore,不然worker重启后又从 step 0 开始,可能跑了几天的训练就白费了。如果你使用 TensorFlow 高级 API 写的算法,默认都实现了这点,但是如果你是使用底层 Core API,一定要注意自己实现。


kind: Job
apiVersion: batch/v1
metadata:
 name: {{ name }}-{{ task_type }}-{{ i }}
 namespace: {{ name }}
spec:
 template:
   metadata:
     labels:
       name: {{ name }}
       job: {{ task_type }}
       task: "{{ i }}"
   spec:
     imagePullSecrets:
     - name: harborsecret
     containers:
     - name: {{ name }}-{{ task_type }}-{{ i }}
       image: {{ image }}
       resources:
         requests:
           memory: "4Gi"
           cpu: "500m"
       ports:
       - containerPort: 2222
       command: ["/bin/sh", "-c", "export CLASSPATH=.:/usr/lib/jvm/java-1.8.0/lib/tools.jar:$(/usr/lib/hadoop-2.6.1/bin/hadoop classpath --glob); wget -r -nH  -np --cut-dir=1 -R 'index.html*,*gif'  {{ script }}; cd ./{{ name }}; sh ./run.sh {{ ps_hosts() }} {{ worker_hosts() }} {{ task_type }} {{ i }} {{ ps_replicas }} {{ worker_replicas }}"]
     restartPolicy: OnFailure


Kubernetes Deployment


TensorFlow PS 用 Kubernetes Deployment 来部署。为什么不像 worker 一样,也使用 Job 来部署呢?其实也未尝不可,但是考虑到 PS 进程并不会等所有 worker 训练完成时自动退出(一直挂起),所以使用 Job 部署没什么意义。

kind: Deployment
apiVersion: extensions/v1beta1
metadata:
 name: {{ name }}-{{ task_type }}-{{ i }}
 namespace: {{ name }}
spec:
 replicas: 1
 template:
   metadata:
     labels:
       name: {{ name }}
       job: {{ task_type }}
       task: "{{ i }}"
   spec:
     imagePullSecrets:
     - name: harborsecret
     containers:
     - name: {{ name }}-{{ task_type }}-{{ i }}
       image: {{ image }}
       resources:
         requests:
           memory: "4Gi"
           cpu: "500m"
       ports:
       - containerPort: 2222
       command: ["/bin/sh", "-c","export CLASSPATH=.:/usr/lib/jvm/java-1.8.0/lib/tools.jar:$(/usr/lib/hadoop-2.6.1/bin/hadoop classpath --glob); wget -r -nH  -np --cut-dir=1 -R 'index.html*,*gif'  {{ script }}; cd ./{{ name }}; sh ./run.sh {{ ps_hosts() }} {{ worker_hosts() }} {{ task_type }} {{ i }} {{ ps_replicas }} {{ worker_replicas }}"]
     restartPolicy: Always

关于 TensorFlow PS 进程挂起的问题,请参考:https://github.com/tensorflow/tensorflow/issues/4713。我们是这么解决的,开发了一个模块,watch 每个 TensorFlow 集群的所有 worker 状态,当所有 worker 对应 Job 都 Completed 时,就会自动去删除 PS 对应的 Deployment ,从而 kill PS 进程释放资源。


Kubernetes Headless Service


Headless Service 通常用来解决 Kubernetes 里面部署的应用集群之间的内部通信。在这里,我们也是这么用的,我们会为每个 TensorFlow 对应的 Job 和 Deployment 对象都创建一个 Headless Service 作为 Worker 和 PS 的通信代理。

用 Headless Service 的好处,就是在 KubeDNS 中,Service Name 的域名解析直接对应到 PodIp ,而没有 service VIP 这一层,这就不依赖于 kube-proxy 去创建 iptables 规则了。少了 kube-proxy 的iptables这一层,带来的性能的提升。


在 TensorFlow 场景中,这是不可小觑的,因为一个 TensorFlow Task 都会创建一个 service ,几万个service是很正常的事,如果使用 Normal Service,iptables 规则就几十万上百万条了,增删一条iptabels 规则耗时几个小时甚至几天,集群早已奔溃。关于 kube-proxy iptables 模式的性能测试数据,请参考华为 PaaS 团队的相关分享。

KubeDNS Autoscaler



前面提到,每个 TensorFlow Task 都会创建一个 service,都会在 KubeDNS 中有一条对应的解析规则,但 service 数量太多的时候,我们发现有些 Worker 的域名解析失败概率很大,十几次才能成功解析一次。这样会影响 TensorFlow 集群内各个 task 的 session 建立,可能导致 TensorFlow 集群起不来。


为了解决这个问题,我们引入了 Kubernetes 的孵化项目 kubernetes-incubator/cluster-proportional-autoscaler 来对 KubeDNS 进行动态伸缩。


TensorFlow on Kubernetes 实践


基于上面的方案,我们开发一个 TaaS 平台,已经实现了基本的功能,包括算法管理、训练集群的创建和管理、模型的管理、模型上线( TensorFlow Serving )、一键创建 TensorBoard 服务、任务资源监控、集群资源监控、定时训练管理、任务日志在线查看和批量打包下载等等,这部分内容可以参考之前在DockOne上分享的文章《 vivo 基于 Kubernetes 构建企业级 TaaS 平台实践》。

这只是刚开始,我正在做下面的特性:

  • 支持基于训练优先级的任务抢占式调度:用户在 TaaS 上创建 TensorFlow 训练项目时,可以指定项目的优先级为生产( Production )、迭代( Iteration )、调研( PTR ),默认为迭代。优先级从高到低依次为Production --> Iteration --> PTR。但集群资源不足时,按照任务优先级进行抢占式调度。
  • 提供像 Yarn形式的资源分配视图,让用户对自己的所有训练项目的资源占用情况变得清晰。
  • 训练和预测的混合部署,提供数据中心资源利用率。
  • ……


经验和坑


整个过程中,遇到了很多坑,有 TensorFlow 的,也有 Kubernetes 的,不过问题最多的还是我们用的CNI 网络插件 Contiv Netplugin,每次大问题基本都是这个网络插件造成的。Kubernetes 是问题最少的,它的稳定性比我预期还要好。

  • Contiv Netplugin 问题,在 DevOps 环境中还是稳定的,在大规模高并发的 AI 场景,问题就层出不穷了,产生大量垃圾 IP 和 Openflow 流表,直接把 Node 都成 NotReady 了,具体的不多说,因为据我了解,现在用这个插件的公司已经很少了,想了解的私下找我。
  • 在我们的方案中,一个 TensorFlow 训练集群就对应一个 Kubernetes Namespace,项目初期我们并没有对及时清理垃圾 Namespace,到后来集群里上万 Namespace 的时候,整个 Kubernetes 集群的相关 API 性能非常差了,导致 TaaS 的用户体验非常差。
  • TensorFlow gRPC 性能差,上千个 Worker 的训练集群,概率性的出现这样的报错:grpc_chttp2_stream request on server; last grpc_chttp2_stream id=231, new grpc_chttp2_stream id=227,只能尝试通过升级 TensorFlow 来升级 gRPC 了。目前我们通过增加单个 Worker 的计算负载来减少Worker数量的方法,减少 gRPC 压力。除此之外,规模大一点TensorFlow 还有各种问题,甚至还有自己的 OOM 机制,防不胜防。不过,高速成长的东西,我们要给它足够的容忍。
  • 还有 TensorFlow OOM 的问题等等。


Q&A


Q:Worker 为什么不直接用 Pod ,而用的 Job?

A:Kubernetes 中是不建议直接使用 Pod 的,建议通过一个控制器( RS、RC、Deploy、StatefulSets、Job 等)来创建和管理Pod 。

Q:我看训练的时候并没有指定数据集和训练参数等,是都放到训练脚本内了吗,还有训练集是放到Gluster,挂载到容器内吗,还是换存到本地?

A:目前训练数据和参数都是在脚本里用户自己搞定,正在把这些放到 Portal ,提供“命令行参数自定义”的能力。目前我们的训练数据是从 HDFS 集群直接走网络读取,Kubernetes 本身也没有 HDFS 的Volume Plugin。

Q:请问 PS 的个数是用户指定还是根据 Worker 的数量来指派?

A:PS 和 Worker 数都是通过用户指定,但实际上都是用户根据自己的训练数据大小来计算的。

Q:分布式训练的时候,Training data 是如何分发到各个 Worker 节点的?Tensorflow API 可以做到按节点数自动分发吗?

A:各个 Worker 都是从 HDFS 集群读取训练数据加载到内存的。数据的分配都是用户自己在脚本种实现的。

END

694 comCount 0