学习 Fink(二):Flink 开发环境搭建

更新至 Flink 1.8.0 版本

开发环境

  • JDK 8
  • Maven 3
  • IntelliJ IDEA

Flink 官方支持使用 Java 和 Scala 开发 Flink 应用。Scala API 问题较多,推荐使用 Java 作为开发语言。

pom.xml

定义属性:

<properties>  
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.11</scala.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>

    <flink.version>1.8.0</flink.version>
</properties>  

定义依赖:

<dependency>  
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>  
<dependency>  
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>  
<dependency>  
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>  
<dependency>  
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>  

定义构建:

<build>  
    <finalName>flink-demo</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.dyingbleed.demo.flink.Application</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>

    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.eclipse.m2e</groupId>
                <artifactId>lifecycle-mapping</artifactId>
                <version>1.0.0</version>
                <configuration>
                    <lifecycleMappingMetadata>
                        <pluginExecutions>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-shade-plugin</artifactId>
                                    <versionRange>[3.0.0,)</versionRange>
                                    <goals>
                                        <goal>shade</goal>
                                    </goals>
                                </pluginExecutionFilter>
                                <action>
                                    <ignore/>
                                </action>
                            </pluginExecution>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-compiler-plugin</artifactId>
                                    <versionRange>[3.1,)</versionRange>
                                    <goals>
                                        <goal>testCompile</goal>
                                        <goal>compile</goal>
                                    </goals>
                                </pluginExecutionFilter>
                                <action>
                                    <ignore/>
                                </action>
                            </pluginExecution>
                        </pluginExecutions>
                    </lifecycleMappingMetadata>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
</build>  

log4j.properties

log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender  
log4j.appender.console.layout=org.apache.log4j.PatternLayout  
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n  

本地监控(可选)

编辑 pom.xml 文件,添加依赖:

<dependency>  
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>  

初始化执行环境:

import org.apache.flink.configuration.ConfigConstants;  
import org.apache.flink.configuration.Configuration;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();  
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);  
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);  

应用启动,浏览器访问:http://localhost:8081