精选!大数据Flink进阶(六):Flink入门案例
Flink入门案例
需求:读取本地数据文件,统计文件中每个单词出现的次数。
(相关资料图)
一、IDEA Project创建及配置
本案例编写Flink代码选择语言为Java和Scala,所以这里我们通过IntelliJ IDEA创建一个目录,其中包括Java项目模块和Scala项目模块,将Flink Java api和Flink Scala api分别在不同项目模块中实现。步骤如下:
1、打开IDEA,创建空项目
2、在IntelliJ IDEA 中安装Scala插件
使用IntelliJ IDEA开发Flink,如果使用Scala api 那么还需在IntelliJ IDEA中安装Scala的插件,如果已经安装可以忽略此步骤,下图为以安装Scala插件。
3、打开Structure,创建项目新模块
创建Java模块:
继续点击"+",创建Scala模块:
创建好"FlinkScalaCode"模块后,右键该模块添加Scala框架支持,并修改该模块中的"java"src源为"scala":
在"FlinkScalaCode"模块Maven pom.xml中引入Scala依赖包,这里使用的Scala版本为2.12.10。
org.scala-lang scala-library 2.12.10 org.scala-lang scala-compiler 2.12.10 org.scala-lang scala-reflect 2.12.10 4、Log4j日志配置
为了方便查看项目运行过程中的日志,需要在两个项目模块中配置log4j.properties配置文件,并放在各自项目src/main/resources资源目录下,没有resources资源目录需要手动创建并设置成资源目录。log4j.properties配置文件内容如下:
log4j.rootLogger=ERROR, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n复制
并在两个项目中的Maven pom.xml中添加对应的log4j需要的依赖包,使代码运行时能正常打印结果:
org.slf4j slf4j-log4j12 1.7.36 org.apache.logging.log4j log4j-to-slf4j 2.17.2 5、分别在两个项目模块中导入Flink Maven依赖
"FlinkJavaCode"模块导入Flink Maven依赖如下:
UTF-8 1.8 1.8 1.16.0 1.7.36 2.17.2 org.apache.flink flink-clients ${flink.version} org.slf4j slf4j-log4j12 ${slf4j.version} org.apache.logging.log4j log4j-to-slf4j ${log4j.version} "FlinkScalaCode"模块导入Flink Maven依赖如下:
UTF-8 1.8 1.8 1.16.0 1.7.31 2.17.1 2.12.10 2.12 org.apache.flink flink-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-clients ${flink.version} org.scala-lang scala-library ${scala.version} org.scala-lang scala-compiler ${scala.version} org.scala-lang scala-reflect ${scala.version} org.slf4j slf4j-log4j12 ${slf4j.version} org.apache.logging.log4j log4j-to-slf4j ${log4j.version} 注意:在后续实现WordCount需求时,Flink Java Api只需要在Maven中导入"flink-clients"依赖包即可,而Flink Scala Api 需要导入以下三个依赖包:
flink-scala_${scala.binary.version}flink-streaming-scala_${scala.binary.version}flink-clients主要是因为在Flink1.15版本后,Flink添加对opting-out(排除)Scala的支持,如果你只使用Flink的Java api,导入包不必包含scala后缀,如果使用Flink的Scala api,需要选择匹配的Scala版本。
二、案例数据准备
在项目"MyFlinkCode"中创建"data"目录,在目录中创建"words.txt"文件,向文件中写入以下内容,方便后续使用Flink编写WordCount实现代码。
hello Flinkhello MapReducehello Sparkhello Flinkhello Flinkhello Flinkhello Flinkhello Javahello Scalahello Flinkhello Javahello Flinkhello Scalahello Flinkhello Flinkhello Flink三、案例实现
数据源分为有界和无界之分,有界数据源可以编写批处理程序,无界数据源可以编写流式程序。DataSet API用于批处理,DataStream API用于流式处理。
批处理使用ExecutionEnvironment和DataSet,流式处理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示数据的特殊类,DataSet处理的数据是有界的,DataStream处理的数据是无界的,这两个类都是不可变的,一旦创建出来就无法添加或者删除数据元。
1、Flink 批数据处理案例
Java版本WordCount使用Flink Java Dataset api实现WordCount具体代码如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1.读取文件DataSource linesDS = env.readTextFile("./data/words.txt");//2.切分单词FlatMapOperator wordsDS = linesDS.flatMap((String lines, Collector collector) -> { String[] arr = lines.split(" "); for (String word : arr) { collector.collect(word); }}).returns(Types.STRING);//3.将单词转换成Tuple2 KV 类型MapOperator> kvWordsDS = wordsDS.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.按照key 进行分组处理得到最后结果并打印kvWordsDS.groupBy(0).sum(1).print(); Scala版本WordCount
使用Flink Scala Dataset api实现WordCount具体代码如下:
//1.准备环境,注意是Scala中对应的Flink环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.api.scala._//3.读取数据文件val linesDS: DataSet[String] = env.readTextFile("./data/words.txt")//4.进行 WordCount 统计并打印linesDS.flatMap(line => { line.split(" ")}) .map((_, 1)) .groupBy(0) .sum(1) .print()以上无论是Java api 或者是Scala api 输出结果如下,显示的最终结果是统计好的单词个数。
(hello,15)(Spark,1)(Scala,2)(Java,2)(MapReduce,1)(Flink,10)2、Flink流式数据处理案例
Java版本WordCount使用Flink Java DataStream api实现WordCount具体代码如下:
//1.创建流式处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.读取文件数据DataStreamSource lines = env.readTextFile("./data/words.txt");//3.切分单词,设置KV格式数据SingleOutputStreamOperator> kvWordsDS = lines.flatMap((String line, Collector> collector) -> { String[] words = line.split(" "); for (String word : words) { collector.collect(Tuple2.of(word, 1L)); }}).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.分组统计获取 WordCount 结果kvWordsDS.keyBy(tp->tp.f0).sum(1).print();//5.流式计算中需要最后执行execute方法env.execute(); Scala版本WordCount使用Flink Scala DataStream api实现WordCount具体代码如下:
//1.创建环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.streaming.api.scala._//3.读取文件val ds: DataStream[String] = env.readTextFile("./data/words.txt")//4.进行wordCount统计ds.flatMap(line=>{line.split(" ")}) .map((_,1)) .keyBy(_._1) .sum(1) .print()//5.最后使用execute 方法触发执行env.execute()以上输出结果开头展示的是处理当前数据的线程,一个Flink应用程序执行时默认的线程数与当前节点cpu的总线程数有关。
3、DataStream BATCH模式
下面使用Java代码使用DataStream API 的Batch 模式来处理批WordCount代码,方式如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置批运行模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);DataStreamSource linesDS = env.readTextFile("./data/words.txt");SingleOutputStreamOperator> wordsDS = linesDS.flatMap(new FlatMapFunction>() { @Override public void flatMap(String lines, Collector> out) throws Exception { String[] words = lines.split(" "); for (String word : words) { out.collect(new Tuple2<>(word, 1L)); } }});wordsDS.keyBy(tp -> tp.f0).sum(1).print();env.execute(); 以上代码运行完成之后结果如下,可以看到结果与批处理结果类似,只是多了对应的处理线程号。
3> (hello,15)8> (Flink,10)8> (Spark,1)7> (Java,2)7> (Scala,2)7> (MapReduce,1)此外,Stream API 中除了可以设置Batch批处理模式之外,还可以设置 AUTOMATIC、STREAMING模式,STREAMING 模式是流模式,AUTOMATIC模式会根据数据是有界流/无界流自动决定采用BATCH/STREAMING模式来读取数据,设置方式如下:
//BATCH 设置批处理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);//AUTOMATIC 会根据有界流/无界流自动决定采用BATCH/STREAMING模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//STREAMING 设置流处理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);除了在代码中设置处理模式外,还可以在Flink配置文件(flink-conf.yaml)中设置execution.runtime-mode参数来指定对应的模式,也可以在集群中提交Flink任务时指定execution.runtime-mode来指定,Flink官方建议在提交Flink任务时指定执行模式,这样减少了代码配置给Flink Application提供了更大的灵活性,提交任务指定参数如下:
$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=BATCH -c xxx xxx.jar 标签:
-
2022-05-23 16:13:32
上海奉贤等区开展常态化防疫压力测试 有序开放公交、公园、公共服务场所、公共街区商区<
本报上海5月21日电 (记者刘士安、曹玲娟)上海正在奉贤等区开展常态化防疫压力测试。在21日召开的上海市疫情防控工作新闻发布会上,奉
-
2022-05-23 16:13:32
“抗疫 宅家云课堂”,吸引沪上老同志观看50万人次<
由上海市委老干部局主办,上海市老干部大学、市科技助老服务中心承办的“抗疫 宅家云课堂”系列直播讲座自4月12日启动以来,深受老同
-
2022-05-23 16:13:32
“代跑腿”买药、开通绿色通道 丰台为管控区居民提供便捷医疗服务<
“真是太感谢了,解决了我的燃眉之急!”家住假日万恒社区的杨女士对前来送药的居委会工作人员说。 自5月17日6时起,丰台区对青塔街...
-
2022-05-23 16:13:32
上海嘉定:儿童计划免疫接种全部恢复,实行预约制<
在5月22日召开的上海市新冠肺炎疫情防控新闻发布会上,嘉定区副区长王浩介绍,从4月28日开始,嘉定在防范区逐步有序恢复老年人疫苗接种
-
2022-05-23 16:13:32
乡村振兴看新疆 | 种下红樱桃 结出“致富果”<
央广网阿图什5月22日消息(记者 罗成 通讯员 杨林)乡村振兴靠产业,产业发展靠特色。新疆阿图什市阿扎克镇麦依村积极引导农民因地制
-
2023-03-20 21:11:52
精选!大数据Flink进阶(六):Flink入门案例
本案例编写Flink代码选择语言为Java和Scala,所以这里我们通过IntelliJIDEA创建一个目录,其中包括Java项目模块和Scala项目模块,将
-
2023-03-20 19:02:29
全球热点评!茄组词有哪些_汉字茄怎么组词
欢迎观看本篇文章,小勉来为大家解答以上问题。茄组词有哪些,汉字茄怎么组词很多人还不知道,现在让我们一起来看看吧!1、夹克
-
2023-03-20 16:59:30
喋血孤岛演员表_哪些演员出演了这部剧
解答:1、这部电视剧由刘恺威、端木蕻霏、赵昭、温梦阳、林杰妮、隋俊波等人主演。2、剧情介绍:该剧讲述了在战火纷飞的抗日战
-
2023-03-20 15:00:04
前沿资讯!A股收评:沪指跌0.48%三大运营商跌超9%
A股收评:沪指跌0 48%三大运营商跌超9%:指数午后持续回落,三大运营商均跌超9%。截至收盘,沪指跌0 48%,深成指跌0 27%,创业板指跌0 08%,北证
-
2023-03-20 12:40:36
美国银行接连关闭 美媒:民众将为政府“兜底”措施买单|热头条
在硅谷银行与签名银行相继关闭后,美联储、美国财政部和联邦储蓄保险公司发布联合声明称,这两家银行的所有存款账户都将得到担保,美国联邦储
-
2023-03-20 10:52:50
文化和旅游部公示国家级文化产业示范园区 山东1家上榜
央广网北京3月15日消息根据《文化和旅游部办公厅关于开展国家级文化产业示范园区创建验收工作的通知》(办产业发〔2022〕116号),经评审,拟
-
2023-03-20 08:56:36
越秀进京“撸起袖子加油干”
越秀地产全年销售目标为1320亿元,较上一年的目标增长6%,相比2022年制定的9%的增速有所放缓。今年2月北京的那场土拍中,越秀豪掷59 11亿元摘
-
2023-03-20 04:48:25
冬天脸上起皮是怎么回事_冬天为什么脸上起皮 焦点精选
解答:1、冬天的气候相对寒冷干燥。如果不注意饮食和局部皮肤保护,脸上可能会脱皮。这种情况就需要局部保湿了。2、平时也需要
-
2023-03-19 23:06:05
【时快讯】澄江永和:养殖抗浪鱼 助群众增收
澄江永和:养殖抗浪鱼助群众增收
-
2023-03-19 19:23:51
【快播报】泰国路边新娘国语版(泰国路边女一次多少钱)
1、不是很高,吃饭和住普通旅馆价格100人民币左右。2、旅馆的价格很便宜,只要80元人民币就能住一晚;普通酒店则是需要220元人民币;高级酒店
-
2023-03-19 15:59:21
外国教育史教程(第三版)_吴式颖_世界实时
外国教育史教程(第三版)_吴式颖PDF电子版链接:https: pan baidu com s 1_HlJDbAxSooCK1VG3oX8kQ?pwd=cg5f提取码:cg5f--来自百度网盘超级会员V3的分享
-
2023-03-19 12:18:49
今日讯!北京密云:城乡教师轮岗促衔接
“小学到底是什么样的?小学生是怎么上课的?”带着这些问题,由密云区第九幼儿园轮岗到密云区西田各庄镇西田各庄幼儿园的教师曹红,正带领...
-
2023-03-19 09:10:52
血色修道院在哪里_血色修道院在
1、你在暴风城就麻烦了,最好是有人可以拉你过去,如果自己跑这个就太远了。2、路线1 坐地铁到铁炉堡,绕路拖尸体跑到湿地。3、路线2 找FS开塞
-
2023-03-19 04:05:02
个人独资企业的名称有哪些_个人独资企业名称有哪些|天天微资讯
1、个人独资企业的名称可以叫厂、店、部、中心、工作室等。2、还可以再带上个人喜欢的字号,通过名称查询以后就可以用了。本文
-
2023-03-18 22:50:09
宝宝拉稀是什么原因引起来的_宝宝拉稀是什么原因
1、很多家长不知道宝宝拉肚子后是怎么回事,特别担心宝宝拉肚子以腹泻为主,排便次数会增加。2、我来给你介绍一下宝宝拉肚子的原因。本文到此
-
2023-03-18 19:09:18
观天下!日媒:福岛第一核电站核污染水排海部分设备开始运行
中新社东京3月18日电据日本朝日电视台18日报道,福岛第一核电站用于核污染水排海的部分相关设备于17日开始运行,这是核污染水排海相关设备首次
-
2023-03-18 15:55:08
环球快播:轻度脂肪肝应注意什么吃什么药_轻度脂肪肝应注意什么
1、当你已经诊断出轻度脂肪肝的时候,不要太紧张。只要你合理饮食,适当运动,然后控制好生活节奏,同时控制好基础疾病,如高血
-
2023-03-18 12:02:38
全国两会精神看落实|天津滨海—中关村科技园:强化科创平台建设 助力科技型企业发展壮大
点击图片观看视频天津北方网讯:天津滨海中关村科技园,紧紧围绕全国两会精神,在满足科技型企业产业设施需求的同时,提供产、学、研、用、金
-
2023-03-18 08:59:57
03月18日06时山东淄博疫情数据 阳了以后为什么会腰疼?应该怎么办?
03月18日06时山东淄博疫情数据阳了以后为什么会腰疼?应该怎么办?以下为详情!一、03月18日06时淄博疫情数据概览
-
2023-03-18 05:59:33
热气球原理动画演示_热气球原理
1、非常基本的科学原理:热空气会升到冷空气上方。2、从本质上讲,热空气比冷空气轻,因为单位体积热空气的质量较小。3、1立
-
2023-03-18 01:12:47
塑料软管直径规格表_塑料软管规格尺寸表
1、预应力塑料波纹管主要应用于后张法预应力混凝土结构,构件的成孔。2、本公司生产的ZY塑料采用高密度聚乙烯(HDPE
-
2023-03-17 21:21:14
美国国会将拨5.85亿美元支持三家电池工厂,含宁德时代、国轩高科参与的项目 每日讯息
App3月17日消息,美国国会授权拨款5 85亿美元,以支持密歇根州三家总投资数十亿美元的动力电池工厂建设,其中包括福特汽车与宁德时代的合作项
-
2023-03-17 19:01:04
今天最新消息 西游台青“大话西游”
中新网银川7月11日电(记者杨迪)“在台湾,周星驰的《大话西游》真的很火。”11日,来到镇北堡西部影城的台湾青年朱天
-
2023-03-17 16:41:03
环球速看:结婚微信邀请词_结婚微信邀请语
1、产品型号:小米6系统版本:EMUI9 0软件版本:微信7 0打开手机中的微信2、选择想要邀请的人。2、在对话框中输入
-
2023-03-17 14:57:08
全球热门:教主alan walker简介_live fast alan walker
1、度云下载连接已发(若匿名或者是手机用户就请追问留其他发送方式)请查看私信确认无误请及时采纳最佳答案同求资源请发求助提
-
2023-03-17 11:56:58
环球通讯!海尔埃及生态园举行奠基仪式
据新华社报导,由中国企业海尔智家投资的海尔埃及生态园15日在埃及斋月十日城举行奠基仪式。 海尔埃及生态园由中国建筑集团有限公司承建,
-
2023-03-17 09:51:17
聚焦3.15!离石公安带我们筑盾同行,拒绝“套路”!
为进一步提高消费者的法律意识和维权意识,共同营造健康、安全、和谐的消费环境,3月15日,离石公安分局积极开展“3 15国际消费者权益日”...
-
2023-03-17 08:03:09
当日快讯:美国多家银行向第一共和银行注资300亿美元 焦点播报
央视新闻客户端消息,据美国媒体3月16日报道,包括高盛、摩根士丹利、摩根大通、花旗银行、美国银行、富国银行等多家美国大型银行当天向第一共
-
2023-03-17 04:03:23
焦点热文:科技要闻:桌面应用担保帮助你需要保持最新的Windows 10
互联网在提高人们社会活动质量的同时可能对部分互联网使用者造成伤害。我们要正确认识网络的两面性,用其所长、避其所短,发挥网络对生活的积
-
2023-03-16 23:09:45
全球快看点丨襄垣县气象局发布大雾黄色预警【Ⅲ级/较重】【2023-03-16】
襄垣县气象局发布大雾黄色预警【Ⅲ级 较重】【2023-03-16】
-
精选!大数据Flink进阶(六):Flink入门案例
2023-03-20 21:11:52 -
全球热点评!茄组词有哪些_汉字茄怎么组词
2023-03-20 19:02:29 -
喋血孤岛演员表_哪些演员出演了这部剧
2023-03-20 16:59:30 -
前沿资讯!A股收评:沪指跌0.48%三大运营商跌超9%
2023-03-20 15:00:04 -
美国银行接连关闭 美媒:民众将为政府“兜底”措施买单|热头条
2023-03-20 12:40:36 -
文化和旅游部公示国家级文化产业示范园区 山东1家上榜
2023-03-20 10:52:50 -
越秀进京“撸起袖子加油干”
2023-03-20 08:56:36