国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

Spark SQL學(xué)習(xí)筆記

qieangel2013 / 2935人閱讀

摘要:是中處理結(jié)構(gòu)化數(shù)據(jù)的模塊。可以從很多數(shù)據(jù)源加載數(shù)據(jù)并構(gòu)造得到,如結(jié)構(gòu)化數(shù)據(jù)文件,中的表,外部數(shù)據(jù)庫(kù),或者已有的。使用反射機(jī)制,推導(dǎo)包含指定類(lèi)型對(duì)象的。這一功能應(yīng)該優(yōu)先于使用。隨后,將會(huì)掃描必要的列,并自動(dòng)調(diào)整壓縮比例,以減少內(nèi)存占用和壓力。

Spark SQL是Spark中處理結(jié)構(gòu)化數(shù)據(jù)的模塊。與基礎(chǔ)的Spark RDD API不同,Spark SQL的接口提供了更多關(guān)于數(shù)據(jù)的結(jié)構(gòu)信息和計(jì)算任務(wù)的運(yùn)行時(shí)信息。Spark SQL如今有了三種不同的API:SQL語(yǔ)句、DataFrame API和最新的Dataset API。
Spark SQL的一種用法是直接執(zhí)行SQL查詢(xún)語(yǔ)句,你可使用最基本的SQL語(yǔ)法,也可以選擇HiveQL語(yǔ)法。Spark SQL可以從已有的Hive中讀取數(shù)據(jù)。

DataFrame是一種分布式數(shù)據(jù)集合,每一條數(shù)據(jù)都由幾個(gè)命名字段組成。概念上來(lái)說(shuō),她和關(guān)系型數(shù)據(jù)庫(kù)的表 或者 R和Python中的data frame等價(jià),DataFrame可以從很多數(shù)據(jù)源(sources)加載數(shù)據(jù)并構(gòu)造得到,如:結(jié)構(gòu)化數(shù)據(jù)文件,Hive中的表,外部數(shù)據(jù)庫(kù),或者已有的RDD。

Dataset是Spark-1.6新增的一種API。Dataset想要把RDD的優(yōu)勢(shì)(強(qiáng)類(lèi)型,可以使用lambda表達(dá)式函數(shù))和Spark SQL的優(yōu)化執(zhí)行引擎的優(yōu)勢(shì)結(jié)合到一起。Dataset可以由JVM對(duì)象構(gòu)建(constructed )得到,而后Dataset上可以使用各種transformation算子(map,flatMap,filter 等)。

入口:SQLContext與SparkSession

對(duì)于2.0版本以前,Spark SQL所有的功能入口都是SQLContext 類(lèi),及其子類(lèi)。

val sc: SparkContext // 假設(shè)已經(jīng)有一個(gè) SparkContext 對(duì)象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 用于包含RDD到DataFrame隱式轉(zhuǎn)換操作
import sqlContext.implicits._

對(duì)于2.0版本以后,入口變成了SparkSession,使用SparkSession.builder()來(lái)構(gòu)建

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

Spark2.0引入SparkSession的目的是內(nèi)建支持Hive的一些特性,包括使用HiveQL查詢(xún),訪問(wèn)Hive UDFs,從Hive表中讀取數(shù)據(jù)等,使用這些你不需要已存在的Hive配置。而在此之前,你需要引入HiveContext的依賴(lài),并使用HiveContext來(lái)支持這些特性。

DataFrame

DataFrame可以從很多數(shù)據(jù)源(sources)加載數(shù)據(jù)并構(gòu)造得到,如:結(jié)構(gòu)化數(shù)據(jù)文件,Hive中的表,外部數(shù)據(jù)庫(kù),或者已有的RDD。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Spark2.0之后,DataFrame僅是Dataset of Rows(對(duì)于java和Scala是這樣).DataFrame提供了結(jié)構(gòu)化數(shù)據(jù)的領(lǐng)域?qū)S谜Z(yǔ)言支持.

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

