中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Apache Beam 實(shí)戰(zhàn)指南 | 大數(shù)據(jù)管道 (pipeline) 設(shè)計及實(shí)戰(zhàn)

2019-08-27    來源:raincent

容器云強(qiáng)勢上線!快速搭建集群,上萬Linux鏡像隨意使用

作者:張海濤

關(guān)于 Apache Beam 實(shí)戰(zhàn)指南系列文章

隨著大數(shù)據(jù) 2.0 時代悄然到來,大數(shù)據(jù)從簡單的批處理擴(kuò)展到了實(shí)時處理、流處理、交互式查詢和機(jī)器學(xué)習(xí)應(yīng)用。近年來涌現(xiàn)出諸多大數(shù)據(jù)應(yīng)用組件,如 HBase、Hive、Kafka、Spark、Flink 等。開發(fā)者經(jīng)常要用到不同的技術(shù)、框架、API、開發(fā)語言和 SDK 來應(yīng)對復(fù)雜應(yīng)用的開發(fā),這大大增加了選擇合適工具和框架的難度,開發(fā)者想要將所有的大數(shù)據(jù)組件熟練運(yùn)用幾乎是一項(xiàng)不可能完成的任務(wù)。

面對這種情況,Google 在 2016 年 2 月宣布將大數(shù)據(jù)流水線產(chǎn)品(Google DataFlow)貢獻(xiàn)給 Apache 基金會孵化,2017 年 1 月 Apache 對外宣布開源 Apache Beam,2017 年 5 月迎來了它的第一個穩(wěn)定版本 2.0.0。在國內(nèi),大部分開發(fā)者對于 Beam 還缺乏了解,社區(qū)中文資料也比較少。

一.概述

其他行業(yè)問咱們 IT 具體干什么的,很多 IT 人員會自嘲自己就是“搬磚”(此處將復(fù)制代碼稱為搬磚)的民工。過了兩天 GitHub 出現(xiàn)自動寫代碼的人工智能,IT 程序員深深嘆了一口氣說道“完了要失業(yè)了,代碼沒得搬了”。其實(shí)從入行 IT 那一刻起,不管我們做前端、服務(wù)端、底層架構(gòu)等任何崗位,其實(shí)我們都是為數(shù)據(jù)服務(wù)的服務(wù)人員(注:不是說從民工轉(zhuǎn)崗到服務(wù)員了):把數(shù)據(jù)從后端搬到前端,把前端數(shù)據(jù)再寫入數(shù)據(jù)庫。盡管編程語言從 C、C++、C#、JAVA、Python 不停變化,為了適應(yīng)時代背景框架也是千變?nèi)f化,我們拼命從“亞馬遜熱帶雨林”一直學(xué)到“地中海”。

然后 Apache Beam 這個一統(tǒng)“地中海”的框架出現(xiàn)了。Apache Beam 不光統(tǒng)一了數(shù)據(jù)源,還統(tǒng)一了流批計算。在這個數(shù)據(jù)傳輸過程中有一條核心的技術(shù)就是管道(Pipeline),不管是 Strom,F(xiàn)link ,Beam 它都是核心。在這條管道中可以對數(shù)據(jù)進(jìn)行過濾、凈化、清洗、合并、分流以及各種實(shí)時計算操作。

本文會詳細(xì)介紹如何設(shè)計 Apache Beam 管道、管道設(shè)計工具介紹、源碼和案例分析,普及和提升大家對 Apache Beam 管道的認(rèn)知。

二.怎樣設(shè)計好自己的管道?

設(shè)計管道注意事項(xiàng)

 

 

圖 2-1 簡單管道

1. 你輸入的數(shù)據(jù)存儲在那里?

首先要確定你要構(gòu)造幾條數(shù)據(jù)源,在 Beam 可以構(gòu)建多條,構(gòu)建之前可以選擇自己的 SDK 的 IO。

2. 你的數(shù)據(jù)類型是什么樣的?

Beam 提供的是鍵值對的數(shù)據(jù)類型,你的數(shù)據(jù)可能是日志文本、格式化設(shè)備事件、數(shù)據(jù)庫的行,所以在 PCollection 就應(yīng)該確定數(shù)據(jù)集的類型。

3. 你想怎么處理數(shù)據(jù)?

對數(shù)據(jù)進(jìn)行轉(zhuǎn)換、過濾處理、窗口計算、SQL 處理等。 在管道中提供了通用的 ParDo 轉(zhuǎn)換類,算子計算以及 BeamSQL 等操作。

4. 你打算把數(shù)據(jù)最后輸出到哪里去?

在管道末尾進(jìn)行 Write 寫入操作,把數(shù)據(jù)最后寫入你自己想存放或最后流向的地方。

管道的幾種玩法

1. 分支管道:多次轉(zhuǎn)換,處理相同的數(shù)據(jù)集

 

 

