摘要:本文基于官方教程,實踐了分布式搭建的過程。一般將任務分為兩類一類叫參數服務器,,簡稱為,用于存儲一類就是普通任務,稱為,用于執行具體的計算。參數服務器是一套分布式存儲,用于保存參數,并提供參數更新的操作。
簡介
TensorFlow支持使用多臺機器的設備進行計算。本文基于官方教程,實踐了分布式TensorFlow搭建的過程。
TensorFlow入門教程
基本概念 TensorFlow集群A TensorFlow "cluster" is a set of "tasks" that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions, and a "worker" that executes operations in the graph.
從上面的定義可以看出,所謂的TensorFlow集群就是一組任務,每個任務就是一個服務。服務由兩個部分組成,第一部分是master,用于創建session,第二部分是worker,用于執行具體的計算。
TensorFlow一般將任務分為兩類job:一類叫參數服務器,parameter server,簡稱為ps,用于存儲tf.Variable;一類就是普通任務,稱為worker,用于執行具體的計算。
首先來理解一下參數服務器的概念。一般而言,機器學習的參數訓練過程可以劃分為兩個類別:第一個是根據參數算算梯度,第二個是根據梯度更新參數。對于小規模訓練,數據量不大,參數數量不多,一個CPU就足夠了,兩類任務都交給一個CPU來做。對于普通的中等規模的訓練,數據量比較大,參數數量不多,計算梯度的任務負荷較重,參數更新的任務負荷較輕,所以將第一類任務交給若干個CPU或GPU去做,第二類任務交給一個CPU即可。對于超大規模的訓練,數據量大、參數多,不僅計算梯度的任務要部署到多個CPU或GPU上,而且更新參數的任務也要部署到多個CPU。如果計算量足夠大,一臺機器能搭載的CPU和GPU數量有限,就需要多臺機器來進行計算能力的擴展了。參數服務器是一套分布式存儲,用于保存參數,并提供參數更新的操作。
我們來看一下怎么創建一個TensorFlow集群。每個任務用一個ip:port表示。TensorFlow用tf.train.ClusterSpec表示一個集群信息,舉例如下:
import tensorflow as tf # Configuration of cluster ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
上面的語句提供了一個TensorFlow集群信息,集群有兩類任務,稱為job,一個job是ps,一個job是worker;ps由2個任務組成,worker由3個任務組成。
定義完集群信息后,使用tf.train.Server創建每個任務:
tf.app.flags.DEFINE_string("job_name", "worker", "One of "ps", "worker"") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") FLAGS = tf.app.flags.FLAGS def main(_): server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) server.join() if __name__ == "__main__": tf.app.run()
對于本例而言,我們需要在ip:port對應的機器上運行每個任務,共需執行五次代碼,生成五個任務。
python worker.py --job_name=ps --task_index=0 python worker.py --job_name=ps --task_index=1 python worker.py --job_name=worker --task_index=0 python worker.py --job_name=worker --task_index=1 python worker.py --job_name=worker --task_index=2
我們找到集群的某一臺機器,執行下面的代碼:
# -*- coding=utf-8 -*- import tensorflow as tf import numpy as np train_X = np.random.rand(100).astype(np.float32) train_Y = train_X * 0.1 + 0.3 # 選擇變量存儲位置和op執行位置,這里全部放在worker的第一個task上 with tf.device("/job:worker/task:0"): X = tf.placeholder(tf.float32) Y = tf.placeholder(tf.float32) w = tf.Variable(0.0, name="weight") b = tf.Variable(0.0, name="reminder") y = w * X + b loss = tf.reduce_mean(tf.square(y - Y)) init_op = tf.global_variables_initializer() train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss) # 選擇創建session使用的master with tf.Session("grpc://xx.xxx.xx.xxxx:oooo") as sess: sess.run(init_op) for i in range(500): sess.run(train_op, feed_dict={X: train_Y, Y: train_Y}) if i % 50 == 0: print i, sess.run(w), sess.run(b) print sess.run(w) print sess.run(b)
執行結果如下:
0 0.00245265 0.00697793 50 0.0752466 0.213145 100 0.0991397 0.279267 150 0.107308 0.30036 200 0.110421 0.306972 250 0.111907 0.308929 300 0.112869 0.309389 350 0.113663 0.309368 400 0.114402 0.309192 450 0.115123 0.308967 0.115824 0.30873
其實ps和worker本質上是一個東西,就是名字不同,我們將上例中的with tf.device("/job:worker/task:0"):改為with tf.device("/job:psr/task:0"):,一樣能夠執行。之所以在創建集群時要分為兩個類別的任務,是因為TensorFlow提供了一些工具函數,會根據名字不同賦予task不同的任務,ps的用于存儲變量,worker的用于計算。
同步與異步更新同步更新:將數據拆分成多份,每份基于參數計算出各自部分的梯度;當每一份的部分梯度計算完成后,收集到一起算出總梯度,再用總梯度去更新參數。
異步更新:同步更新模式下,每次都要等各個部分的梯度計算完后才能進行參數更新操作,處理速度取決于計算梯度最慢的那個部分,其他部分存在大量的等待時間浪費;異步更新模式下,所有的部分只需要算自己的梯度,根據自己的梯度更新參數,不同部分之間不存在通信和等待。
分布式訓練案例import tensorflow as tf import numpy as np # Configuration of cluster ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") FLAGS = tf.app.flags.FLAGS def main(_): with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)): x_data = tf.placeholder(tf.float32, [100]) y_data = tf.placeholder(tf.float32, [100]) W = tf.Variable(tf.random_uniform([1], -1.0, 1.0)) b = tf.Variable(tf.zeros([1])) y = W * x_data + b loss = tf.reduce_mean(tf.square(y - y_data)) global_step = tf.Variable(0, name="global_step", trainable=False) optimizer = tf.train.GradientDescentOptimizer(0.1) train_op = optimizer.minimize(loss, global_step=global_step) tf.summary.scalar("cost", loss) summary_op = tf.summary.merge_all() init_op = tf.global_variables_initializer() # The StopAtStepHook handles stopping after running given steps. hooks = [ tf.train.StopAtStepHook(last_step=1000000)] # The MonitoredTrainingSession takes care of session initialization, # restoring from a checkpoint, saving to a checkpoint, and closing when done # or an error occurs. with tf.train.MonitoredTrainingSession(master="grpc://" + worker_hosts[FLAGS.task_index], is_chief=(FLAGS.task_index==0), # 我們制定task_index為0的任務為主任務,用于負責變量初始化、做checkpoint、保存summary和復原 checkpoint_dir="/tmp/tf_train_logs", save_checkpoint_secs=None, hooks=hooks) as mon_sess: while not mon_sess.should_stop(): # Run a training step asynchronously. # See `tf.train.SyncReplicasOptimizer` for additional details on how to # perform *synchronous* training. # mon_sess.run handles AbortedError in case of preempted PS. train_x = np.random.rand(100).astype(np.float32) train_y = train_x * 0.1 + 0.3 _, step, loss_v, weight, biase = mon_sess.run([train_op, global_step, loss, W, b], feed_dict={x_data: train_x, y_data: train_y}) if step % 100 == 0: print "step: %d, weight: %f, biase: %f, loss: %f" %(step, weight, biase, loss_v) print "Optimization finished." if __name__ == "__main__": tf.app.run()
代碼中,tf.train.replica_device_setter()會根據job名,將with內的Variable op放到ps tasks,將其他計算op放到worker tasks。默認分配策略是輪詢。
在屬于集群的一臺機器中執行上面的代碼,屏幕會開始輸出每輪迭代的訓練參數和損失
python train.py --task_index=0
在另一臺機器上執行下面你的代碼,再啟動一個任務,會看到屏幕開始輸出每輪迭代的訓練參數和損失,注意,step不再是從0開始,而是在啟動時刻上一個啟動任務的step后繼續。此時觀察兩個任務,會發現他們同時在對同一參數進行更新。
python train.py --task_index=2思考
分布式TensorFlow與Spark對比:
分布式的級別不同:TensorFlow的Tensor、Variable和Op不是分布式的,分布式執行的是subgraph. Spark的op和變量都是構建在RDD上,RDD本身是分布式的。
異步訓練:TensorFlow支持同步和異步的分布式訓練;Spark原生的API只支持同步訓練
分布式存儲:Spark在底層封裝好了worker和分布式數據之間的關系;TensorFlow需要自行維護。
Parameter Server:TensorFlow支持,Spark暫不支持。
TF分布式部署起來還是比較繁瑣的,需要定義好每個任務的ip:port,手工啟動每個task,不提供一個界面可以對集群進行維護。
參考資料
白話tensorflow分布式部署和開發
理解和實現分布式TensorFlow集群完整教程
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/38424.html
摘要:貢獻者飛龍版本最近總是有人問我,把這些資料看完一遍要用多長時間,如果你一本書一本書看的話,的確要用很長時間。為了方便大家,我就把每本書的章節拆開,再按照知識點合并,手動整理了這個知識樹。 Special Sponsors showImg(https://segmentfault.com/img/remote/1460000018907426?w=1760&h=200); 貢獻者:飛龍版...
摘要:本文的目的是聚焦于數據操作能力,講述中比較重要的一些,幫助大家實現各自的業務邏輯。傳入輸入值,指定輸出的基本數據類型。 引言 用TensorFlow做好一個機器學習項目,需要具備多種代碼能力: 工程開發能力:怎么讀取數據、怎么設計與運行Computation Graph、怎么保存與恢復變量、怎么保存統計結果、怎么共享變量、怎么分布式部署 數據操作能力:怎么將原始數據一步步轉化為模型需...
閱讀 2415·2021-11-11 11:01
閱讀 3287·2021-10-11 10:57
閱讀 2644·2021-09-30 09:46
閱讀 3492·2021-07-26 23:38
閱讀 1564·2019-08-29 12:22
閱讀 649·2019-08-29 11:28
閱讀 2352·2019-08-26 14:04
閱讀 3050·2019-08-23 18:34