• <em id="6vhwh"><rt id="6vhwh"></rt></em>

    <style id="6vhwh"></style>

    <style id="6vhwh"></style>
    1. <style id="6vhwh"></style>
        <sub id="6vhwh"><p id="6vhwh"></p></sub>
        <p id="6vhwh"></p>
          1. 国产亚洲欧洲av综合一区二区三区 ,色爱综合另类图片av,亚洲av免费成人在线,久久热在线视频精品视频,成在人线av无码免费,国产精品一区二区久久毛片,亚洲精品成人片在线观看精品字幕 ,久久亚洲精品成人av秋霞

            hhmm(汗汗嗎漫畫官方在線閱讀頁面免費漫畫入口頁面彈窗漫畫)

            更新時間:2023-03-01 14:57:16 閱讀: 評論:0

            #頭條創(chuàng)作挑戰(zhàn)賽#

            編寫寫入DM層業(yè)務(wù)代碼

            DM層主要是報表數(shù)據(jù),針對實時業(yè)務(wù)將DM層設(shè)置在Clickhou中,在此業(yè)務(wù)中DM層主要存儲的是通過Flink讀取Kafka “KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC” topic中的數(shù)據(jù)進行設(shè)置窗口分析,每隔10s設(shè)置滾動窗口統(tǒng)計該窗口內(nèi)訪問商品及商品一級、二級分類分析結(jié)果,實時寫入到Clickhou中。

            一、代碼編寫

            具體代碼參照“ProcessBrowLogInfoToDM.scala”,大體代碼邏輯如下:

            object ProcessBrowLogInfoToDM { def main(args: Array[String]): Unit = { //1.準(zhǔn)備環(huán)境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) env.enableCheckpointing(5000) import org.apache.flink.streaming.api.scala._ /** * 2.創(chuàng)建 Kafka Connector,連接消費Kafka dwd中數(shù)據(jù) * */ tblEnv.executeSql( """ |create table kafka_dws_ur_login_wide_tbl ( | ur_id string, | product_name string, | first_category_name string, | cond_category_name string, | obtain_points string |) with ( | 'connector' = 'kafka', | 'topic' = 'KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC', | 'properties.bootstrap.rvers'='node1:9092,node2:9092,node3:9092', | 'scan.startup.mode'='earliest-offt', --也可以指定 earliest-offt 、latest-offt | 'properties.group.id' = 'my-group-id', | 'format' = 'json' |) """.stripMargin) /** * 3.實時統(tǒng)計每個用戶最近10s瀏覽的商品次數(shù)和商品一級、二級種類次數(shù),存入到Clickhou */ val dwsTbl:Table = tblEnv.sqlQuery( """ | lect ur_id,product_name,first_category_name,cond_category_name from kafka_dws_ur_login_wide_tbl """.stripMargin) //4.將Row 類型數(shù)據(jù)轉(zhuǎn)換成對象類型操作 val browDS: DataStream[BrowLogWideInfo] = tblEnv.toAppendStream[Row](dwsTbl) .map(row => { val ur_id: String = row.getField(0).toString val product_name: String = row.getField(1).toString val first_category_name: String = row.getField(2).toString val cond_category_name: String = row.getField(3).toString BrowLogWideInfo(null, ur_id, null, product_name, null, null, first_category_name, cond_category_name, null) }) val dwsDS: DataStream[ProductVisitInfo] = browDS.keyBy(info => { info.first_category_name + "-" + info.cond_category_name + "-" + info.product_name }) .timeWindow(Time.conds(10)) .process(new ProcessWindowFunction[BrowLogWideInfo, ProductVisitInfo, String, TimeWindow] { override def process(key: String, context: Context, elements: Iterable[BrowLogWideInfo], out: Collector[ProductVisitInfo]): Unit = { val currentDt: String = DateUtil.getDateYYYYMMDD(context.window.getStart.toString) val startTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getStart.toString) val endTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getEnd.toString) val arr: Array[String] = key.split("-") val firstCatName: String = arr(0) val condCatName: String = arr(1) val productName: String = arr(2) val cnt: Int = elements.toList.size out.collect(ProductVisitInfo(currentDt, startTime, endTime, firstCatName, condCatName, productName, cnt)) } }) /** * 5.將以上結(jié)果寫入到Clickhou表 dm_product_visit_info 表中 * create table dm_product_visit_info( * current_dt String, * window_start String, * window_end String, * first_cat String, * cond_cat String, * product String, * product_cnt UInt32 * ) engine = MergeTree() order by current_dt * */ //準(zhǔn)備向ClickHou中插入數(shù)據(jù)的sql val inrtIntoCkSql = "inrt into dm_product_visit_info (current_dt,window_start,window_end,first_cat,cond_cat,product,product_cnt) values (?,?,?,?,?,?,?)" val ckSink: SinkFunction[ProductVisitInfo] = MyClickHouUtil.clickhouSink[ProductVisitInfo](inrtIntoCkSql,new JdbcStatementBuilder[ProductVisitInfo] { override def accept(pst: PreparedStatement, productVisitInfo: ProductVisitInfo): Unit = { pst.tString(1,productVisitInfo.currentDt) pst.tString(2,productVisitInfo.windowStart) pst.tString(3,productVisitInfo.windowEnd) pst.tString(4,productVisitInfo.firstCat) pst.tString(5,productVisitInfo.condCat) pst.tString(6,productVisitInfo.product) pst.tLong(7,productVisitInfo.productCnt) } }) //針對數(shù)據(jù)加入sink dwsDS.addSink(ckSink) env.execute() }}二、創(chuàng)建Clickhou-DM層表

            代碼在執(zhí)行之前需要在Clickhou中創(chuàng)建對應(yīng)的DM層商品瀏覽信息表dm_product_visit_info,clickhou建表語句如下:

            #node1節(jié)點啟動clickhou[root@node1 bin]# rvice clickhou-rver start#node1節(jié)點進入clickhou[root@node1 bin]# clickhou-client -m#node1節(jié)點創(chuàng)建clickhou-DM層表create table dm_product_visit_info( current_dt String, window_start String, window_end String, first_cat String, cond_cat String, product String, product_cnt UInt32) engine = MergeTree() order by current_dt;三、代碼測試

            以上代碼編寫完成后,代碼執(zhí)行測試步驟如下:

            1、將代碼中消費Kafka數(shù)據(jù)改成從頭開始消費

            代碼中Kafka Connector中屬性“scan.startup.mode”設(shè)置為“earliest-offt”,從頭開始消費數(shù)據(jù)。

            這里也可以不設(shè)置從頭開始消費Kafka數(shù)據(jù),而是直接啟動向日志采集接口模擬生產(chǎn)日志代碼“RTMockUrLogData.java”,需要啟動日志采集接口及Flume。

            2、執(zhí)行代碼,查看對應(yīng)結(jié)果

            以上代碼執(zhí)行后在,在Clickhou-DM層中表“dm_product_visit_info”中查看對應(yīng)數(shù)據(jù)結(jié)果如下:

            四、架構(gòu)圖

            本文發(fā)布于:2023-02-28 20:04:00,感謝您對本站的認可!

            本文鏈接:http://m.newhan.cn/zhishi/a/167765383675656.html

            版權(quán)聲明:本站內(nèi)容均來自互聯(lián)網(wǎng),僅供演示用,請勿用于商業(yè)和其他非法用途。如果侵犯了您的權(quán)益請與我們聯(lián)系,我們將在24小時內(nèi)刪除。

            本文word下載地址:hhmm(汗汗嗎漫畫官方在線閱讀頁面免費漫畫入口頁面彈窗漫畫).doc

            本文 PDF 下載地址:hhmm(汗汗嗎漫畫官方在線閱讀頁面免費漫畫入口頁面彈窗漫畫).pdf

            標(biāo)簽:漫畫   頁面   在線閱讀   入口   官方
            相關(guān)文章
            留言與評論(共有 0 條評論)
               
            驗證碼:
            Copyright ?2019-2022 Comsenz Inc.Powered by ? 實用文體寫作網(wǎng)旗下知識大全大全欄目是一個全百科類寶庫! 優(yōu)秀范文|法律文書|專利查詢|
            主站蜘蛛池模板: 视频精品亚洲一区二区| 人妻少妇久久中文字幕| 2021亚洲爆乳无码专区| 成人又黄又爽又色的视频| 无遮高潮国产免费观看| 国产一区二区午夜福利久久| 久久精品国产亚洲夜色AV网站| 久久免费精品国产72精品| 亚洲最大成人在线播放| 成全免费高清观看在线剧情| 国产成人亚洲日韩欧美| 亚洲高清中文字幕在线看不卡| 99福利一区二区视频| 亚洲人成网站18禁止无码| 2021中文字幕亚洲精品| 国产色悠悠综合在线观看| 爱性久久久久久久久| 免费视频欧美无人区码| 婷婷伊人久久| 欧美村妇激情内射| 国产在线午夜不卡精品影院 | 别揉我奶头~嗯~啊~的视频 | 欧美三级韩国三级日本三斤| 亚洲高清国产成人精品久久 | 欧美综合区| 国产成人无码AV大片大片在线观看| 人人人妻人人澡人人爽欧洲一区 | 日韩精品专区在线影观看| 国产精品毛片一区视频播| 色99久久久久高潮综合影院| 国产亚洲av人片在线播放| 中文字幕精品av一区二区五区| 日本一区不卡高清更新二区| 免费一本色道久久一区| 亚洲人妻系列中文字幕| 国产精品无码av一区二区三区| 国产日韩乱码精品一区二区| 久久青青草原亚洲AV无码麻豆| 99久久无码一区人妻a黑| 呻吟国产av久久一区二区| av无码精品一区二区乱子|