Flink: From Getting Started to Loving It (Flink Environment Deployment: Single Machine)


I. Preparation

Install Java

yum install java-11-openjdk -y
[root@localhost opt]# java -version
openjdk version "11.0.8" 2020-07-14 LTS
OpenJDK Runtime Environment 18.9 (build 11.0.8+10-LTS)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode, sharing)

Install the matching devel package (it provides the jps command)

yum install java-11-openjdk-devel -y
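
Before moving on, it can help to confirm that both tools are actually on the PATH. The following is a minimal sketch; `check_tools` is a hypothetical helper name, not part of Flink or the JDK.

```shell
# Hypothetical helper: report whether each required command is on PATH.
# Prints one line per tool and returns non-zero if any tool is missing.
check_tools() {
  local missing=0
  for tool in "$@"; do
    if command -v "$tool" >/dev/null 2>&1; then
      echo "found: $tool"
    else
      echo "missing: $tool"
      missing=1
    fi
  done
  return "$missing"
}

# Flink needs java; jps comes from the devel package.
check_tools java jps || echo "install the missing packages before continuing"
```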

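II. Local mode
1. Upload the installation package (here it is downloaded with wget), extract it to the target directory, and set its owner and group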
cd /opt/
wget https://apache.claz.org/flink/flink-1.11.2/flink-1.11.2-bin-scala_2.11.tgz

tar zxvf flink-1.11.2-bin-scala_2.11.tgz

mv flink-1.11.2 flink

chown -R root:root flink

2. Prepare a test data set (for testing only; the content is arbitrary)
Write the following into /root/words.txt:
a b c d
a
f
e d e w 2
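
One way to create the file without opening an editor is a here-document. This is only a sketch; `write_words` is a hypothetical helper that takes the target path as its argument.

```shell
# Hypothetical helper: write the four sample lines to the path given as $1.
write_words() {
  cat > "$1" <<'EOF'
a b c d
a
f
e d e w 2
EOF
}

# Usage, with the path from this article (requires write access to /root):
# write_words /root/words.txt && wc -l /root/words.txt
```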

3. Start the Scala shell from the Flink directory to test interactively
/opt/flink/bin/start-scala-shell.sh local
Batch - Use the 'benv' and 'btenv' variable # entry point for the batch environment

* val dataSet = benv.readTextFile("/path/to/data")
* dataSet.writeAsText("/path/to/output")
* benv.execute("My batch program")
*
* val batchTable = btenv.fromDataSet(dataSet)
* btenv.registerTable("tableName", batchTable)
* val result = btenv.sqlQuery("SELECT * FROM tableName").collect
HINT: You can use print() on a DataSet to print the contents or collect()
a sql query result back to the shell.

Streaming - Use the 'senv' and 'stenv' variable # entry point for the streaming environment

* val dataStream = senv.fromElements(1, 2, 3, 4)
* dataStream.countWindowAll(2).sum(0).print()
*
* val streamTable = stenv.fromDataStream(dataStream, 'num)
* val resultTable = streamTable.select('num).where('num % 2 === 1 )
* resultTable.toAppendStream[Row].print()
* senv.execute("My streaming program")
HINT: You can only print a DataStream to the shell in local mode.

Result:
scala> benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.util.JavaGcCleanerWrapper$PendingCleanersRunnerProvider (file:/opt/flink/lib/flink-dist_2.11-1.11.2.jar) to method java.lang.ref.Reference.waitForReferenceProcessing()
WARNING: Please consider reporting this to the maintainers of org.apache.flink.util.JavaGcCleanerWrapper$PendingCleanersRunnerProvider
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
(2,1)
(a,2)
(b,1)
(c,1)
(d,2)
(e,2)
(f,1)
(w,1)
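
The counts above can be cross-checked outside Flink with standard Unix tools. The sketch below uses a hypothetical `count_words` helper; it squeezes repeated spaces, which the Scala one-liner does not, but for this sample file the two agree.

```shell
# Hypothetical helper: count space-separated words in the file given as $1.
# Prints one "word count" pair per line, sorted by word.
count_words() {
  tr -s ' ' '\n' < "$1" | grep -v '^$' | LC_ALL=C sort | uniq -c | awk '{print $2, $1}'
}

# Usage with the sample file from this article:
# count_words /root/words.txt
```

For the sample data this prints the same eight pairs as the Flink job, for example `a 2` and `d 2`.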

III. Single-machine cluster mode

1. Start the services
[root@localhost opt]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost.localdomain.
Starting taskexecutor daemon on host localhost.localdomain.

The processes are now running:
[root@localhost ~]# jps
8016 Jps
26851 FlinkShell
7652 StandaloneSessionClusterEntrypoint #job manager
7959 TaskManagerRunner
You can now open http://10.0.83.71:8081 to access the Flink Dashboard.
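
The daemons can take a few seconds to come up, so a script that submits jobs right after start-cluster.sh may want to wait for the Dashboard first. The sketch below defines a generic retry helper; the name `wait_until` is hypothetical, and the commented usage assumes that curl is installed and that the REST port is still the default 8081.

```shell
# Hypothetical helper: retry a command until it succeeds or attempts run out.
# Usage: wait_until <attempts> <delay-seconds> <command...>
wait_until() {
  local attempts=$1 delay=$2 i=1
  shift 2
  while [ "$i" -le "$attempts" ]; do
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Sleep only when another attempt will follow.
    if [ "$i" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    i=$((i + 1))
  done
  return 1
}

# Usage (assumption: /overview is the REST endpoint behind the Dashboard):
# wait_until 10 1 curl -sf http://localhost:8081/overview && echo "Dashboard is up"
```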
2. Submit a test job

Use words.txt as the sample input again and specify an output path:

/opt/flink/bin/flink run /opt/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out2
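
Once the job finishes, the result can be inspected from the shell. Depending on the configured parallelism, the output path may end up as a single file or as a directory of part files, so this sketch handles both cases; `show_output` is a hypothetical helper name.

```shell
# Hypothetical helper: print job output from a single file or a directory of parts.
show_output() {
  if [ -d "$1" ]; then
    cat "$1"/*
  elif [ -f "$1" ]; then
    cat "$1"
  else
    echo "no output at $1" >&2
    return 1
  fi
}

# Usage with the output path from the command above:
# show_output /root/out2
```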

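If the submission fails, delete the runtime information left over from earlier runs (this usually happens after reinstalling Flink or switching between modes):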
rm -rf /tmp/.yarn-properties-root

3. Stop the cluster
[root@localhost bin]# /opt/flink/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 11494) on host localhost.localdomain.
Stopping standalonesession daemon (pid: 11187) on host localhost.localdomain.
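
To confirm that nothing is left running, the jps listing can be filtered for the two daemon class names seen earlier. This is a sketch only; `flink_daemons` is a hypothetical helper that reads jps-style lines from standard input.

```shell
# Hypothetical helper: keep only the Flink daemons from "pid name" lines on stdin.
# Returns non-zero (grep's status) when no daemon line is present.
flink_daemons() {
  grep -E 'StandaloneSessionClusterEntrypoint|TaskManagerRunner'
}

# Usage: after stop-cluster.sh this should report that nothing is running.
# jps | flink_daemons || echo "no Flink daemons running"
```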

Original article: https://blog.51cto.com/mapengfei/2546922

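Copyright notice: published by 完美者 on 2020-11-06 2:27:05.
When reposting, please credit: Flink: From Getting Started to Loving It (Flink Environment Deployment: Single Machine) | 完美导航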
