国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Flink實戰(八) - Streaming Connectors 編程

beita / 698人閱讀

摘要:默認情況下,當數據元到達時,分段接收器將按當前系統時間拆分,并使用日期時間模式命名存儲區。如果需要,可以使用數據元或元組的屬性來確定目錄。這將調用傳入的數據元并將它們寫入部分文件,由換行符分隔。消費者的消費者被稱為或等。

1 概覽 1.1 預定義的源和接收器

Flink內置了一些基本數據源和接收器,并且始終可用。該預定義的數據源包括文件,目錄和插socket,并從集合和迭代器攝取數據。該預定義的數據接收器支持寫入文件和標準輸入輸出及socket。

1.2 綁定連接器

連接器提供用于與各種第三方系統連接的代碼。目前支持這些系統:

Apache Kafka (source/sink)

Apache Cassandra (sink)

Amazon Kinesis Streams (source/sink)

Elasticsearch (sink)

Hadoop FileSystem (sink)

RabbitMQ (source/sink)

Apache NiFi (source/sink)

Twitter Streaming API (source)

Google PubSub (source/sink)

要在應用程序中使用其中一個連接器,通常需要其他第三方組件,例如數據存儲或消息隊列的服務器。

雖然本節中列出的流連接器是Flink項目的一部分,并且包含在源版本中,但它們不包含在二進制分發版中。
1.3 Apache Bahir中的連接器

Flink的其他流處理連接器正在通過Apache Bahir發布,包括:

Apache ActiveMQ (source/sink)

Apache Flume (sink)

Redis (sink)

Akka (sink)

Netty (source)

1.4 其他連接到Flink的方法 1.4.1 通過異步I / O進行數據渲染

使用連接器不是將數據輸入和輸出Flink的唯一方法。一種常見的模式是在一個Map或多個FlatMap 中查詢外部數據庫或Web服務以渲染主數據流。

Flink提供了一個用于異步I / O的API, 以便更有效,更穩健地進行這種渲染。

1.4.2 可查詢狀態

當Flink應用程序將大量數據推送到外部數據存儲時,這可能會成為I / O瓶頸。如果所涉及的數據具有比寫入更少的讀取,則更好的方法可以是外部應用程序從Flink獲取所需的數據。在可查詢的狀態界面,允許通過Flink被管理的狀態,按需要查詢支持這個。

2 HDFS連接器

此連接器提供一個Sink,可將分區文件寫入任一Hadoop文件系統支持的文件系統 。

要使用此連接器,請將以下依賴項添加到項目中:


請注意,流連接器當前不是二進制發布的一部分
2.1 Bucketing File Sink

可以配置分段行為以及寫入,但我們稍后會介紹。這是可以創建一個默認情況下匯總到按時間拆分的滾動文件的存儲槽的方法

Java

Scala

唯一必需的參數是存儲桶的基本路徑。可以通過指定自定義bucketer,寫入器和批量大小來進一步配置接收器。

默認情況下,當數據元到達時,分段接收器將按當前系統時間拆分,并使用日期時間模式"yyyy-MM-dd--HH"命名存儲區。這種模式傳遞給 DateTimeFormatter使用當前系統時間和JVM的默認時區來形成存儲桶路徑。用戶還可以為bucketer指定時區以格式化存儲桶路徑。每當遇到新日期時,都會創建一個新存儲桶。

例如,如果有一個包含分鐘作為最精細粒度的模式,將每分鐘獲得一個新桶。每個存儲桶本身都是一個包含多個部分文件的目錄:接收器的每個并行實例將創建自己的部件文件,當部件文件變得太大時,接收器也會在其他文件旁邊創建新的部件文件。當存儲桶變為非活動狀態時,將刷新并關閉打開的部件文件。如果存儲桶最近未寫入,則視為非活動狀態。默認情況下,接收器每分鐘檢查一次非活動存儲桶,并關閉任何超過一分鐘未寫入的存儲桶。setInactiveBucketCheckInterval()并 setInactiveBucketThreshold()在一個BucketingSink。

