摘要:由于使用的是天河二號,版本是,同樣,所以獲取主題時還不能使用在中才開放對的接口,只能使用的方法。本來做并行化就是希望效率更高,卻在調用代碼,同時進行了很多數據轉換。
在pyspark中調用scala代碼 情境說明 問題
我們這邊是要使用Spark去并行一個自然語言處理的算法,其中使用到了LDA主題模型。由于使用的是天河二號,Spark版本是1.5.1,pyspark同樣,所以獲取主題時還不能使用describeTopics(在spark1.6中才開放對python的接口),只能使用topicsMatrix的方法。
本來湊合用topicsMatrix也行,但我們發現,這一個用來獲取主題模型的函數,居然比Lda的訓練還要慢!無論在我們自己的集群還是在天河二號的分區上,都是這一個情況。觀察topicsMatrix的源代碼,好像也沒有什么復雜操作,只是把數據匯總collect而已:
@Since("1.3.0") override lazy val topicsMatrix: Matrix = { // Collect row-major topics val termTopicCounts: Array[(Int, TopicCounts)] = graph.vertices.filter(_._1 < 0).map { case (termIndex, cnts) => (index2term(termIndex), cnts)}.collect() // Convert to Matrix val brzTopics = BDM.zeros[Double](vocabSize, k) termTopicCounts.foreach { case (term, cnts) => var j = 0 while (j < k) { brzTopics(term, j) = cnts(j) j += 1 } } Matrices.fromBreeze(brzTopics) }
由于并不是算法中有一些復雜運算導致較慢,我們自然不希望在程序中有這樣的情況。發現到在Spark1.5.1中,mllib中LdaModel已經實現了describeTopics,只是未在Python中開放,我們自然希望嘗試使用describeTopics看看效果。
describeTopics的源代碼探索已知LDA.train()返回的是LdaModel的實例,于是乎,參考上篇博客,用以下方式去調用:
model = LDA.train(rdd_data, k=num_topics, maxIterations=20) topics = model.call("describeTopics", _py2java(sc, 10))
執行速度特別快,然而返回的結果卻不盡如人意,僅返回了一個長度k的列表,每個元素是一個key為"class",value為"scala.Tuple2"的單元素字典。從結果來看,scala的代碼應該是被成功執行了,然而返回結果卻出了問題。查看callJavaFunc的內容,可以判斷出,是describeTopics的返回結果沒有被_java2py函數正常的轉換。
比對Spark1.5和Spark1.6的代碼,LdaModel.describeTopics函數的內容是一致的,那么問題在哪兒呢?再去查看pyspark的LDA.train()調用的PythonMLLibAPI.trainLdaModel,發現在1.6中返回的不再是LdaModel而是它的子類LdaModelWrapper。查看這個類的方法,發現它重載了describeTopics來方便_java2py進行數據轉換:
private[python] class LDAModelWrapper(model: LDAModel) { def topicsMatrix(): Matrix = model.topicsMatrix def vocabSize(): Int = model.vocabSize def describeTopics(): Array[Byte] = describeTopics(this.model.vocabSize) def describeTopics(maxTermsPerTopic: Int): Array[Byte] = { val topics = model.describeTopics(maxTermsPerTopic).map { case (terms, termWeights) => val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava Array[Any](jTerms, jTermWeights) } SerDe.dumps(JavaConverters.seqAsJavaListConverter(topics).asJava) } def save(sc: SparkContext, path: String): Unit = model.save(sc, path) }
找到這里,解決方法就油然而生了。只要我們把這一段scala代碼在python中調用,并將describeTopics的Java對象傳入,不就萬事大吉了嗎?
在pyspark中調用scala代碼也許還有別的方法,不過這里使用的方法也足夠簡單。將.scala文件打包成jar后,啟動spark時加入參數--driver-class-path /path/to/xxx.jar,便可以將你的scala代碼放入Spark運行的虛擬機JVM中,從而讓python代碼在運行中通過反射機制在SparkContext._jvm里動態獲取到你的類與方法:
func = sc._jvm.com.example.YourObject.func打包scala代碼
那么,現在的問題就是如何把scala代碼打包成jar了。scala雖然也是基于JVM運行的語言,與java非常相似,但是其編譯選項中并沒有提供將其打包成jar的參數。這里我們用sbt打包它,sbt的下載與安裝請自行查閱其他教程,這里就不提供了,官方網站。
首先編寫好你的scala代碼,確認沒有bug,并在文件開頭用package關鍵字將其封裝至包中。接著,請手動建立你的項目目錄,并創建如下結構:
在build.sbt中,請至少進行以下設置
//項目名 name := "Project" //項目版本 version := "0.1" //scala版本 scalaVersion := "2.10.5" //jdk版本 javacOptions ++= Seq("-source", "1.7", "-target", "1.7") //主函數 mainClass in Compile := Some("YourClass.func")
在plugins.sbt中,請加上這一句話,告訴sbt需要這個第三方插件,這是用來打包的
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
這些都準備完成后,在terminal里進入你的項目根目錄下,輸入
sbt package
等待打包完成,會有相應提示。
更多的打包選項,以及sbt的更多用法,感興趣可以自行查閱。
回到我們這里的問題,我們希望能在python中對describeTopics的返回值進行轉換,那么我么只需要打包那一個重載的describeTopics就好了,這樣可以避免打包Spark的第三方包。更改一下函數的返回值,并注釋掉調用Spark的SerDe進行序列化的語句,最終的代碼如下:
package com.sysu.sparkhelper import java.util.List import scala.collection.JavaConverters object LdaHelper { def convert(topics: Array[(Array[Int], Array[Double])]): List[Array[Any]] = { val result = topics.map { case (terms, termWeights) => val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava Array[Any](jTerms, jTermWeights) } return JavaConverters.seqAsJavaListConverter(result).asJava // SerDe.dumps(JavaConverters.seqAsJavaListConverter(result).asJava) } }
用sbt打包完成后,使用--driver-class-path添加jar包,在python中相應代碼為:
lda_java_model = model._java_model func = getattr(model._java_model, "describeTopics") result = func(_py2java(sc, 10)) topics = _java2py(sc, sc._jvm.com.sysu.sparkhelper.LdaHelper.convert(result))總結
這算是閱讀源碼的一次應用,可以說還是解決了遇到的問題,同時也加深了對Spark的了解。
本來做并行化就是希望效率更高,pyspark卻在調用scala代碼,同時進行了很多數據轉換。想要更好的使用Spark的話,使用scala去編程應該才是最好的。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/40955.html
摘要:底層淺析簡介是官方提供的接口,同時也是中的一個程序。這里一提,對于大部分機器學習算法,你都會看到模塊與模塊都提供了接口,它們的區別在于模塊接受格式的數據而模塊接受格式的數據。 pyspark底層淺析 pyspark簡介 pyspark是Spark官方提供的API接口,同時pyspark也是Spark中的一個程序。 在terminal中輸入pyspark指令,可以打開python的she...
摘要:使用瀏覽器作為界面,向后臺的服務器發送請求,并顯示結果。本文主要介紹在上安裝流程該文件是用戶登錄時,操作系統定制用戶環境時使用的第一個文件,應用于登錄到系統的每一個用戶。 ipython-nodebook IPython notebook 目前已經成為用 Python 做教學、計算、科研的一個重要工具。 IPython Notebook 使用瀏覽器作為界面,向后臺的 IPython ...
摘要:本文作者本文鏈接安裝說明在安裝之前,需要安裝集群環境,如果沒有可以查看分布式集群的搭建用到的軟件軟件版本下載地址節點安排名稱主節點子節點子節點安裝解壓到安裝目錄修改配置文件配置文件位于目錄下。 本文作者:foochane?本文鏈接:https://foochane.cn/article/2019051904.html 1 安裝說明 在安裝spark之前,需要安裝hadoop集群環境,...
摘要:同時集成了機器學習類庫。基于計算框架,將的分布式計算應用到機器學習領域。提供了一個簡單的聲明方法指定機器學習任務,并且動態地選擇最優的學習算法。宣稱其性能是的多倍。 介紹 spark是分布式并行數據處理框架 與mapreduce的區別: mapreduce通常將中間結果放在hdfs上,spark是基于內存并行大數據框架,中間結果放在內存,對于迭代數據spark效率更高,mapred...
閱讀 1763·2021-10-11 10:59
閱讀 2401·2021-09-30 09:53
閱讀 1765·2021-09-22 15:28
閱讀 2795·2019-08-29 15:29
閱讀 1557·2019-08-29 13:53
閱讀 3207·2019-08-29 12:34
閱讀 2848·2019-08-26 10:16
閱讀 2660·2019-08-23 15:16