摘要:底層淺析簡介是官方提供的接口,同時也是中的一個程序。這里一提,對于大部分機器學習算法,你都會看到模塊與模塊都提供了接口,它們的區別在于模塊接受格式的數據而模塊接受格式的數據。
pyspark底層淺析 pyspark簡介
pyspark是Spark官方提供的API接口,同時pyspark也是Spark中的一個程序。
在terminal中輸入pyspark指令,可以打開python的shell,同時其中默認初始化了SparkConf和SparkContext.
在編寫Spark應用的.py文件時,可以通過import pyspark引入該模塊,并通過SparkConf對Spark的啟動參數進行設置。不過,如果你僅完成了Spark的安裝,直接用python指令運行py文件并不能檢索到pyspark模塊。你可以通過pip等包管理工具安裝該模塊,也可以直接使用pyspark(新版本已不支持)或spark-submit直接提交.py文件的作業。
pyspark program這里指的是spark中的bin/pyspark,github地址 。
實際上pyspark只不過解析了命令行中的參數,并進行了python方面的設置,然后調用spark-submit
exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"
在較新一些的版本如Spark2.2中,已經不支持用pyspark運行py腳本文件,一切spark作業都應該使用spark-submit提交。
pyspark moduleSpark是用scala編寫的框架,不過考慮到主要是機器學習的應用場景,Spark官方提供了可以用python的API。但是,一方面,python的API是不全的,即不是所有的scala的函數都可以用pyspark調用到,雖然新的API也在隨著版本迭代不斷開放;另一方面,pyspark模塊,對于很多復雜算法,是通過反射機制調用的Spark中JVM里正在運行的scala編寫的類、方法。所以,如果你將頻繁應用spark于業務或研究,建議學習直接使用scala語言編寫程序,而不是python。
這篇博客并不會講述如何去使用pyspark來編寫python的spark應用。各類API以及模塊如何使用,你完全可以前往官方文檔查看。這里的鏈接是最新版pyspark的文檔,如果你的機器上的spark不是最新版,請去找對應版本的pyspark文檔。因為正如我上面所說,不同版本的pyspark逐步開放了新的API并有對舊API進行改進,你在最新版本看到的類、函數,不一定能在舊版本使用。這里一提,對于大部分機器學習算法,你都會看到ml模塊與mllib模塊都提供了接口,它們的區別在于ml模塊接受DataFrame格式的數據而mllib模塊接受RDD格式的數據。
關于pyspark底層,這里主要探索兩個地方。一個是其初始化時的工作,一個是其對JVM中scala代碼的調用
SparkContextSparkContext類在pyspark/context.py中,在python代碼里通過初試化該類的實例來完成Spark的啟動與初始化。這個類的__init__方法中執行了下面幾行代碼
self._callsite = first_spark_call() or CallSite(None, None, None) SparkContext._ensure_initialized(self, gateway=gateway) try: self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls) except: # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() raise
first_spark_call和CallSite方法都是用來獲取JAVA虛擬機中的堆棧,它們在pyspark/traceback_util.py中。
之后調用了類函數_ensure_initialized函數,對Spark的Java的gate_way和jvm進行設置。
最后調用了類中的_do_init_函數,從函數就可以看出是對內部類成員SparkConf的實例_conf函數進行設置,判斷各參數值是否為None,非空的話就進行設置,并讀取一些本地的python環境參數,啟動Spark。
以mllib庫為例,主要邏輯都在pyspark/mllib/common.py中。你去查看mllib模塊中機器學習算法的類與函數,你會發現基本都是使用self.call或者callMLlibFunc,將函數名與參數傳入。
各類模型的Model類都繼承自common.JavaModelWrapper,這個類代碼很短:
class JavaModelWrapper(object): """ Wrapper for the model in JVM """ def __init__(self, java_model): self._sc = SparkContext._active_spark_context self._java_model = java_model def __del__(self): self._sc._gateway.detach(self._java_model) def call(self, name, *a): """Call method of java_model""" return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
_java_model是來自Java或Scala的類的實例,在調用對應的訓練算法時由對應的scala代碼在末尾將這些類初始化并返回,其關鍵的類方法call,同callMLLibFunc方法一樣,都是調用了callJavaFunc的方法。對于調用某一類的方法,是運用python的getattr函數,將類實例與方法名傳入,使用反射機制獲取函數;而對于調用一些不屬于類的方法,即使用callMLLibFunc時,是傳入的PythonMLLibAPI類的實例以及方法名,來獲取函數:
def callMLlibFunc(name, *args): """ Call API in PythonMLLibAPI """ sc = SparkContext.getOrCreate() api = getattr(sc._jvm.PythonMLLibAPI(), name) return callJavaFunc(sc, api, *args)
最終callJavaFunc做的也很簡單,將python的參數*a,使用_py2java方法轉換為java的數據類型,并執行函數,再將結果使用_java2py方法轉換為python的數據類型返回:
def callJavaFunc(sc, func, *args): """ Call Java Function """ args = [_py2java(sc, a) for a in args] return _java2py(sc, func(*args))
這里的_java2py,對很多數據格式的支持不是很好,所以當你嘗試用底層的call方法調用一些pyspark尚未支持但scala中已經有的函數時,可能在scala部分可以執行,但是python的返回結果卻不盡如人意。
ml模塊的調用機制與mllib的機制有些許的不同,但本質上都還是去調用在Spark的JVM中scala代碼的class。
總結本篇博客其實說的非常簡單,pyspark即使是不涉及具體算法的部分,也還有很多內容尚未討論。這里僅是對pyspark產生一個初步的認識,同時簡單分析了一下底層對scala的調用過程。
你興許會有這樣的疑問--“去看這些源代碼有什么用呢?好像就算知道這些,實際使用時不還是用一下API就好了嗎?”。
實際上,看源代碼首先的就是滿足一下好奇心,對Spark有一個更充分的了解;其次關于具體用途,我舉個例子,很多情況你使用的集群可能不是最新版本的,因為復雜的配置導致一般而言也不可能有一個新版本就更新一次,這時你想用新版本的API怎么辦?看了這篇博客想必你也會有一些“大膽的想法”。后一篇博客會舉例說明我在實際工作中相關的一個問題,以及如何利用這些源碼去解決的。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/40956.html
摘要:大數據除了體積和速度外,數據的多樣性和準確性也是大數據的一大特點。這些也被稱為大數據的特征。介紹是一個解決大數據問題的分布式可伸縮的框架。介紹計算的模型最早出現在谷歌的一篇研究論文中。相關鏈接介紹是一個通用的分布式編程框架。 本文作者:foochane?本文鏈接:https://foochane.cn/article/2019060601.html 1 大數據簡介 大數據是這個時代最...
摘要:篇分布計算提高效率的庫及庫函數,比如的庫就有一大堆函數,本質上和的分布式計算的底層思想是一致的。篇特別適用于搭,比如的用于和在用的,其實根本上都是用了的腳本特性,串聯起來。的種常見操作增刪找值相當于執行了這個命令然后可以用函數來, 持續更新。--------------------C++篇------------------------ 分布計算提高效率的庫及庫函數,比如FB的foll...
摘要:篇分布計算提高效率的庫及庫函數,比如的庫就有一大堆函數,本質上和的分布式計算的底層思想是一致的。篇特別適用于搭,比如的用于和在用的,其實根本上都是用了的腳本特性,串聯起來。的種常見操作增刪找值相當于執行了這個命令然后可以用函數來, 持續更新。--------------------C++篇------------------------ 分布計算提高效率的庫及庫函數,比如FB的foll...
摘要:由于使用的是天河二號,版本是,同樣,所以獲取主題時還不能使用在中才開放對的接口,只能使用的方法。本來做并行化就是希望效率更高,卻在調用代碼,同時進行了很多數據轉換。 在pyspark中調用scala代碼 情境說明 問題 我們這邊是要使用Spark去并行一個自然語言處理的算法,其中使用到了LDA主題模型。由于使用的是天河二號,Spark版本是1.5.1,pyspark同樣,所以獲取主題時...
摘要:如何改變智能城市物聯網來源愿碼內容編輯愿碼連接每個程序員的故事網站愿碼愿景打造全學科系統免費課程,助力小白用戶初級工程師成本免費系統學習低成本進階,幫助一線資深工程師成長并利用自身優勢創造睡后收入。 AI如何改變智能城市物聯網? showImg(https://segmentfault.com/img/remote/1460000018768732); 來源 | 愿碼(ChainDe...
閱讀 1349·2021-09-28 09:43
閱讀 4115·2021-09-04 16:41
閱讀 1917·2019-08-30 15:44
閱讀 3728·2019-08-30 15:43
閱讀 775·2019-08-30 14:21
閱讀 2037·2019-08-30 11:00
閱讀 3319·2019-08-29 16:20
閱讀 1923·2019-08-29 14:21