也可以通過指定自定義bucketer setBucketer()上BucketingSink。如果需要,bucketer可以使用數據元或元組的屬性來確定bucket目錄。

默認編寫器是StringWriter。這將調用toString()傳入的數據元并將它們寫入部分文件,由換行符分隔。在a setWriter() 上指定自定義編寫器使用BucketingSink。如果要編寫Hadoop SequenceFiles,可以使用提供的 SequenceFileWriter,也可以配置為使用壓縮。

有兩個配置選項指定何時應關閉零件文件并啟動新零件文件:

通過設置批量大小(默認部件文件大小為384 MB)

通過設置批次滾動時間間隔(默認滾動間隔為Long.MAX_VALUE

當滿足這兩個條件中的任何一個時,將啟動新的部分文件。看如下例子:

Java

Scala

這將創建一個接收器,該接收器將寫入遵循此模式的存儲桶文件:

Java

生成結果

date-time是我們從日期/時間格式獲取的字符串

parallel-task是并行接收器實例的索引

count是由于批處理大小或批處理翻轉間隔而創建的部分文件的運行數

然而這種方式創建了太多小文件,不適合HDFS!僅供娛樂!

3 Apache Kafka連接器 3.1 簡介

此連接器提供對Apache Kafka服務的事件流的訪問。

Flink提供特殊的Kafka連接器,用于從/向Kafka主題讀取和寫入數據。Flink Kafka Consumer集成了Flink的檢查點機制,可提供一次性處理語義。為實現這一目標,Flink并不完全依賴Kafka的消費者群體偏移跟蹤,而是在內部跟蹤和檢查這些偏移。

為用例和環境選擇一個包(maven artifact id)和類名。對于大多數用戶來說,FlinkKafkaConsumer08(部分flink-connector-kafka)是合適的。

然后,導入maven項目中的連接器:

環境配置參考

3.2 ZooKeeper安裝及配置

下載zk

http://archive.cloudera.com/cdh5/cdh/5/zookeeper-3.4.5-cdh5.15.1.tar.gz

配置系統環境

修改配置數據存儲路徑

啟動3.3 Kafka部署及測試假設你剛剛開始并且沒有現有的Kafka或ZooKeeper數據

由于Kafka控制臺腳本對于基于Unix和Windows的平臺不同,因此在Windows平臺上使用bin  windows 而不是bin /,并將腳本擴展名更改為.bat。
Step 1:下載代碼

下載

解壓

配置環境變量



配置服務器屬性

修改日志存儲路徑

修改主機名

Step 2: 啟動服務器

Kafka使用ZooKeeper,因此如果還沒有ZooKeeper服務器,則需要先啟動它。

后臺模式啟動

Step 3: 創建一個主題

創建topic

Step 4: 發送一些消息

Kafka附帶一個命令行客戶端,它將從文件或標準輸入中獲取輸入,并將其作為消息發送到Kafka集群。 默認情況下,每行將作為多帶帶的消息發送。

運行生產者,然后在控制臺中鍵入一些消息以發送到服務器。

啟動生產者

Step 5: 啟動一個消費者

Kafka還有一個命令行使用者,它會將消息轉儲到標準輸出。

分屏,新建消費端

在不同的終端中運行上述每個命令,那么現在應該能夠在生產者終端中鍵入消息并看到它們出現在消費者終端中

所有命令行工具都有其他選項; 運行不帶參數的命令將顯示更詳細地記錄它們的使用信息。

3.4 Kafka 1.0.0+ Connector

從Flink 1.7開始,有一個新的通用Kafka連接器,它不跟蹤特定的Kafka主要版本。 相反,它在Flink發布時跟蹤最新版本的Kafka。

如果您的Kafka代理版本是1.0.0或更高版本,則應使用此Kafka連接器。 如果使用舊版本的Kafka(0.11,0.10,0.9或0.8),則應使用與代理版本對應的連接器。

兼容性

通過Kafka客戶端API和代理的兼容性保證,通用Kafka連接器與較舊和較新的Kafka代理兼容。 它與版本0.11.0或更高版本兼容,具體取決于所使用的功能。

將Kafka Connector從0.11遷移到通用(V1.10新增)

要執行遷移,請參閱升級作業和Flink版本指南和

在整個過程中使用Flink 1.9或更新版本。

不要同時升級Flink和操作符。

確保您作業中使用的Kafka Consumer和/或Kafka Producer分配了唯一標識符(uid):

使用stop with savepoint功能獲取保存點(例如,使用stop --withSavepoint)CLI命令。

用法

要使用通用Kafka連接器,請為其添加依賴關系:


然后實例化新源(FlinkKafkaConsumer)

Flink Kafka Consumer是一個流數據源,可以從Apache Kafka中提取并行數據流。 使用者可以在多個并行實例中運行,每個實例都將從一個或多個Kafka分區中提取數據。

Flink Kafka Consumer參與了檢查點,并保證在故障期間沒有數據丟失,并且計算處理元素“恰好一次”。(注意:這些保證自然會假設Kafka本身不會丟失任何數據。)

請注意,Flink在內部將偏移量作為其分布式檢查點的一部分進行快照。 承諾給Kafka的抵消只是為了使外部的進展觀與Flink對進展的看法同步。 這樣,監控和其他工作可以了解Flink Kafka消費者在多大程度上消耗了一個主題。

和接收器(FlinkKafkaProducer)。

除了從模塊和類名中刪除特定的Kafka版本之外,API向后兼容Kafka 0.11連接器。

3.5 Kafka消費者

Flink的Kafka消費者被稱為FlinkKafkaConsumer08(或09Kafka 0.9.0.x等)。它提供對一個或多個Kafka主題的訪問。

構造函數接受以下參數:

主題名稱/主題名稱列表

DeserializationSchema / KeyedDeserializationSchema用于反序列化來自Kafka的數據

Kafka消費者的屬性。需要以下屬性:

“bootstrap.servers”(以逗號分隔的Kafka經紀人名單)

“zookeeper.connect”(逗號分隔的Zookeeper服務器列表)(僅Kafka 0.8需要)

“group.id”消費者群組的ID


上述程序注意配置ip主機映射

虛擬機hosts

本地機器 hosts

發送消息

運行程序消費消息


Example:

Java

Scala

The DeserializationSchema

Flink Kafka Consumer需要知道如何將Kafka中的二進制數據轉換為Java / Scala對象。

在 DeserializationSchema允許用戶指定這樣的一個架構。T deserialize(byte[] message) 為每個Kafka消息調用該方法,從Kafka傳遞值。

從它開始通常很有幫助AbstractDeserializationSchema,它負責將生成的Java / Scala類型描述為Flink的類型系統。實現vanilla的用戶DeserializationSchema需要自己實現該getProducedType(...)方法。

為了訪問Kafka消息的鍵和值,KeyedDeserializationSchema具有以下deserialize方法T deserialize(byte [] messageKey,byte [] message,String topic,int partition,long offset)

為方便起見,Flink提供以下模式:

TypeInformationSerializationSchema(和TypeInformationKeyValueSerializationSchema)創建基于Flink的模式TypeInformation。如果Flink編寫和讀取數據,這將非常有用。此模式是其他通用序列化方法的高性能Flink替代方案。

JsonDeserializationSchema(和JSONKeyValueDeserializationSchema)將序列化的JSON轉換為ObjectNode對象,可以使用objectNode.get(“field”)作為(Int / String / ...)()從中訪問字段。KeyValue objectNode包含一個“key”和“value”字段,其中包含所有字段,以及一個可選的“元數據”字段,用于公開此消息的偏移量/分區/主題。

AvroDeserializationSchema它使用靜態提供的模式讀取使用Avro格式序列化的數據。它可以從Avro生成的類(AvroDeserializationSchema.forSpecific(...))中推斷出模式,也可以GenericRecords 使用手動提供的模式(with AvroDeserializationSchema.forGeneric(...))。此反序列化架構要求序列化記錄不包含嵌入式架構。

還有一個可用的模式版本,可以在Confluent Schema Registry中查找編寫器的模式(用于編寫記錄的 模式)。使用這些反序列化模式記錄將使用從模式注冊表中檢索的模式進行讀取,并轉換為靜態提供的模式(通過 ConfluentRegistryAvroDeserializationSchema.forGeneric(...)或ConfluentRegistryAvroDeserializationSchema.forSpecific(...))。

要使用此反序列化模式,必須添加以下附加依賴項:

當遇到因任何原因無法反序列化的損壞消息時,有兩個選項 - 從deserialize(...)方法中拋出異常將導致作業失敗并重新啟動,或者返回null以允許Flink Kafka使用者以靜默方式跳過損壞的消息。請注意,由于使用者的容錯能力(請參閱下面的部分以獲取更多詳細信息),因此對損壞的消息執行失敗將使消費者嘗試再次反序列化消息。因此,如果反序列化仍然失敗,則消費者將在該損壞的消息上進入不間斷重啟和失敗循環。

3.6 Kafka生產者

Flink的Kafka Producer被稱為FlinkKafkaProducer011(或010 對于Kafka 0.10.0.x版本。或者直接就是FlinkKafkaProducer,對于Kafka>=1.0.0的版本來說)。

它允許將記錄流寫入一個或多個Kafka主題。

自應用

Pro

確保啟動端口

Pro端生產消息

消費端接收

Example

Java

Scala

上面的示例演示了創建Flink Kafka Producer以將流寫入單個Kafka目標主題的基本用法。對于更高級的用法,還有其他構造函數變體允許提供以下內容:

提供自定義屬性

生產者允許為內部的KafkaProducer提供自定義屬性配置。

自定義分區程序

將記錄分配給特定分區,可以為FlinkKafkaPartitioner構造函數提供實現。將為流中的每個記錄調用此分區程序,以確定應將記錄發送到的目標主題的確切分區。

高級序列化模式

與消費者類似,生產者還允許使用調用的高級序列化模式KeyedSerializationSchema,該模式允許多帶帶序列化鍵和值。它還允許覆蓋目標主題,以便一個生產者實例可以將數據發送到多個主題。

3.8 Kafka消費者開始位置配置

Flink Kafka Consumer允許配置如何確定Kafka分區的起始位置。

Java

Scala

Flink Kafka Consumer的所有版本都具有上述明確的起始位置配置方法。

setStartFromGroupOffsets(默認行為)

從group.idKafka代理(或Zookeeper for Kafka 0.8)中的消費者組(在消費者屬性中設置)提交的偏移量開始讀取分區。如果找不到分區的偏移量,auto.offset.reset將使用屬性中的設置。

setStartFromEarliest()/ setStartFromLatest()

從最早/最新記錄開始。在這些模式下,Kafka中的承諾偏移將被忽略,不會用作起始位置。

setStartFromTimestamp(long)

從指定的時間戳開始。對于每個分區,時間戳大于或等于指定時間戳的記錄將用作起始位置。如果分區的最新記錄早于時間戳,則只會從最新記錄中讀取分區。在此模式下,Kafka中的已提交偏移將被忽略,不會用作起始位置。

還可以指定消費者應從每個分區開始的確切偏移量:

Java

Scala

上面的示例將使用者配置為從主題的分區0,1和2的指定偏移量開始myTopic。偏移值應該是消費者應為每個分區讀取的下一條記錄。請注意,如果使用者需要讀取在提供的偏移量映射中沒有指定偏移量的分區,則它將回退到setStartFromGroupOffsets()該特定分區的默認組偏移行為(即)。

請注意,當作業從故障中自動恢復或使用保存點手動恢復時,這些起始位置配置方法不會影響起始位置。在恢復時,每個Kafka分區的起始位置由存儲在保存點或檢查點中的偏移量確定。

3.9 Kafka生產者和容錯 Kafka 0.8

在0.9之前,Kafka沒有提供任何機制來保證至少一次或恰好一次的語義。

Kafka 0.9和0.10

啟用Flink的檢查點時,FlinkKafkaProducer09和FlinkKafkaProducer010 能提供至少一次傳輸保證。

除了開啟Flink的檢查點,還應該配置setter方法:

setLogFailuresOnly(boolean)

默認為false。啟用此選項將使生產者僅記錄失敗日志而不是捕獲和重新拋出它們。這大體上就是計數已成功的記錄,即使它從未寫入目標Kafka主題。這必須設為false對于確保 至少一次

setFlushOnCheckpoint(boolean)

默認為true。啟用此函數后,Flink的檢查點將在檢查點成功之前等待檢查點時的任何動態記錄被Kafka確認。這可確保檢查點之前的所有記錄都已寫入Kafka。必須開啟,對于確保 至少一次

總之,默認情況下,Kafka生成器對版本0.9和0.10具有至少一次保證,即

setLogFailureOnly設置為false和setFlushOnCheckpoint設置為true。

默認情況下,重試次數設置為“0”。這意味著當setLogFailuresOnly設置為時false,生產者會立即失敗,包括Leader更改。
默認情況下,該值設置為“0”,以避免重試導致目標主題中出現重復消息。對于經常更改代理的大多數生產環境,建議將重試次數設置為更高的值。
Kafka目前沒有生產者事務,因此Flink在Kafka主題里無法保證恰好一次交付
Kafka >= 0.11

啟用Flink的檢查點后,FlinkKafkaProducer011

對于Kafka >= 1.0.0版本是FlinkKafkaProduce

可以提供準確的一次交付保證。

除了啟用Flink的檢查點,還可以通過將適當的語義參數傳遞給FlinkKafkaProducer011,選擇三種不同的算子操作模式

Semantic.NONE

Flink啥都不保證。生成的記錄可能會丟失,也可能會重復。

Semantic.AT_LEAST_ONCE(默認設置)


類似于setFlushOnCheckpoint(true)在 FlinkKafkaProducer010。這可以保證不會丟失任何記錄(盡管它們可以重復)。

Semantic.EXACTLY_ONCE

使用Kafka事務提供恰好一次的語義。每當您使用事務寫入Kafka時,不要忘記為任何從Kafka消費記錄的應用程序設置所需的isolation.level(read_committed 或read_uncommitted- 后者為默認值)。

注意事項

Semantic.EXACTLY_ONCE 模式依賴于在從所述檢查點恢復之后提交在獲取檢查點之前啟動的事務的能力。如果Flink應用程序崩潰和完成重啟之間的時間較長,那么Kafka的事務超時將導致數據丟失(Kafka將自動中止超過超時時間的事務)。考慮到這一點,請根據預期的停機時間適當配置事務超時。

Kafka broker默認 transaction.max.timeout.ms 設置為15分鐘。此屬性不允許為生產者設置大于其值的事務超時。

FlinkKafkaProducer011默認情況下,將_transaction.timeout.msproducer config_中的屬性設置為1小時,因此_transaction.max.timeout.ms_在使用 Semantic.EXACTLY_ONCE 模式之前應該增加 該屬性。

在_read_committed_模式中KafkaConsumer,任何未完成的事務(既不中止也不完成)將阻止來自給定Kafka主題的所有讀取超過任何未完成的事務。換言之,遵循以下事件順序:

用戶事務1開啟并寫記錄

用戶事務2開啟并寫了一些其他記錄

用戶提交事務2

即使事務2已經提交了記錄,在事務1提交或中止之前,消費者也不會看到它們。這有兩個含義:

首先,在Flink應用程序的正常工作期間,用戶可以預期Kafka主題中生成的記錄的可見性會延遲,等于已完成檢查點之間的平均時間。

其次,在Flink應用程序失敗的情況下,讀者將阻止此應用程序編寫的主題,直到應用程序重新啟動或配置的事務超時時間過去為止。此注釋僅適用于有多個代理/應用程序寫入同一Kafka主題的情況。

Semantic.EXACTLY_ONCE 模式為每個FlinkKafkaProducer011實例使用固定大小的KafkaProducers池。每個檢查點使用其中一個生產者。如果并發檢查點的數量超過池大小,FlinkKafkaProducer011 將引發異常并將使整個應用程序失敗。請相應地配置最大池大小和最大并發檢查點數。
Semantic.EXACTLY_ONCE 采取所有可能的措施,不要留下任何阻礙消費者閱讀Kafka主題的延遲事務,這是必要的。但是,如果Flink應用程序在第一個檢查點之前失敗,則在重新啟動此類應用程序后,系統中沒有關于先前池大小的信息。因此,在第一個檢查點完成之前按比例縮小Flink應用程序是不安全的 _FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR_。
3.10 Kafka消費者及其容錯

啟用Flink的檢查點后,Flink Kafka Consumer將使用主題中的記錄,并以一致的方式定期檢查其所有Kafka偏移以及其他 算子操作的狀態。如果作業失敗,Flink會將流式程序恢復到最新檢查點的狀態,并從存儲在檢查點中的偏移量開始重新使用來自Kafka的記錄。

因此,繪制檢查點的間隔定義了程序在發生故障時最多可以返回多少。

檢查點常用參數 enableCheckpointing

啟用流式傳輸作業的檢查點。 將定期快照流式數據流的分布式狀態。 如果發生故障,流數據流將從最新完成的檢查點重新啟動。

該作業在給定的時間間隔內定期繪制檢查點。 狀態將存儲在配置的狀態后端。

此刻未正確支持檢查點迭代流數據流。 如果“force”參數設置為true,則系統仍將執行作業。

setCheckpointingMode

setCheckpointTimeout

setMaxConcurrentCheckpoints

要使用容錯的Kafka使用者,需要在運行環境中啟用拓撲的檢查點:

Scala

Java

另請注意,如果有足夠的處理插槽可用于重新啟動拓撲,則Flink只能重新啟動拓撲。因此,如果拓撲由于丟失了TaskManager而失敗,那么之后仍然必須有足夠的可用插槽。YARN上的Flink支持自動重啟丟失的YARN容器。

如果未啟用檢查點,Kafka使用者將定期向Zookeeper提交偏移量。

參考

Streaming Connectors

Kafka官方文檔

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/75588.html

相關文章

  • 《從0到1學習Flink》—— 如何自定義 Data Source ?

    摘要:從上面自定義的可以看到我們繼承的就是這個類,那么來了解一下一個抽象類,繼承自。該類的子類有三個,兩個是抽象類,在此基礎上提供了更具體的實現,另一個是。 showImg(https://segmentfault.com/img/remote/1460000016978900?w=1920&h=1641); 前言 在 《從0到1學習Flink》—— Data Source 介紹 文章中,我...

    songze 評論0 收藏0
  • Flink實戰(六) - Table API & SQL編程

    摘要:每個在簡潔性和表達性之間提供不同的權衡,并針對不同的用例。在這些中處理的數據類型在相應的編程語言中表示為類。該是為中心的聲明性表,其可被動態地改變的表表示流時。這種抽象在語義和表達方面類似于,但是將程序表示為查詢表達式。 1 意義 1.1 分層的 APIs & 抽象層次 Flink提供三層API。 每個API在簡潔性和表達性之間提供不同的權衡,并針對不同的用例。 showImg(ht...

    lifefriend_007 評論0 收藏0
  • Flink實時計算 UFlink】基于gradle開發指南

    摘要:基于開發指南如果基于進行應用開發,需要在文件中加入如下配置注解注意修改的值,確保其符合您的應用。應用開發完成后,可以直接直接運行方法,在本地進行基本的測試。基于gradle開發指南如果基于gradle進行應用開發,需要在build.gradle文件中加入如下配置:buildscript { repositories { jcenter() // this applie...

    Tecode 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<