積分
本文主要介紹了達(dá)達(dá)集團(tuán)使用基于開(kāi)源的Flink Stream SQL開(kāi)發(fā)的Dada Flink SQL進(jìn)行實(shí)時(shí)計(jì)算任務(wù)SQL化過(guò)程中的實(shí)踐經(jīng)驗(yàn)。
時(shí)間回到2018年,在數(shù)據(jù)平臺(tái)和數(shù)據(jù)團(tuán)隊(duì)的共同努力下,我們已經(jīng)有了完整的離線(xiàn)計(jì)算流程,完善的離線(xiàn)數(shù)倉(cāng)模型,也上線(xiàn)了很多的數(shù)據(jù)產(chǎn)品和大量的數(shù)據(jù)報(bào)表。隨著業(yè)務(wù)的發(fā)展,我們也逐漸面臨著越來(lái)越多的實(shí)時(shí)計(jì)算方面的需求。隨著Flink在國(guó)內(nèi)的逐漸流行,實(shí)時(shí)計(jì)算也越來(lái)越多地進(jìn)入我們的視野。當(dāng)時(shí),F(xiàn)link的SQL功能還不完善,大量數(shù)據(jù)開(kāi)發(fā)需要的功能無(wú)法使用SQL表達(dá)。因此,我們的選擇和很多公司的選擇類(lèi)似,通過(guò)對(duì)Flink的框架和API進(jìn)行封裝,降低我們的數(shù)據(jù)開(kāi)發(fā)人員進(jìn)行實(shí)時(shí)任務(wù)開(kāi)發(fā)的難度。針對(duì)這些需求我們計(jì)劃通過(guò)一些封裝,使得數(shù)據(jù)開(kāi)發(fā)同學(xué)無(wú)需開(kāi)發(fā)Java或者Scala代碼,專(zhuān)注于業(yè)務(wù)邏輯的開(kāi)發(fā)。由于開(kāi)發(fā)資源有限,我們傾向于通過(guò)引進(jìn)一些開(kāi)源的框架并進(jìn)行定制性的開(kāi)發(fā)來(lái)完成這個(gè)任務(wù)。通過(guò)一些調(diào)研,我們鎖定了袋鼠云的Flink Stream SQL(以下簡(jiǎn)稱(chēng)FSL)和Uber的AthenaX。對(duì)比后,F(xiàn)SL的豐富的插件、開(kāi)發(fā)的活躍度和支持的相對(duì)完善對(duì)于我們更有吸引力。因此,我們引進(jìn)了袋鼠云的FSL,并基于FSL開(kāi)發(fā)了達(dá)達(dá)的SQL計(jì)算引擎Dada Flink SQL(以下簡(jiǎn)稱(chēng)DFL),并以此進(jìn)行實(shí)時(shí)計(jì)算任務(wù)的SQL化。
首先介紹一下DFL的架構(gòu)。DFL中的主要組件為launcher、core、source插件、sink插件、Flink Siddhi插件以及side插件,其中Flink Siddhi為我們根據(jù)開(kāi)源的Flink Siddhi接入的基于Siddhi的規(guī)則引擎,后面我們會(huì)有專(zhuān)門(mén)的文章介紹Flink Siddhi相關(guān)的內(nèi)容和我們做的封裝。launcher負(fù)責(zé)加載必要的source/side/sink插件,并將Flink program提交到Flink集群,支持session cluster模式和single job模式。core模塊負(fù)責(zé)解析SQL語(yǔ)句,生成SQLTree,并根據(jù)解析的source、sink、Flink Siddhi和side內(nèi)容加載相應(yīng)的插件,生成必要的組件并注冊(cè)進(jìn)Flink TableEnvironment。之后,根據(jù)SQL是否使用了維表JOIN的功能 ,會(huì)選擇直接調(diào)用TableEnvironment.sqlUpdate()或者進(jìn)行維表JOIN的處理。除維表JOIN之外,根據(jù)我們數(shù)據(jù)開(kāi)發(fā)同學(xué)的需求,我們還加入了INTERVAL JOIN的支持。使用流程表示,DFL的整體流程如下圖所示。
2.1 Parser
DFL使用Parser來(lái)解析SQL語(yǔ)句,解析為相應(yīng)的數(shù)據(jù)結(jié)構(gòu),并放入SqlTree進(jìn)行管理以便后續(xù)使用。Parser定義了良好的接口,易于通過(guò)增加新的實(shí)現(xiàn)類(lèi)來(lái)增加對(duì)新的SQL語(yǔ)法的支持。Parser的接口定義如下:
其中match用于判斷一個(gè)具體的Parser的實(shí)現(xiàn)能否實(shí)現(xiàn)對(duì)給定的SQL語(yǔ)句的解析,verifySyntax為我們新增加的接口功能,用于驗(yàn)證給定SQL的語(yǔ)法是否正確,并將相關(guān)的錯(cuò)誤信息放入errorInfo中供調(diào)用方使用,parserSql實(shí)現(xiàn)具體的SQL語(yǔ)法的解析工作。我們?yōu)镮Parser增加了很多的實(shí)現(xiàn)以實(shí)現(xiàn)新的功能,例如增加對(duì)Flink Siddhi的支持等。
2.2 維表JOIN
DFL中包含兩種維表JOIN的實(shí)現(xiàn)方式:ALL及SIDE方式。ALL方式會(huì)將需要JOIN的數(shù)據(jù)一次性讀取并緩存到Task的內(nèi)存中,并可以設(shè)置定期刷新緩存;SIDE方式則在需要進(jìn)行JOIN時(shí)從相應(yīng)的數(shù)據(jù)源中讀取相應(yīng)的數(shù)據(jù),并根據(jù)設(shè)置決定是否將讀取到的數(shù)據(jù)緩存在內(nèi)存中。ALL和SIDE模式相應(yīng)的抽象類(lèi)的定義分別為AllReqRow和AsyncReqRow,他們都實(shí)現(xiàn)了共同的接口ISideReqRow,ISideReqRow中定義了用于將事實(shí)表的數(shù)據(jù)和維表讀取的數(shù)據(jù)進(jìn)行JOIN的方法Row fillData(Row input, Object sideInput)。AllReqRow和AsyncReqRow的定義分別如下:
可以看到其中使用了模板方法的設(shè)計(jì)模式。
AsyncSideReqRow主要提供了初始化LRU緩存,從LRU緩存中獲取數(shù)據(jù)以及從數(shù)據(jù)源或者LRU緩存中無(wú)法找到需要JOIN的數(shù)據(jù)時(shí)的默認(rèn)處理方法。
開(kāi)發(fā)DFL的過(guò)程中,根據(jù)一些業(yè)務(wù)相關(guān)的需求及簡(jiǎn)化數(shù)據(jù)開(kāi)發(fā)人員使用DFL的需要,我們?cè)谠鶩SL的基礎(chǔ)上進(jìn)行了大量的改進(jìn)和擴(kuò)展的工作,下面介紹一些我們?cè)贒FL上做的工作。
3.1 Flink HA模式下,SESSION模式提交任務(wù)超時(shí)
為了Flink任務(wù)有較好的容錯(cuò)性,我們?yōu)镕link集群配置了基于ZooKeper的HA。出于任務(wù)管理和維護(hù)的需要,我們的一些Flink任務(wù)使用了session模式,在將這些任務(wù)遷移到DFL后,發(fā)現(xiàn)提交任務(wù)時(shí),會(huì)報(bào)超時(shí)的錯(cuò)誤。查閱Flink的官方文檔也沒(méi)有發(fā)現(xiàn)線(xiàn)索。后面經(jīng)過(guò)我們的探索,發(fā)現(xiàn)了在YARN session模式下,配置了HA時(shí),進(jìn)行任務(wù)提交需要指定high-availability.cluster-id。添加了如下代碼后,SESSION模式下,任務(wù)可以正常提交了。
3.2 Kafka支持使用SQL關(guān)鍵字作為JSON的字段名
當(dāng)在Flink中使用了SQL關(guān)鍵字作字段名時(shí),即使將字段名用反引號(hào)包起來(lái),依然會(huì)報(bào)如下的錯(cuò)誤:
這個(gè)是Flink的bug,已經(jīng)在1.10.1中作了修復(fù),詳見(jiàn)這個(gè)issue:https://issues.apache.org/jira/browse/FLINK-16526。我們使用的版本為Flink 1.6.2,無(wú)法使用這個(gè)修復(fù)。我們的做法是支持將Kafka中JSON的字段名和引用這個(gè)JSON字段的列名作解耦,即在Flink SQL中使用指定的列名引用該JSON字段,而用于JSON解析的還是原始的JSON字段名。具體來(lái)說(shuō),我們?cè)谠獢?shù)據(jù)系統(tǒng)中,支持為Kafka類(lèi)型的表注冊(cè)一個(gè)可選的sourceName。如果注冊(cè)了sourceName,F(xiàn)link Stream SQL將使用sourceName去JSON中解析對(duì)應(yīng)的字段。
3.3 元數(shù)據(jù)整合
DFL上線(xiàn)后,通過(guò)添加必要的功能,使用純SQL開(kāi)發(fā)已經(jīng)滿(mǎn)足我們的很多實(shí)時(shí)任務(wù)開(kāi)發(fā)的需求。但是在DFL運(yùn)行一段時(shí)間后,我們注意到了管理各種上下游存儲(chǔ)的信息給我們的數(shù)據(jù)開(kāi)發(fā)人員帶來(lái)的困擾。我們線(xiàn)上使用的存儲(chǔ)系統(tǒng)包括了Kafka、HBase、ElasticSearch、Redis和MySQL(之后又引入了ClickHouse)。這些數(shù)據(jù)源基本都是異構(gòu)的,連接及用戶(hù)信息各異,而且在不同的任務(wù)中使用相同的數(shù)據(jù)源,每次都需要使用CREATE TABLE
元數(shù)據(jù)管理系統(tǒng)開(kāi)發(fā)完成后,我們將Flink Stream SQL和元數(shù)據(jù)管理系統(tǒng)進(jìn)行了深度集成。通過(guò)引入U(xiǎn)SE TABLE <> AS <> WITH ()的語(yǔ)法,我們的數(shù)據(jù)開(kāi)發(fā)人員只需要將數(shù)據(jù)源在元數(shù)據(jù)管理系統(tǒng)中進(jìn)行注冊(cè) ,之后在Flink Stream SQL中引用注冊(cè)后的表就無(wú)需再填寫(xiě)任何連接信息,而且如果需要引用所有的字段的話(huà),也無(wú)需再填寫(xiě)字段信息。如果不想要引用所有的子段,有兩種辦法可以做到。第一種方法是在USE TABLE的WITH里面使用columns表達(dá)需要引用的字段,第二種方法是在元數(shù)據(jù)系統(tǒng)里注冊(cè)一張只包含了要引用的字段的表。
3.4 Redis hash/set數(shù)據(jù)類(lèi)型的支持
FSL已經(jīng)內(nèi)置了對(duì)Redis作為sink table和side table的支持,但是FSL只支持Redis的String類(lèi)型的數(shù)據(jù),而我們的場(chǎng)景會(huì)使用到Redis的hash和set類(lèi)型的數(shù)據(jù),因此我們需要添加對(duì)Redis這兩種數(shù)據(jù)類(lèi)型的支持。首先介紹一下將Redis中的數(shù)據(jù)映射到Flink中的表的方法,在我們的Redis的key中包含了兩部分的內(nèi)容(使用":"分隔),兩部分分別為固定的keyPrefix和由一到多個(gè)字段的值使用":"拼接的primaryKey,其中keyPrefix模擬表的概念,也方便Redis中存儲(chǔ)的內(nèi)容的管理。對(duì)String類(lèi)型的數(shù)據(jù),Redis的key會(huì)在上面介紹的key的基礎(chǔ)上拼接上字段名稱(chēng)(使用":"作為分隔符),并以字段的值作為該key對(duì)應(yīng)的value寫(xiě)入Redis中;對(duì)Hash類(lèi)型的數(shù)據(jù),Redis的完整的key就為上面介紹的key,hash的key則由用戶(hù)指定的字段的值使用":"拼接而成,類(lèi)似的,hash的value由用戶(hù)指定的字段的值拼接而成。除了Redis hash和set數(shù)據(jù)類(lèi)型的支持之外,我們還為Redis增加了setnx和hsetnx以及TTL的功能。
3.5 ClickHouse sink的支持
FSL內(nèi)置了對(duì)Kafka、MySQL、Redis、Elasticsearch和HBbase等數(shù)據(jù)源作為目標(biāo)表的支持,但是我們?cè)谑褂玫倪^(guò)程中也遇到了一些新的數(shù)據(jù)源作為目標(biāo)寫(xiě)入端的要求,為此我們開(kāi)發(fā)了新的sink插件來(lái)支持這種需求。我們開(kāi)發(fā)和維護(hù)的sink插件包括了ClickHouse和HdfsFile。下面以ClickHouse的sink為例介紹一下我們?cè)谶@方面所做的一些工作。
對(duì)于ClickHouse,我們開(kāi)發(fā)了實(shí)現(xiàn)了RichSinkFunction和CheckpointedFunction的ClickhouseSink。通過(guò)實(shí)現(xiàn)CheckpointedFunction并在snapshotState()方法中將數(shù)據(jù)刷寫(xiě)到ClickHouse來(lái)確保數(shù)據(jù)不會(huì)丟失。為了處理不同的輸入數(shù)據(jù)類(lèi)型,我們提供接口ClickhouseMapper
不同于通常情況下由用戶(hù)提供sink表的schema的方式,我們通過(guò)執(zhí)行DESC
的方式從ClickHouse獲取表的schema。為了處理ClickHouse中的特殊數(shù)據(jù)類(lèi)型,例如nullable(String),Int32等,我們使用正則表達(dá)式提取出實(shí)際的類(lèi)型進(jìn)行寫(xiě)入,相關(guān)的代碼如下。為了寫(xiě)入數(shù)據(jù)的過(guò)程不阻塞正常的數(shù)據(jù)處理流程,我們使用了將數(shù)據(jù)寫(xiě)入任務(wù)放入線(xiàn)程池的方式。同時(shí)為了在Flink任務(wù)失敗的情況下不發(fā)生數(shù)據(jù)丟失的情況,在snapshotState()方法中等待線(xiàn)程池中的任務(wù)完成。
3.6 BINLOG表達(dá)的簡(jiǎn)化
為了處理線(xiàn)上數(shù)據(jù)的更新,我們采用了阿里巴巴開(kāi)源的Canal采集MySQL binlog并發(fā)送到Kafka的方式。由于binlog特殊的數(shù)據(jù)組織形式,處理binlog的數(shù)據(jù)需要做很多繁雜的工作,例如從binlog的columnValues或者updatedValues字段中使用udf取出實(shí)際增加或者更新的字段。由于我們將Flink Stream SQL和元數(shù)據(jù)系統(tǒng)進(jìn)行了對(duì)接,因此我們可以拿到MySQL表的schema信息,從而我們可以提供語(yǔ)法封裝來(lái)幫助數(shù)據(jù)開(kāi)發(fā)人員減少這種重復(fù)性的SQL表達(dá)。為此,我們引入一種新的SQL語(yǔ)法:USE BINLOG TABLE,這種語(yǔ)法的格式如下。
我們會(huì)將這種語(yǔ)法展開(kāi)為如下的內(nèi)容。
在DFL上線(xiàn)后,由于可以使用純SQL進(jìn)行開(kāi)發(fā),符合數(shù)據(jù)開(kāi)發(fā)同學(xué)的開(kāi)發(fā)習(xí)慣,而且我們提供了很多的語(yǔ)法封裝,加上元數(shù)據(jù)管理帶來(lái)的便利,數(shù)據(jù)開(kāi)發(fā)同學(xué)逐步將一些實(shí)時(shí)計(jì)算任務(wù)遷移到了DFL上,這為部門(mén)帶來(lái)了極大的效率提升。截止到目前,DFL已經(jīng)應(yīng)用到了達(dá)達(dá)集團(tuán)的各個(gè)數(shù)據(jù)應(yīng)用系統(tǒng)中,系統(tǒng)中運(yùn)行的實(shí)時(shí)計(jì)算任務(wù)已經(jīng)達(dá)到70多個(gè),涵蓋達(dá)達(dá)快送、京東到家的各個(gè)業(yè)務(wù)及流量模塊,而且實(shí)時(shí)計(jì)算任務(wù)數(shù)量和SQL化占比還在穩(wěn)步增加中。隨著大數(shù)據(jù)部門(mén)的計(jì)算基礎(chǔ)設(shè)施開(kāi)放,現(xiàn)在我們的實(shí)時(shí)計(jì)算能力也在集團(tuán)其它部門(mén)中得到了越來(lái)越廣泛的應(yīng)用。
當(dāng)前Flink的社區(qū)版本已經(jīng)發(fā)展到了1.10,F(xiàn)link Table/SQL本身已經(jīng)支持了DFL提供的多數(shù)功能,出于降低維護(hù)組件復(fù)雜度的考慮,我們計(jì)劃后續(xù)引入Flink 1.10,并逐步推廣Flink 1.10的使用,以期最后將所有的任務(wù)都遷移到最新的Flink版本上。
公司內(nèi)部在逐步推廣私有云的使用,考慮到社區(qū)在Flink on K8s上的進(jìn)展,我們后續(xù)在引入新版本的Flink時(shí),將嘗試在公司的私有云上進(jìn)行部署。
作者簡(jiǎn)介:馬陽(yáng)陽(yáng) 達(dá)達(dá)集團(tuán)數(shù)據(jù)平臺(tái)高級(jí)開(kāi)發(fā)工程師,負(fù)責(zé)達(dá)達(dá)集團(tuán)計(jì)算引擎相關(guān)的維護(hù)和開(kāi)發(fā)工作
義烏漲完廣州漲 通達(dá)兔等快遞全年或增收數(shù)十億!
1598 閱讀又出傷人事件!買(mǎi)A退B、簽收訛詐、押金不退……快遞小哥如何避坑?
1344 閱讀傳網(wǎng)絡(luò)貨運(yùn)“獎(jiǎng)補(bǔ)”全面暫停,誰(shuí)破防了?
1208 閱讀興滿(mǎn)物流華北首個(gè)樞紐落戶(hù)普洛斯?jié)蠄@區(qū),開(kāi)啟零擔(dān)物流新格局
1189 閱讀2025年7月中國(guó)快遞發(fā)展指數(shù)報(bào)告
863 閱讀國(guó)家鐵路集團(tuán)950億成立新藏鐵路公司
834 閱讀中國(guó)郵政開(kāi)通“濟(jì)南=東京”國(guó)際貨郵航線(xiàn)
759 閱讀阿里技術(shù)元老“多隆”隱退,曾入選阿里合伙人
793 閱讀拼多多與順豐香港恢復(fù)合作
734 閱讀京東物流“狼族”系列亮相機(jī)器人大會(huì)
702 閱讀