圖 2-2-1 多次轉(zhuǎn)換處理相同數(shù)據(jù)示意圖

描述:例如上圖 2-1-1 圖所示,從一個數(shù)據(jù)庫的表讀取或轉(zhuǎn)換數(shù)據(jù)集,然后從數(shù)據(jù)集中分別找找以字母“A”開頭的數(shù)據(jù)放入一個分支數(shù)據(jù)集中,如果以字母“B”開頭的數(shù)據(jù)放入另一個分支數(shù)據(jù)集中,最終兩個數(shù)據(jù)集進(jìn)行隔離處理。

數(shù)據(jù)集:

// 為了演示顯示內(nèi)存數(shù)據(jù)集
final List LINES = Arrays.asList(
"Aggressive",
"Bold",
"Apprehensive",
"Brilliant");

示例代碼:

PCollection dbRowCollection = ...;// 這個地方可以讀取任何數(shù)據(jù)源。
PCollection aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){// 查找以 "A" 開頭的數(shù)據(jù)
c.output(c.element());
System.out.append("A 開頭的單詞有:"+c.element()+"\r");
}
}
}));
PCollection bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){// 查找以 "A" 開頭的數(shù)據(jù)
c.output(c.element());
System.out.append("B 開頭的單詞有:"+c.element()+"\r");
}
}
}));

最終結(jié)果展示:

A 開頭的單詞有:Aggressive
B 開頭的單詞有:Bold
A 開頭的單詞有:Apprehensive
B 開頭的單詞有:Brilliant

原示例代碼地址 : pipelineTest2_1

2. 分支管道:一次轉(zhuǎn)換,輸出多個數(shù)據(jù)集

 

 

圖 2-2-2 一次轉(zhuǎn)換多個輸出示意圖

描述:根據(jù)圖 2-2-1 和圖 2-2-2 圖中可以看出,他們以不同的方式執(zhí)行著相同的操作,圖 2-2-1 中的管道包含兩個轉(zhuǎn)換,用于處理同一輸入中的元素 PCollection。一個轉(zhuǎn)換使用以下邏輯:

if(以'A'開頭){outputToPCollectionA}

另一個轉(zhuǎn)換為

if(以'B'開頭){outputToPCollectionB}

因?yàn)槊總轉(zhuǎn)換讀取整個輸入 PCollection,所以輸入中的每個元素都會 PCollection 被處理兩次。

圖 2-2-2 中的管道以不同的方式執(zhí)行相同的操作 - 只有一個轉(zhuǎn)換使用以下邏輯:

if(以'A'開頭){outputToPCollectionA} else if(以'B'開頭){outputToPCollectionB}

其中輸入中的每個元素都 PCollection 被處理一次。

數(shù)據(jù)集:同 2-1-1 數(shù)據(jù)集

示例代碼:

// 定義兩個 TupleTag,每個輸出一個。
final TupleTag startsWithATag = new TupleTag(){};
final TupleTag startsWithBTag = new TupleTag(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// 返回首字母帶有 "A" 的數(shù)據(jù)集。
c.output(c.element());
} else if(c.element().startsWith("B")) {
// // 返回首字母帶有 "B" 的數(shù)據(jù)集。
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);

如果每個元素的轉(zhuǎn)換計算非常耗時,則使用其他輸出會更有意義,因?yàn)橐淮涡赃^濾全部數(shù)據(jù),比全部數(shù)據(jù)過濾兩次從性能上和轉(zhuǎn)換上都存在一定程度上提升,數(shù)據(jù)量越大越明顯。

最終結(jié)果展示:

復(fù)制代碼A 開頭的單詞有:ApprehensiveA 開頭的單詞有:AggressiveB 開頭的單詞有:BrilliantB 開頭的單詞有:Bold

原示例代碼地址 : pipelineTest2_2

3. 合并管道:多個數(shù)據(jù)集,合并成一個管道輸出

 

 

圖 2-2-3 多數(shù)據(jù)集合并輸出圖

描述:

上圖 2-2-3 是接圖 2-2-1 的繼續(xù),把帶“A” 的數(shù)據(jù)和帶“B” 字母開頭的數(shù)據(jù)進(jìn)行合并到一個管道。

這個地方注意點(diǎn)是 Flatten 用法必須兩個數(shù)據(jù)的數(shù)據(jù)類型相同。

數(shù)據(jù)集:

// 為了演示顯示內(nèi)存數(shù)據(jù)集
final List LINESa = Arrays.asList(
"Aggressive",
"Apprehensive");
final List LINESb = Arrays.asList(
"Bold",
"Brilliant");

示例代碼:

// 將兩個 PCollections 與 Flatten 合并
PCollectionList collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection mergedCollectionWithFlatten = collectionList
.apply(Flatten.pCollections());
// 繼續(xù)合并新的 PCollection
mergedCollectionWithFlatten.apply(...);