完整的操作方法列表,請(qǐng)查看Dataset的api
Dataset還支持各種字符串,日期,數(shù)學(xué)等函數(shù),列表見(jiàn)這里

編程方式執(zhí)行SQL查詢(xún)
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Global Temporary View - 前面創(chuàng)建的TempView是與SparkSession相關(guān)的,隨著session結(jié)束而銷(xiāo)毀,如果你想跨多個(gè)Session共享,你需要使用Global Temporary View.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Dataset

Dataset API和RDD類(lèi)似,不過(guò)Dataset不使用Java序列化或者Kryo,而是使用專(zhuān)用的編碼器(Encoder )來(lái)序列化對(duì)象和跨網(wǎng)絡(luò)傳輸通信。如果這個(gè)編碼器和標(biāo)準(zhǔn)序列化都能把對(duì)象轉(zhuǎn)字節(jié),那么編碼器就可以根據(jù)代碼動(dòng)態(tài)生成,并使用一種特殊數(shù)據(jù)格式,這種格式下的對(duì)象不需要反序列化回來(lái),就能允許Spark進(jìn)行操作,如過(guò)濾、排序、哈希等。

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder personEncoder = Encoders.bean(Person.class);
Dataset javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder integerEncoder = Encoders.INT();
Dataset primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset transformedDS = primitiveDS.map(new MapFunction() {
  @Override
  public Integer call(Integer value) throws Exception {
    return value + 1;
  }
}, integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
和RDD互操作

Spark SQL有兩種方法將RDD轉(zhuǎn)為DataFrame。

使用反射機(jī)制,推導(dǎo)包含指定類(lèi)型對(duì)象RDD的schema。這種基于反射機(jī)制的方法使代碼更簡(jiǎn)潔,而且如果你事先知道數(shù)據(jù)schema,推薦使用這種方式;

編程方式構(gòu)建一個(gè)schema,然后應(yīng)用到指定RDD上。這種方式更啰嗦,但如果你事先不知道數(shù)據(jù)有哪些字段,或者數(shù)據(jù)schema是運(yùn)行時(shí)讀取進(jìn)來(lái)的,那么你很可能需要用這種方式。

利用反射推導(dǎo)schema

Spark SQL支持自動(dòng)轉(zhuǎn)換一個(gè)JavaBean的RDD為DataFrame. 目前,SparkSQL不支持包含Map域的JavaBean轉(zhuǎn)換。你可以創(chuàng)建一個(gè)實(shí)現(xiàn)了Serializable接口的JavaBean.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
JavaRDD peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(new Function() {
    @Override
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");
      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));
      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder stringEncoder = Encoders.STRING();
Dataset teenagerNamesByIndexDF = teenagersDF.map(new MapFunction() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset teenagerNamesByFieldDF = teenagersDF.map(new MapFunction() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getAs("name");
  }
}, stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
編程方式定義Schema

你可能需要按以下三個(gè)步驟,以編程方式的創(chuàng)建一個(gè)DataFrame:

從已有的RDD創(chuàng)建一個(gè)包含Row對(duì)象的RDD

用StructType創(chuàng)建一個(gè)schema,和步驟1中創(chuàng)建的RDD的結(jié)構(gòu)相匹配

把得到的schema應(yīng)用于包含Row對(duì)象的RDD,調(diào)用這個(gè)方法來(lái)實(shí)現(xiàn)這一步:SparkSession.createDataFrame

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD rowRDD = peopleRDD.map(new Function() {
  @Override
  public Row call(String record) throws Exception {
    String[] attributes = record.split(",");
    return RowFactory.create(attributes[0], attributes[1].trim());
  }
});

// Apply the schema to the RDD
Dataset peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset namesDS = results.map(new MapFunction() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
Data Sources數(shù)據(jù)源

Spark SQL支持基于DataFrame操作一系列不同的數(shù)據(jù)源。DataFrame既可以當(dāng)成一個(gè)普通RDD來(lái)操作,也可以將其注冊(cè)成一個(gè)臨時(shí)表來(lái)查詢(xún)。把DataFrame注冊(cè)為table之后,你就可以基于這個(gè)table執(zhí)行SQL語(yǔ)句了。本節(jié)將描述加載和保存數(shù)據(jù)的一些通用方法,包含了不同的Spark數(shù)據(jù)源
在最簡(jiǎn)單的情況下,所有操作都會(huì)以默認(rèn)類(lèi)型數(shù)據(jù)源來(lái)加載數(shù)據(jù)(默認(rèn)是Parquet,除非修改了spark.sql.sources.default 配置)。

Dataset usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

你也可以手動(dòng)指定數(shù)據(jù)源,并設(shè)置一些額外的選項(xiàng)參數(shù)。數(shù)據(jù)源可由其全名指定(如,org.apache.spark.sql.parquet),而對(duì)于內(nèi)建支持的數(shù)據(jù)源,可以使用簡(jiǎn)寫(xiě)名(json, parquet, jdbc)。任意類(lèi)型數(shù)據(jù)源創(chuàng)建的DataFrame都可以用下面這種語(yǔ)法轉(zhuǎn)成其他類(lèi)型數(shù)據(jù)格式。

Dataset peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

直接對(duì)文件使用SQL,Spark SQL還支持直接對(duì)文件使用SQL查詢(xún),不需要用read方法把文件加載進(jìn)來(lái)。

Dataset sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
保存模式

Save操作有一個(gè)可選參數(shù)SaveMode,用這個(gè)參數(shù)可以指定如何處理數(shù)據(jù)已經(jīng)存在的情況。很重要的一點(diǎn)是,這些保存模式都沒(méi)有加鎖,所以其操作也不是原子性的。另外,如果使用Overwrite模式,實(shí)際操作是,先刪除數(shù)據(jù),再寫(xiě)新數(shù)據(jù)。

SaveMode.ErrorIfExists (default) "error" (default) (默認(rèn)模式)從DataFrame向數(shù)據(jù)源保存數(shù)據(jù)時(shí),如果數(shù)據(jù)已經(jīng)存在,則拋異常。

SaveMode.Append "append" 如果數(shù)據(jù)或表已經(jīng)存在,則將DataFrame的數(shù)據(jù)追加到已有數(shù)據(jù)的尾部。

SaveMode.Overwrite "overwrite" 如果數(shù)據(jù)或表已經(jīng)存在,則用DataFrame數(shù)據(jù)覆蓋之。

SaveMode.Ignore "ignore" 如果數(shù)據(jù)已經(jīng)存在,那就放棄保存DataFrame數(shù)據(jù)。這和SQL里CREATE TABLE IF NOT EXISTS有點(diǎn)類(lèi)似。

保存到持久化表

DataFrame可以使用saveAsTable方法將內(nèi)容持久化到Hive的metastore表中.默認(rèn)情況下,saveAsTable會(huì)創(chuàng)建一個(gè)”managed table“,也就是說(shuō)這個(gè)表數(shù)據(jù)的位置是由metastore控制的。同樣,如果刪除表,其數(shù)據(jù)也會(huì)同步刪除。

Parquet文件

Parquet 是一種流行的列式存儲(chǔ)格式。Spark SQL提供對(duì)Parquet文件的讀寫(xiě)支持,而且Parquet文件能夠自動(dòng)保存原始數(shù)據(jù)的schema。寫(xiě)Parquet文件的時(shí)候,所有的字段都會(huì)自動(dòng)轉(zhuǎn)成nullable,以便向后兼容。

編程方式加載數(shù)據(jù)

Dataset peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset namesDS = namesDF.map(new MapFunction() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

其余關(guān)鍵特性:請(qǐng)看官方文檔

分區(qū)發(fā)現(xiàn)

Schema合并

Hive metastore Parquet table轉(zhuǎn)換

刷新元數(shù)據(jù)

配置

JSON數(shù)據(jù)集

Spark SQL在加載JSON數(shù)據(jù)的時(shí)候,可以自動(dòng)推導(dǎo)其schema并返回 Dataset。用SparkSession.read().json()讀取一個(gè)包含String的RDD或者JSON文件,即可實(shí)現(xiàn)這一轉(zhuǎn)換。

注意,通常所說(shuō)的json文件只是包含一些json數(shù)據(jù)的文件,而不是我們所需要的JSON格式文件。JSON格式文件必須每一行是一個(gè)獨(dú)立、完整的的JSON對(duì)象。因此,一個(gè)常規(guī)的多行json文件經(jīng)常會(huì)加載失敗。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List jsonData = Arrays.asList(
        "{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}");
JavaRDD anotherPeopleRDD =
        new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
Dataset anotherPeople = spark.read().json(anotherPeopleRDD);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Hive表

Spark SQL支持從Hive中讀寫(xiě)數(shù)據(jù).然而,Hive依賴(lài)項(xiàng)太多,所以沒(méi)有把Hive包含在默認(rèn)的Spark發(fā)布包里。要支持Hive,需要把相關(guān)的jar包放到classpath中(注意是所有節(jié)點(diǎn)的).
配置文件hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) 放在conf/.
首先你需要初始化SparkSession,但是如果你沒(méi)有存在的Hive部署,仍然可以得到Hive支持。

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = "spark-warehouse";
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
spark.sql("LOAD DATA LOCAL INPATH "examples/src/main/resources/kv1.txt" INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DaraFrames are of type Row, which lets you to access each column by ordinal.
Dataset stringsDS = sqlDF.map(new MapFunction() {
  @Override
  public String call(Row row) throws Exception {
    return "Key: " + row.get(0) + ", Value: " + row.get(1);
  }
}, Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...

和不同版本的Hive Metastore交互: 略,請(qǐng)看官方文檔

用JDBC連接其他數(shù)據(jù)庫(kù)

Spark SQL也可以用JDBC訪問(wèn)其他數(shù)據(jù)庫(kù)。這一功能應(yīng)該優(yōu)先于使用JdbcRDD。因?yàn)樗祷匾粋€(gè)DataFrame,而DataFrame在Spark SQL中操作更簡(jiǎn)單,且更容易和來(lái)自其他數(shù)據(jù)源的數(shù)據(jù)進(jìn)行交互關(guān)聯(lián)。
首先,你需要在spark classpath中包含對(duì)應(yīng)數(shù)據(jù)庫(kù)的JDBC driver,下面這行包括了用于訪問(wèn)postgres的數(shù)據(jù)庫(kù)driver

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar 

遠(yuǎn)程數(shù)據(jù)庫(kù)的表可以通過(guò)Data Sources API,用DataFrame或者SparkSQL 臨時(shí)表來(lái)裝載。以下是選項(xiàng)列表:

url : 普通jdbc url

dbtable 需要讀取的JDBC表。注意,任何可以填在SQL的where子句中的東西,都可以填在這里。(既可以填完整的表名,也可填括號(hào)括起來(lái)的子查詢(xún)語(yǔ)句)

driver JDBC driver的類(lèi)名。這個(gè)類(lèi)必須在master和worker節(jié)點(diǎn)上都可用,這樣各個(gè)節(jié)點(diǎn)才能將driver注冊(cè)到JDBC的子系統(tǒng)中。

fetchsize JDBC fetch size,決定每次獲取多少行數(shù)據(jù)。默認(rèn)為 1000.

isolationLevel 可選有NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE,默認(rèn)為READ_UNCOMMITTED.

truncate This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.

createTableOptions This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing.

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

疑難解答
JDBC driver class必須在所有client session或者executor上,對(duì)java的原生classloader可見(jiàn)。這是因?yàn)镴ava的DriverManager在打開(kāi)一個(gè)連接之前,會(huì)做安全檢查,并忽略所有對(duì)原聲classloader不可見(jiàn)的driver。最簡(jiǎn)單的一種方法,就是在所有worker節(jié)點(diǎn)上修改compute_classpath.sh,并包含你所需的driver jar包。一些數(shù)據(jù)庫(kù),如H2,會(huì)把所有的名字轉(zhuǎn)大寫(xiě)。對(duì)于這些數(shù)據(jù)庫(kù),在Spark SQL中必須也使用大寫(xiě)。

性能調(diào)整

內(nèi)存緩存
Spark SQL可以通過(guò)調(diào)用SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列存儲(chǔ)格式緩存到內(nèi)存中。隨后,Spark SQL將會(huì)掃描必要的列,并自動(dòng)調(diào)整壓縮比例,以減少內(nèi)存占用和GC壓力。你也可以用SQLContext.uncacheTable(“tableName”)來(lái)刪除內(nèi)存中的table。

你還可以使用SQLContext.setConf 或在SQL語(yǔ)句中運(yùn)行SET key=value命令,來(lái)配置內(nèi)存中的緩存。

spark.sql.inMemoryColumnarStorage.compressed true 如果設(shè)置為true,Spark SQL將會(huì)根據(jù)數(shù)據(jù)統(tǒng)計(jì)信息,自動(dòng)為每一列選擇多帶帶的壓縮編碼方式。

spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式緩存批量的大小。增大批量大小可以提高內(nèi)存利用率和壓縮率,但同時(shí)也會(huì)帶來(lái)OOM(Out Of Memory)的風(fēng)險(xiǎn)。
其他配置選項(xiàng)

以下選項(xiàng)同樣也可以用來(lái)給查詢(xún)?nèi)蝿?wù)調(diào)性能。不過(guò)這些選項(xiàng)在未來(lái)可能被放棄,因?yàn)閟park將支持越來(lái)越多的自動(dòng)優(yōu)化。

