Flink的这些事(二)——Flink开发环境搭建

#IDEA开发环境

###1、安装java环境
参考上一篇文章Flink的这些事(一)——Flink部署

###2、安装maven
参考博客Maven安装与配置

###3、配置IDEA
参考博客如何使用IntelliJ IDEA 配置Maven

###4、pom文件设置

pom.xml的完整内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the Flink development project.
  Declares the Flink/Scala/Hadoop dependencies used by the examples, compiles the
  sources with scala-maven-plugin, and packages a runnable fat jar with
  maven-shade-plugin.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flink</groupId>
    <artifactId>flink-dev</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <!-- Standard property read by Maven plugins; without it the build falls back
             to the platform encoding, which corrupts the non-ASCII string literals. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.12</scala.version>
        <!-- Must match the Scala suffix of the Flink artifacts below. -->
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.7.6</hadoop.version>
        <flink.version>1.6.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Flink batch and streaming APIs (Java and Scala) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connectors and external systems -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>
    </dependencies>

    <build>
        <!-- NOTE(review): only src/main/scala is declared as a source root; confirm that
             Java sources such as StreamingWindowWordCount are placed where the build
             picks them up. -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Compiles the main and test sources. -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!-- <arg>-make:transitive</arg> -->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Runs every class whose name ends in Test or Suite. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <!-- Builds the fat jar during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- Drop dependency signature files; they no longer match
                                         the merged jar and would make it fail verification. -->
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Entry point written to the jar manifest: the example job
                                         of this project, which lives in the default package. -->
                                    <mainClass>StreamingWindowWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

###5、代码示例

StreamingWindowWordCount.java的完整代码如下:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
* Author: qincf
* Date: 2018/11/02
* Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来
* 先在目标主机1.1.1.1机器上执行nc -l 9000
*/
public class StreamingWindowWordCount {
public static void main(String[] args) throws Exception {
//定义socket的端口号
int port;
try{
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("没有指定port参数,使用默认值9000");
port = 9000;
}
//获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入的数据
DataStreamSource<String> text = env.socketTextStream("1.1.1.1", port, "\n");
//计算数据
DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
String[] splits = value.split("\\s");
for (String word:splits) {
out.collect(new WordWithCount(word,1L));
}
}
})//打平操作,把每行的单词转为<word,count>类型的数据
//针对相同的word数据进行分组
.keyBy("word")
//指定计算数据的窗口大小和滑动窗口大小
.timeWindow(Time.seconds(2),Time.seconds(1))
.sum("count");
//获取可视化JSON
System.out.println(env.getExecutionPlan());
//把数据打印到控制台,使用一个并行度
windowCount.print().setParallelism(1);
//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
env.execute("streaming word count");



}

/**
* 主要为了存储单词以及单词出现的次数
*/
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}

@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}

}

###6、测试步骤
首先在1.1.1.1机器上使用nc命令模拟数据发送:

nc -l 9000

然后在IDEA中运行StreamingWindowWordCount程序,
并在主机上输入字符:

[root@data01]# nc -l 9000
a
a
b
c
d
d

此时运行程序后,IDEA中会打印出结果:

E:\tools\Java\bin\java.exe "-javaagent:E:\tools\IDEA\IntelliJ IDEA Community Edition 2018.2.5\lib\idea_rt.jar=61830:E:\tools\IDEA\IntelliJ IDEA Community Edition 2018.2.5\bin" -Dfile.encoding=UTF-8 -classpath E:\tools\Java\jre\lib\charsets.jar;E:\tools\Java\jre\lib\deploy.jar;E:\tools\Java\jre\lib\ext\access-bridge-64.jar;E:\tools\Java\jre\lib\ext\cldrdata.jar;E:\tools\Java\jre\lib\ext\dnsns.jar;E:\tools\Java\jre\lib\ext\jaccess.jar;E:\tools\Java\jre\lib\ext\jfxrt.jar;E:\tools\Java\jre\lib\ext\localedata.jar;E:\tools\Java\jre\lib\ext\nashorn.jar;E:\tools\Java\jre\lib\ext\sunec.jar;E:\tools\Java\jre\lib\ext\sunjce_provider.jar;E:\tools\Java\jre\lib\ext\sunmscapi.jar;E:\tools\Java\jre\lib\ext\sunpkcs11.jar;E:\tools\Java\jre\lib\ext\zipfs.jar;E:\tools\Java\jre\lib\javaws.jar;E:\tools\Java\jre\lib\jce.jar;E:\tools\Java\jre\lib\jfr.jar;E:\tools\Java\jre\lib\jfxswt.jar;E:\tools\Java\jre\lib\jsse.jar;E:\tools\Java\jre\lib\management-agent.jar;E:\tools\Java\jre\lib\plugin.jar;E:\tools\Java\jre\lib\resources.jar;E:\tools\Java\jre\lib\rt.jar;E:\code\flink\target\classes;E:\tools\Maven-Repository\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;E:\tools\Maven-Repository\org\apache\flink\flink-java\1.6.1\flink-java-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-core\1.6.1\flink-core-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-annotations\1.6.1\flink-annotations-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-metrics-core\1.6.1\flink-metrics-core-1.6.1.jar;E:\tools\Maven-Repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;E:\tools\Maven-Repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;E:\tools\Maven-Repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;E:\tools\Maven-Repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;E:\tools\Maven-Repository\org\apache\commons\commons-compress\1.4.1\commons-compress-1.4.1.jar;E:\tools\Maven-Repository\org\tukaani\xz\1.0\xz-1.0
.jar;E:\tools\Maven-Repository\org\apache\flink\flink-shaded-asm\5.0.4-4.0\flink-shaded-asm-5.0.4-4.0.jar;E:\tools\Maven-Repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;E:\tools\Maven-Repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;E:\tools\Maven-Repository\org\slf4j\slf4j-api\1.7.7\slf4j-api-1.7.7.jar;E:\tools\Maven-Repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;E:\tools\Maven-Repository\org\apache\flink\force-shading\1.6.1\force-shading-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-streaming-java_2.11\1.6.1\flink-streaming-java_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-runtime_2.11\1.6.1\flink-runtime_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-queryable-state-client-java_2.11\1.6.1\flink-queryable-state-client-java_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-hadoop-fs\1.6.1\flink-hadoop-fs-1.6.1.jar;E:\tools\Maven-Repository\commons-io\commons-io\2.4\commons-io-2.4.jar;E:\tools\Maven-Repository\org\apache\flink\flink-shaded-netty\4.1.24.Final-4.0\flink-shaded-netty-4.1.24.Final-4.0.jar;E:\tools\Maven-Repository\org\apache\flink\flink-shaded-jackson\2.7.9-4.0\flink-shaded-jackson-2.7.9-4.0.jar;E:\tools\Maven-Repository\org\javassist\javassist\3.19.0-GA\javassist-3.19.0-GA.jar;E:\tools\Maven-Repository\com\typesafe\akka\akka-actor_2.11\2.4.20\akka-actor_2.11-2.4.20.jar;E:\tools\Maven-Repository\com\typesafe\config\1.3.0\config-1.3.0.jar;E:\tools\Maven-Repository\org\scala-lang\modules\scala-java8-compat_2.11\0.7.0\scala-java8-compat_2.11-0.7.0.jar;E:\tools\Maven-Repository\com\typesafe\akka\akka-stream_2.11\2.4.20\akka-stream_2.11-2.4.20.jar;E:\tools\Maven-Repository\org\reactivestreams\reactive-streams\1.0.0\reactive-streams-1.0.0.jar;E:\tools\Maven-Repository\com\typesafe\ssl-config-core_2.11\0.2.1\ssl-config-core_2.11-0.2.1.jar;E:\tools\Maven-Repository\com\typesafe\akka\akka-protobuf_2.11\2.4.20\akka-protobuf_2.11-2.4.20.
jar;E:\tools\Maven-Repository\com\typesafe\akka\akka-slf4j_2.11\2.4.20\akka-slf4j_2.11-2.4.20.jar;E:\tools\Maven-Repository\org\clapper\grizzled-slf4j_2.11\1.0.2\grizzled-slf4j_2.11-1.0.2.jar;E:\tools\Maven-Repository\com\github\scopt\scopt_2.11\3.5.0\scopt_2.11-3.5.0.jar;E:\tools\Maven-Repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;E:\tools\Maven-Repository\com\twitter\chill_2.11\0.7.4\chill_2.11-0.7.4.jar;E:\tools\Maven-Repository\com\twitter\chill-java\0.7.4\chill-java-0.7.4.jar;E:\tools\Maven-Repository\org\apache\flink\flink-shaded-guava\18.0-4.0\flink-shaded-guava-18.0-4.0.jar;E:\tools\Maven-Repository\org\apache\flink\flink-scala_2.11\1.6.1\flink-scala_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\scala-lang\scala-reflect\2.11.12\scala-reflect-2.11.12.jar;E:\tools\Maven-Repository\org\scala-lang\scala-compiler\2.11.12\scala-compiler-2.11.12.jar;E:\tools\Maven-Repository\org\scala-lang\modules\scala-xml_2.11\1.0.5\scala-xml_2.11-1.0.5.jar;E:\tools\Maven-Repository\org\scala-lang\modules\scala-parser-combinators_2.11\1.0.4\scala-parser-combinators_2.11-1.0.4.jar;E:\tools\Maven-Repository\org\apache\flink\flink-streaming-scala_2.11\1.6.1\flink-streaming-scala_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-table_2.11\1.6.1\flink-table_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-clients_2.11\1.6.1\flink-clients_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-optimizer_2.11\1.6.1\flink-optimizer_2.11-1.6.1.jar;E:\tools\Maven-Repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-connector-kafka-0.10_2.11\1.6.1\flink-connector-kafka-0.10_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-connector-kafka-0.9_2.11\1.6.1\flink-connector-kafka-0.9_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-connector-kafka-base_2.11\1.6.1\flink-connector-kafka-base_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\kafka\kaf
ka-clients\0.10.2.1\kafka-clients-0.10.2.1.jar;E:\tools\Maven-Repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-client\2.7.6\hadoop-client-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-common\2.7.6\hadoop-common-2.7.6.jar;E:\tools\Maven-Repository\com\google\guava\guava\11.0.2\guava-11.0.2.jar;E:\tools\Maven-Repository\xmlenc\xmlenc\0.52\xmlenc-0.52.jar;E:\tools\Maven-Repository\commons-httpclient\commons-httpclient\3.1\commons-httpclient-3.1.jar;E:\tools\Maven-Repository\commons-codec\commons-codec\1.4\commons-codec-1.4.jar;E:\tools\Maven-Repository\commons-net\commons-net\3.1\commons-net-3.1.jar;E:\tools\Maven-Repository\org\mortbay\jetty\jetty-sslengine\6.1.26\jetty-sslengine-6.1.26.jar;E:\tools\Maven-Repository\javax\servlet\jsp\jsp-api\2.1\jsp-api-2.1.jar;E:\tools\Maven-Repository\commons-logging\commons-logging\1.1.3\commons-logging-1.1.3.jar;E:\tools\Maven-Repository\log4j\log4j\1.2.17\log4j-1.2.17.jar;E:\tools\Maven-Repository\commons-lang\commons-lang\2.6\commons-lang-2.6.jar;E:\tools\Maven-Repository\commons-configuration\commons-configuration\1.6\commons-configuration-1.6.jar;E:\tools\Maven-Repository\commons-digester\commons-digester\1.8\commons-digester-1.8.jar;E:\tools\Maven-Repository\commons-beanutils\commons-beanutils\1.7.0\commons-beanutils-1.7.0.jar;E:\tools\Maven-Repository\commons-beanutils\commons-beanutils-core\1.8.0\commons-beanutils-core-1.8.0.jar;E:\tools\Maven-Repository\org\slf4j\slf4j-log4j12\1.7.10\slf4j-log4j12-1.7.10.jar;E:\tools\Maven-Repository\org\codehaus\jackson\jackson-core-asl\1.9.13\jackson-core-asl-1.9.13.jar;E:\tools\Maven-Repository\org\codehaus\jackson\jackson-mapper-asl\1.9.13\jackson-mapper-asl-1.9.13.jar;E:\tools\Maven-Repository\org\apache\avro\avro\1.7.4\avro-1.7.4.jar;E:\tools\Maven-Repository\com\thoughtworks\paranamer\paranamer\2.3\paranamer-2.3.jar;E:\tools\Maven-Repository\com\google\protobuf\protobuf-java\2.5.0\protobuf-java-2.5.0.jar;E:\tools\Ma
ven-Repository\com\google\code\gson\gson\2.2.4\gson-2.2.4.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-auth\2.7.6\hadoop-auth-2.7.6.jar;E:\tools\Maven-Repository\org\apache\httpcomponents\httpclient\4.2.5\httpclient-4.2.5.jar;E:\tools\Maven-Repository\org\apache\httpcomponents\httpcore\4.2.4\httpcore-4.2.4.jar;E:\tools\Maven-Repository\org\apache\directory\server\apacheds-kerberos-codec\2.0.0-M15\apacheds-kerberos-codec-2.0.0-M15.jar;E:\tools\Maven-Repository\org\apache\directory\server\apacheds-i18n\2.0.0-M15\apacheds-i18n-2.0.0-M15.jar;E:\tools\Maven-Repository\org\apache\directory\api\api-asn1-api\1.0.0-M20\api-asn1-api-1.0.0-M20.jar;E:\tools\Maven-Repository\org\apache\directory\api\api-util\1.0.0-M20\api-util-1.0.0-M20.jar;E:\tools\Maven-Repository\org\apache\curator\curator-framework\2.7.1\curator-framework-2.7.1.jar;E:\tools\Maven-Repository\org\apache\curator\curator-client\2.7.1\curator-client-2.7.1.jar;E:\tools\Maven-Repository\org\apache\curator\curator-recipes\2.7.1\curator-recipes-2.7.1.jar;E:\tools\Maven-Repository\org\apache\htrace\htrace-core\3.1.0-incubating\htrace-core-3.1.0-incubating.jar;E:\tools\Maven-Repository\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-hdfs\2.7.6\hadoop-hdfs-2.7.6.jar;E:\tools\Maven-Repository\org\mortbay\jetty\jetty-util\6.1.26\jetty-util-6.1.26.jar;E:\tools\Maven-Repository\io\netty\netty\3.6.2.Final\netty-3.6.2.Final.jar;E:\tools\Maven-Repository\io\netty\netty-all\4.0.23.Final\netty-all-4.0.23.Final.jar;E:\tools\Maven-Repository\xerces\xercesImpl\2.9.1\xercesImpl-2.9.1.jar;E:\tools\Maven-Repository\xml-apis\xml-apis\1.3.04\xml-apis-1.3.04.jar;E:\tools\Maven-Repository\org\fusesource\leveldbjni\leveldbjni-all\1.8\leveldbjni-all-1.8.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-mapreduce-client-app\2.7.6\hadoop-mapreduce-client-app-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-mapreduce-client-common\2.7.6\hadoop-mapreduce-clie
nt-common-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-yarn-client\2.7.6\hadoop-yarn-client-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-yarn-server-common\2.7.6\hadoop-yarn-server-common-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-mapreduce-client-shuffle\2.7.6\hadoop-mapreduce-client-shuffle-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-yarn-api\2.7.6\hadoop-yarn-api-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-mapreduce-client-core\2.7.6\hadoop-mapreduce-client-core-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-yarn-common\2.7.6\hadoop-yarn-common-2.7.6.jar;E:\tools\Maven-Repository\javax\xml\bind\jaxb-api\2.2.2\jaxb-api-2.2.2.jar;E:\tools\Maven-Repository\javax\xml\stream\stax-api\1.0-2\stax-api-1.0-2.jar;E:\tools\Maven-Repository\javax\activation\activation\1.1\activation-1.1.jar;E:\tools\Maven-Repository\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar;E:\tools\Maven-Repository\com\sun\jersey\jersey-core\1.9\jersey-core-1.9.jar;E:\tools\Maven-Repository\com\sun\jersey\jersey-client\1.9\jersey-client-1.9.jar;E:\tools\Maven-Repository\org\codehaus\jackson\jackson-jaxrs\1.9.13\jackson-jaxrs-1.9.13.jar;E:\tools\Maven-Repository\org\codehaus\jackson\jackson-xc\1.9.13\jackson-xc-1.9.13.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-mapreduce-client-jobclient\2.7.6\hadoop-mapreduce-client-jobclient-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-annotations\2.7.6\hadoop-annotations-2.7.6.jar;E:\tools\Maven-Repository\mysql\mysql-connector-java\5.1.38\mysql-connector-java-5.1.38.jar;E:\tools\Maven-Repository\com\alibaba\fastjson\1.2.22\fastjson-1.2.22.jar StreamingWindowWordCount
没有指定port参数,使用默认值9000
{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream","parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":8,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Window(SlidingProcessingTimeWindows(2000, 1000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(SlidingProcessingTimeWindows(2000, 1000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction)","parallelism":8,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]}]}
WordWithCount{word='a', count=1}
WordWithCount{word='a', count=2}
WordWithCount{word='b', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='a', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='b', count=1}

大家会看到,wordcount的结果。
仔细看还有一串json输出,这部分是什么呢?
代码中加了一个打印执行计划的部分:

//获取可视化JSON
System.out.println(env.getExecutionPlan());

Flink提供了一个可视化执行计划的结果,类似Spark的DAG图,把json粘贴到Flink Plan Visualizer可以看到执行计划图:
image.png

-------------本文结束感谢您的阅读-------------
qinchaofeng wechat
坚持原创技术分享,您的支持将鼓励我继续创作!