結(jié)果展示:

合并單詞單詞有:
Aggressive
Brilliant
Apprehensive
Bold

原示例代碼地址 : pipelineTest2_3

4. 合并管道:多個數(shù)據(jù)源,鏈接合并一個管道輸出

 

 

圖 2-2-4 多數(shù)據(jù)源合并輸出圖

描述:

你的管道可以從一個或多個源讀取或輸入。如果你的管道從多個源讀取并且這些源中的數(shù)據(jù)相關(guān)聯(lián),則將輸入連接在一起會很有用。在上面的圖 2-2-4 所示的示例中,管道從數(shù)據(jù)庫表中讀取名稱和地址,并從 Kafka 主題中讀取名稱和訂單號。然后管道 CoGroupByKey 用于連接此信息,其中鍵是名稱 ; 結(jié)果 PCollection 包含名稱,地址和訂單的所有組合。

示例代碼:

PCollection> userAddress = pipeline.apply(JdbcIO.>read()...);
PCollection> userOrder = pipeline.apply(KafkaIO.read()...);
final TupleTag addressTag = new TupleTag();
final TupleTag orderTag = new TupleTag();
// 將集合值合并到 CoGbkResult 集合中。
PCollection> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.create());
joinedCollection.apply(...);

管道的設(shè)計工具

對于管道的設(shè)計不光用代碼去實(shí)現(xiàn),也可以用視圖工具,F(xiàn)在存在的有兩種一種是拓藍(lán)公司出品叫 Talend Big Data Studio,另一種就是免費(fèi)開源的視圖設(shè)計工具 kettle-beam 。

 

 

三.怎樣創(chuàng)建你的管道

Apache Beam 程序從頭到尾就是處理數(shù)據(jù)的管道。本小節(jié)使用 Apache Beam SDK 中的類構(gòu)建管道,一個完整的 Apache Beam 管道構(gòu)建流程如下:

首先創(chuàng)建一個 Pipeline 對象。

不管是數(shù)據(jù)做任何操作,如“ 讀取”或“ 創(chuàng)建”及轉(zhuǎn)換都要為管道創(chuàng)建 PCollection 一個或多個的數(shù) 據(jù)集(PCollection****)。

在 Apache Beam 的管道中你可以對數(shù)據(jù)集 PCollection 做任何操作,例如轉(zhuǎn)換數(shù)據(jù)格式,過濾,分組,分析或以其他方式處理數(shù)據(jù)中的每一個元素。每個轉(zhuǎn)換都會創(chuàng)建一個新輸出數(shù)據(jù)集 PCollection,當(dāng)然你可以在處理完成之前進(jìn)行做任何的轉(zhuǎn)換處理。

把你認(rèn)為最終處理完成的數(shù)據(jù)集寫或以其他方式輸出最終的存儲地方。

最后運(yùn)行管道。

創(chuàng)建管道對象

每一個 Apache Beam 程序都會從創(chuàng)建管道(Pipeline)對象開始。

在 Apache Beam SDK,每一個管道都是一個獨(dú)立的實(shí)體,管道的數(shù)據(jù)集也都封裝著它的數(shù)據(jù)和對應(yīng)的數(shù)據(jù)類型(在 Apache Beam 中有對應(yīng)的數(shù)據(jù)轉(zhuǎn)換類型包)。最后把數(shù)據(jù)進(jìn)行用于各種轉(zhuǎn)換操作。

在創(chuàng)建的管道的時候需要設(shè)置管道選項(xiàng) PipelineOptions,有兩種創(chuàng)建方式第一種是無參數(shù)和一種有參數(shù)的。具體兩種有什么不同呢? 無參數(shù)的可以在程序中指定相應(yīng)的管道選項(xiàng)參數(shù),如顯示設(shè)置執(zhí)行大數(shù)據(jù)引擎參數(shù)。有參數(shù)的就可以在提交 Apache Beam jar 程序的時候進(jìn)行用 Shell 腳本的方式后期設(shè)置管道對應(yīng)的參數(shù)。

具體示例如下:

無參數(shù)

// 首先定義管道的選項(xiàng)
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class); // 顯示設(shè)置執(zhí)行大數(shù)據(jù)引擎
// 創(chuàng)建管道實(shí)體對象
Pipeline p = Pipeline.create(options);

有參數(shù)

PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().create();

提交設(shè)置參數(shù)的格式如下:

--

標(biāo)簽: 大數(shù)據(jù)管道 機(jī)器學(xué)習(xí)

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:Caffe作者賈揚(yáng)清:AI,從大數(shù)據(jù)演進(jìn)到高性能計算

下一篇:Apache Flink 1.9 重磅發(fā)布:正式合并阿里內(nèi)部版本Blink重要功能