spark.sql.files.maxPartitionBytes 134217728 (128 MB) The maximum number of bytes to pack into a single partition when reading files.

spark.sql.files.openCostInBytes 4194304 (4 MB) The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over estimated, then the partitions with small files will be faster than partitions with bigger files (which is scheduled first).

spark.sql.broadcastTimeout 300 Timeout in seconds for the broadcast wait time in broadcast joins

spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置join操作時(shí),能夠作為廣播變量的最大table的大小。設(shè)置為-1,表示禁用廣播。注意,目前的元數(shù)據(jù)統(tǒng)計(jì)僅支持Hive metastore中的表,并且需要運(yùn)行這個(gè)命令:ANALYSE TABLE COMPUTE STATISTICS noscan

spark.sql.shuffle.partitions 200 配置數(shù)據(jù)混洗(shuffle)時(shí)(join或者聚合操作),使用的分區(qū)數(shù)。

分布式SQL引擎

Spark SQL可以作為JDBC/ODBC或者命令行工具的分布式查詢(xún)引擎。在這種模式下,終端用戶(hù)或應(yīng)用程序,無(wú)需寫(xiě)任何代碼,就可以直接在Spark SQL中運(yùn)行SQL查詢(xún)。
略。

使用Spark SQL命令行工具

Spark SQL CLI是一個(gè)很方便的工具,它可以用local mode運(yùn)行hive metastore service,并且在命令行中執(zhí)行輸入的查詢(xún)。注意Spark SQL CLI目前還不支持和Thrift JDBC server通信。

