首页 >> 大全

借助Docker学习大数据:Flink

2023-12-18 大全 26 作者:考证青年

借助学习大数据:Flink

注意:本博文基于WSL2 & ,如无法达到实验效果,请注意环境差异。如果你使用的是、虚拟机等方式,注意下文提到的。

WSL2安装:

本文注重实验,原理可参考:

一、Flink集群搭建 1.1 镜像下载

首先下载Flink镜像

docker pull flink	# 获取镜像
docker images		# 查看下载的镜像

1.2 集群搭建

我们可以直接运行 or

方式分别如下:

docker run --name flink_jobmanager -d -t flink jobmanager	# JobManager
docker run --name flink_taskmanager -d -t flink taskmanager	# TaskManager

我们这里直接通过 的方式运行一个集群:

介绍

首先新建一个文件夹用于存放yml文件。这里我在WSL2的home路径新建一个 -flink 文件夹,并在该文件夹中新建 -.yml 文件,内容如下:

version: "2.1"
services:jobmanager:image: ${FLINK_DOCKER_IMAGE_NAME:-flink}expose:- "6123"ports:- "8081:8081"command: jobmanagerenvironment:- JOB_MANAGER_RPC_ADDRESS=jobmanagertaskmanager:image: ${FLINK_DOCKER_IMAGE_NAME:-flink}expose:- "6121"- "6122"depends_on:- jobmanagercommand: taskmanagerlinks:- "jobmanager:jobmanager"environment:- JOB_MANAGER_RPC_ADDRESS=jobmanager

比如我的文件如下:

借助Docker学习大数据:Flink_借助Docker学习大数据:Flink_

version: "2.1"
services:jobmanager:image: flinkexpose:- "6123"ports:- "8081:8081"command: jobmanagerenvironment:- JOB_MANAGER_RPC_ADDRESS=jobmanagertaskmanager:image: flinkexpose:- "6121"- "6122"depends_on:- jobmanagercommand: taskmanagerlinks:- "jobmanager:jobmanager"environment:- JOB_MANAGER_RPC_ADDRESS=jobmanager

创建完成,直接在该目录运行如下命令启动 即可:

docker-compose up -d

使用浏览器打开 :8081 即可:

当然我们也可以直接在 看到集群的信息,发现是 1 1 ,如果想要扩展可以通过如下命令:

docker-compose scale taskmanager=<N>

二、Java 编程 2.1 Maven

    <properties><flink.version>1.11.1flink.version>properties><dependencies><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-streaming-java_2.12artifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-javaartifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-clients_2.12artifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-coreartifactId><version>${flink.version}version>dependency>dependencies>

2.2 Java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/*** @author Creek*/
@SuppressWarnings("serial")
public class WordCount {public static void main(String[] args) throws Exception {// the host and the port to connect tofinal String hostname;final int port;try {final ParameterTool params = ParameterTool.fromArgs(args);// 注意:WSL2 的 hostname 不是localhost,可以在WSL2中输入ifconfig获得
//            hostname = params.has("hostname") ? params.get("hostname") : "localhost";hostname = params.has("hostname") ? params.get("hostname") : "172.31.61.151";port = params.getInt("port");} catch (Exception e) {System.err.println("No port specified. Please run 'SocketWindowWordCount " +"--hostname  --port ', where hostname (localhost by default) " +"and port is the address of the text server");System.err.println("To start a simple text server, run 'netcat -l ' and " +"type the input text into the command line");return;}// get the execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// get input data by connecting to the socketDataStream<String> text = env.socketTextStream(hostname, port, "\n");// parse the data, group it, window it, and aggregate the countsDataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {@Overridepublic void flatMap(String value, Collector<WordWithCount> out) {for (String word : value.split("\\s")) {out.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(5)).reduce(new ReduceFunction<WordWithCount>() {@Overridepublic WordWithCount reduce(WordWithCount a, WordWithCount b) {return new WordWithCount(a.word, a.count + b.count);}});// print the results with a single thread, rather than in parallelwindowCounts.print().setParallelism(1);env.execute("Socket Window WordCount");}// ------------------------------------------------------------------------/*** Data type for words with count.*/public static class WordWithCount {public String word;public long count;public WordWithCount() {}public WordWithCount(String word, long count) {this.word = word;this.count = count;}@Overridepublic String toString() {return word + " : " + count;}}
}

这里我们配置args如下:

2.3 监听9000端口

在WSL2中

netcat -l 9000	# nc -l 9000

运行main函数,在终端随便输入几个单词:

效果如下:

需要指出的是:现在你看到终端有结果,并不是借助flink运行的,如果你把运行的flink的集群关掉,仍然可以看到结果。

下面我们把代码打包,放入集群中运行查看效果。

三、打Jar包,提交集群

使用IDEA打包,找到jar包,右键打开文件位置

在flink中提交

填写主类信息、参数信息,

发现正在运行:

参考资料:

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了