客戶不需要實時計算的結(jié)果,認為實時結(jié)果對他們沒有參考意義
客戶要求計算一個月(或更長時間)內(nèi)歷史數(shù)據(jù)的基準值,認為一個月(或更長時間)歷史數(shù)據(jù)得到的基準值才能更加準確的評估和預測未來的趨勢
批處理在大數(shù)據(jù)世界有著悠久的歷史。批處理主要操作大容量靜態(tài)數(shù)據(jù)集,并在計算過程完成后返回結(jié)果。
批處理模式中使用的數(shù)據(jù)集通常符合下列特征:
有界:批處理數(shù)據(jù)集代表數(shù)據(jù)的有限集合
持久:數(shù)據(jù)通常始終存儲在某種類型的持久存儲位置中
大量:海量數(shù)據(jù)集
批處理非常適合需要訪問全套記錄才能完成的計算工作。例如在計算總數(shù)和平均數(shù)時,必須將數(shù)據(jù)集作為一個整體加以處理,而不能將其視作多條記錄的集合。這些操作要求在計算進行過程中數(shù)據(jù)維持自己的狀態(tài)。
需要處理大量數(shù)據(jù)的任務(wù)通常最適合用批處理操作進行處理。無論直接從持久存儲設(shè)備處理數(shù)據(jù)集,或首先將數(shù)據(jù)集載入內(nèi)存,批處理系統(tǒng)在設(shè)計過程中就充分考慮了數(shù)據(jù)的量,可提供充足的處理資源。由于批處理在應對大量持久數(shù)據(jù)方面的表現(xiàn)極為出色,因此經(jīng)常被用于對歷史數(shù)據(jù)進行分析。
精確一次(exactly once)是指數(shù)據(jù)處理沒有數(shù)據(jù)丟失和重復處理的現(xiàn)象。
流處理的數(shù)據(jù)來源一般是消息隊列,是無界的,數(shù)據(jù)是一條一條獲取,在加載數(shù)據(jù)時可能會出現(xiàn)網(wǎng)絡(luò)連接等問題,所以流處理需要解決數(shù)據(jù)丟失和重復處理的問題,實現(xiàn)精確一次(exactly once)的語義相對復雜,目前storm流框架目前不支持(exactly once),spark為了支持(exactly once)引入預寫日志(AWL)并且offset由Spark自身管理 ,flink為了支持(exactly once)引入快照(snapshot)機制, 雖然流處理能夠解決數(shù)據(jù)丟失和重復計算問題,但需要引入各種機制,而這增加了系統(tǒng)消耗的資源。
批處理的數(shù)據(jù)源是靜態(tài)塊,比如文件,hdfs文件,批處理一次性加載一批數(shù)據(jù),基本不會出現(xiàn)數(shù)據(jù)丟失和重復計算的情況。
如果說流處理引入各種機制增加資源消耗可以解決數(shù)據(jù)丟失和重復處理問題,那么對于亂序數(shù)據(jù)流則存在忽略數(shù)據(jù)的可能。
流處理數(shù)據(jù)沒有邊界,需要窗口(window)的概念,根據(jù)窗口來匯總計算。窗口(window)類型有很多種, 滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session window)等,窗口(window)中需要定義時間,流處理中存在事件時間(event time)和處理時間(process time)。對于亂序的數(shù)據(jù),為此又引入了水印(watermark)機制。具體概念讀者自行查閱。
水印(watermark)有一個允許延時(allow lateness)的參數(shù), 窗口(window)接收到水印(watermark)后,再等待一段時間才會關(guān)閉窗口,如果這段時間有些數(shù)據(jù)依然沒有發(fā)送過來,那就只能忽略它們了。允許延時(allow lateness)參數(shù)設(shè)置的大,系統(tǒng)占用的資源就多,而且允許延時(allow lateness)的參數(shù)不能設(shè)置無限大,因此如果數(shù)據(jù)源異常亂序,流處理的窗口就等不到延時數(shù)據(jù)過來就進行匯總計算,導致延時數(shù)據(jù)未處理。
批處理數(shù)據(jù)有界,所有的數(shù)據(jù)全部都會加載,不用考慮數(shù)據(jù)源的順序,不會出現(xiàn)忽略數(shù)據(jù)的情況,也不需要窗口(window) ,時間,水印等機制。
以下是核心代碼:
flink執(zhí)行環(huán)境:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
從kafka中獲取DataSet數(shù)據(jù)源,DataSet表示批處理的數(shù)據(jù) (流處理是DataStream):
▼▼▼
DataSet
String, String>> recordsDataSetDataSet = env.createInput(KafkaInputFormat
.buildKafkaInputFormat().setBootstrapServers(KafkaServers)
.setGroupId(xx).setTopic(sourceTopic).finish());
繼承GenericInputFormat類實現(xiàn)自定義獲取kafka數(shù)據(jù)源 KafkaInputFormat:
▼▼▼
public class KafkaInputFormat extends GenericInputFormat
String, String>> {
@Override
public void open(GenericInputSplit split) throws IOException {
consumer = new KafkaConsumer<String,String>(props);
initPartionMap();
}
//獲取kafka topic每個分區(qū)的偏移量,用做kafka消費結(jié)束的標識
void initPartionMap(){
CollectionpartitionInfos = consumer.partitionsFor(topic);
Listtp =new ArrayList ();
partitionInfos.forEach(partitionInfo -> {
tp.add(new TopicPartition(topic,partitionInfo.partition()));
consumer.assign(tp);
consumer.seekToEnd(tp);
partionOffsetMap.put(partitionInfo.partition(),consumer.position(new TopicPartition(topic, partitionInfo.partition())));
partionBooleanMap.put(partitionInfo.partition(), false);
//獲取參數(shù)值后返回最初
consumer.seekToBeginning(tp);
});
}
//消費kafka是否結(jié)束
@Override
public boolean reachedEnd() throws IOException {
return !partionBooleanMap.containsValue(false);
}
@Override
public ConsumerRecords<String, String> nextRecord(ConsumerRecords<String, String> reuse) {
//從kafka中獲取一批數(shù)據(jù)
final ConsumerRecords<String, String> records consumer.poll(Duration.ofMillis(pollTime));
for (ConsumerRecord<String, String> record : records) {
Integer partion=record.partition();
Long offset= record.offset();
//表示已有分區(qū)已經(jīng)消費完
if(offset+1==partionOffsetMap.get(partion)) {
partionBooleanMap.put(partion, true);
}
}
return records;
}
流處理和批處理都有各自的優(yōu)缺點和應用場景,應該根據(jù)項目需求選擇合適的。
更多精彩干貨分享
點擊下方名片關(guān)注
IT那活兒
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/129897.html
摘要:再如通過處理流數(shù)據(jù)生成簡單的報告,如五分鐘的窗口聚合數(shù)據(jù)平均值。復雜的事情還有在流數(shù)據(jù)中進行數(shù)據(jù)多維度關(guān)聯(lián)聚合塞選,從而找到復雜事件中的根因。因為各種需求,也就造就了現(xiàn)在不斷出現(xiàn)實時計算框架,而下文我們將重磅介紹我們推薦的實時計算框架。 前言 先廣而告之,本文摘自本人《大數(shù)據(jù)重磅炸彈——實時計算框架 Flink》課程第二篇,內(nèi)容首發(fā)自我的知識星球,后面持續(xù)在星球里更新,這里做個預告,今...
摘要:實際上,本身就預留了與外部元數(shù)據(jù)對接的能力,分別提供了和這兩個抽象。對接外部數(shù)據(jù)源搞清楚了注冊庫表的過程,給我們帶來這樣一個思路如果外部元數(shù)據(jù)創(chuàng)建的表也能被轉(zhuǎn)換成可識別的,那么就能被無縫地注冊到。 本文整理自 2019 年 4 月 13 日在深圳舉行的 Flink Meetup 會議,分享嘉賓張俊,目前擔任 OPPO 大數(shù)據(jù)平臺研發(fā)負責人,也是 Apache Flink contrib...
摘要:基于流處理機制實現(xiàn)批流融合相對基于批處理機制實現(xiàn)批流融合的思想更自然,更合理,也更有優(yōu)勢,因此阿里巴巴在基于支持大量核心實時計算場景的同時,也在不斷改進的架構(gòu),使其朝著真正批流融合的統(tǒng)一計算引擎方向前進。 阿里妹導讀:2018年12月下旬,由阿里巴巴集團主辦的Flink Forward China在北京國家會議中心舉行。Flink Forward是由Apache軟件基金會授權(quán)的全球范圍...
摘要:另外,將機制發(fā)揚光大,對有著非常好的支持。系統(tǒng)也注意到并討論了和的問題。總結(jié)本文分享了四本相關(guān)的書籍和一份領(lǐng)域相關(guān)的論文列表篇,涉及的設(shè)計,實現(xiàn),故障恢復,彈性擴展等各方面。 前言 之前也分享了不少自己的文章,但是對于 Flink 來說,還是有不少新入門的朋友,這里給大家分享點 Flink 相關(guān)的資料(國外數(shù)據(jù) pdf 和流處理相關(guān)的 Paper),期望可以幫你更好的理解 Flink。...
摘要:第二個問題就是說業(yè)務(wù)團隊之間沒有擴大管理,預算和審核是無頭緒的。支持一些高優(yōu)先級的比如說支持以及窗口等特性包括說。到現(xiàn)在為止,整體遷移完了,還剩下十個左右的作業(yè)沒有遷移完。 作者:張光輝 本文將為大家展示字節(jié)跳動公司怎么把Storm從Jstorm遷移到Flink的整個過程以及后續(xù)的計劃。你可以借此了解字節(jié)跳動公司引入Flink的背景以及Flink集群的構(gòu)建過程。字節(jié)跳動公司是如何兼容以...
閱讀 1346·2023-01-11 13:20
閱讀 1684·2023-01-11 13:20
閱讀 1132·2023-01-11 13:20
閱讀 1858·2023-01-11 13:20
閱讀 4100·2023-01-11 13:20
閱讀 2704·2023-01-11 13:20
閱讀 1385·2023-01-11 13:20
閱讀 3597·2023-01-11 13:20