用如下命令,在spark目錄下啟動(dòng)一個(gè)Spark SQL CLI

./bin/spark-sql

Hive配置在conf目錄下hive-site.xml,core-site.xml,hdfs-site.xml中設(shè)置。你可以用這個(gè)命令查看完整的選項(xiàng)列表:./bin/spark-sql –help

聚合

內(nèi)建的DataFrame函數(shù)提供了如count(), countDistinct(), avg(), max(), min()等常用的聚合操作,用戶(hù)也可以自定義一些聚合函數(shù)。
Untyped User-Defined Aggregate Functions

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  }
  // Data types of input arguments of this aggregate function
  public StructType inputSchema() {
    return inputSchema;
  }
  // Data types of values in the aggregation buffer
  public StructType bufferSchema() {
    return bufferSchema;
  }
  // The data type of the returned value
  public DataType dataType() {
    return DataTypes.DoubleType;
  }
  // Whether this function always returns the same output on the identical input
  public boolean deterministic() {
    return true;
  }
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      long updatedSum = buffer.getLong(0) + input.getLong(0);
      long updatedCount = buffer.getLong(1) + 1;
      buffer.update(0, updatedSum);
      buffer.update(1, updatedCount);
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
  }
  // Calculates the final result
  public Double evaluate(Row buffer) {
    return ((double) buffer.getLong(0)) / buffer.getLong(1);
  }
}

