I set Flink aside for quite a while: early on I read through the docs and demos, and then did nothing with it. Recently I've been writing a lot of small Java programs for data format conversion and logic processing. At first it felt fine, since they were quick and easy to develop. But as the logic piled up, so did the trouble: with many conversion and cleansing steps lumped together, a change in one easily affects the others, deployment is a hassle, and testing is hard. Most of these programs are also one-off: they run for a while and are never used again. Jobs like that could be handled as MapReduce tasks on YARN, but since I had just been reading up on Flink, I decided to test the waters with it instead.

The logic of the first step is simple: take the raw data from Kafka, convert it to a JSON string, and send it to another Kafka topic. The code:

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.enableCheckpointing(5000); // checkpoint every 5 seconds

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "log_flink_convert_test");
String topic = "glbg-analitic";
// ParameterTool.fromArgs(args) could supply these settings at runtime instead

DataStream<String> messageStream = see.addSource(
        new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties));

// Keep only the lines containing the tracking pixel, then transform each
// one into exactly one JSON string.
DataStream<String> stringDataStream = messageStream
        .filter(s -> s.contains("/_ubc.gif?"))
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return GsonUtil.toJson(NginxLogConvertUtil.getNginxLogParameters(value));
            }
        });

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092",       // broker list
        "test_log_json_2",      // target topic
        new SimpleStringSchema());
myProducer.setWriteTimestampToKafka(true);

stringDataStream.addSink(myProducer);

see.execute("nginx_log_convert");

To keep things simple, the Kafka settings are hard-coded here; the ParameterTool.fromArgs(args) class can be used to pass them in dynamically. Reference: https://data-artisans.com/blog/kafka-flink-a-practical-how-to
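A minimal sketch of that approach, reusing the settings from above (the argument names and defaults here are my own choice, not from the original code):

// org.apache.flink.api.java.utils.ParameterTool parses --key value pairs
// from the program arguments.
ParameterTool params = ParameterTool.fromArgs(args);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", params.get("bootstrap.servers", "localhost:9092"));
properties.setProperty("group.id", params.get("group.id", "log_flink_convert_test"));
String topic = params.get("topic", "glbg-analitic");

The job can then be submitted with something like flink run ... --topic glbg-analitic --bootstrap.servers localhost:9092, and it falls back to the hard-coded defaults when no arguments are given.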

As for how, after the consumer reads a message, the converted message gets sent to another Kafka topic: I thought about this for a long time and dug through a lot of material without finding an answer. Then it suddenly clicked after dinner: I was still approaching it the way a traditional Java application is written, without a stream-processing mindset.

In fact, once the Flink consumer reads a message and converts it to JSON, a new stream is produced, and the output to the other Kafka topic happens through that new stream. That is the DataStream<String> stringDataStream in the code above.

Here is a simple sketch of the flow:
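kafka topic "glbg-analitic"
        |
        v
FlinkKafkaConsumer011 (source)  ->  DataStream<String> messageStream
        |
  filter + map
        |
        v
DataStream<String> stringDataStream  (the new stream)
        |
        v
FlinkKafkaProducer011 (sink)  ->  kafka topic "test_log_json_2"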


The code hasn't been tidied up and pushed to GitHub yet, so for now here are the Maven dependencies:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.5.0</flink.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-wikiedits -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <!-- This dependency is required to actually execute jobs. It is currently pulled in by
                flink-streaming-java, but we explicitly depend on it to safeguard against future changes. -->
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- explicitly add a standard logging framework, as Flink does not have
            a hard dependency on one specific framework by default -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>



        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
            <version>1.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.3</version>
        </dependency>
    </dependencies>