// Register the function to access it
spark.udf().register("myAverage", new MyAverage());

Dataset df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Type-Safe User-Defined Aggregate Functions

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...

}

public static class Average implements Serializable  {
  private long sum;
  private long count;

  // Constructors, getters, setters...

}

public static class MyAverage extends Aggregator {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // Specifies the Encoder for the intermediate value type
  public Encoder bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // Specifies the Encoder for the final output value type
  public Encoder outputEncoder() {
    return Encoders.DOUBLE();
  }
}

Encoder employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn averageSalary = myAverage.toColumn().name("average_salary");
Dataset result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
參考:

http://spark.apache.org/docs/...
http://ifeve.com/apache-spark/

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/67290.html

相關(guān)文章

  • Spark 』1. spark 簡(jiǎn)介

    摘要:原文鏈接簡(jiǎn)介寫(xiě)在前面本系列是綜合了自己在學(xué)習(xí)過(guò)程中的理解記錄對(duì)參考文章中的一些理解個(gè)人實(shí)踐過(guò)程中的一些心得而來(lái)。其次,本系列是基于目前最新的系列開(kāi)始的,目前的更新速度很快,記錄一下版本好還是必要的。 原文鏈接:『 Spark 』1. spark 簡(jiǎn)介 寫(xiě)在前面 本系列是綜合了自己在學(xué)習(xí)spark過(guò)程中的理解記錄 + 對(duì)參考文章中的一些理解 + 個(gè)人實(shí)踐spark過(guò)程中的一些心得而來(lái)。寫(xiě)...

    G9YH 評(píng)論0 收藏0
  • 數(shù)據(jù)庫(kù)

    摘要:編輯大咖說(shuō)閱讀字?jǐn)?shù)用時(shí)分鐘內(nèi)容摘要對(duì)于真正企業(yè)級(jí)應(yīng)用,需要分布式數(shù)據(jù)庫(kù)具備什么樣的能力相比等分布式數(shù)據(jù)庫(kù),他們條最佳性能優(yōu)化性能優(yōu)化索引與優(yōu)化關(guān)于索引與優(yōu)化的基礎(chǔ)知識(shí)匯總。 mysql 數(shù)據(jù)庫(kù)開(kāi)發(fā)常見(jiàn)問(wèn)題及優(yōu)化 這篇文章從庫(kù)表設(shè)計(jì),慢 SQL 問(wèn)題和誤操作、程序 bug 時(shí)怎么辦這三個(gè)問(wèn)題展開(kāi)。 一個(gè)小時(shí)學(xué)會(huì) MySQL 數(shù)據(jù)庫(kù) 看到了一篇適合新手的 MySQL 入門(mén)教程,希望對(duì)想學(xué) ...

    mengbo 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<