I had set Flink aside for a while: at first I read the docs and the demos, and once I finished there was nothing to apply it to. Recently I have been writing a lot of small Java programs for data format conversion and logic processing. At first this seemed fine, since development was simple and fast, but as the logic piled up, so did the trouble: many conversion and cleansing steps live together, a change to one easily affects the others, deployment is a hassle, and testing is hard. Most of these programs are also one-offs that run for a while and are never used again. Jobs like this could be handled as MapReduce tasks on YARN, treating each one as a single job; since I had just been looking at Flink again, I decided to use it to test the waters.
The first step is simple: take raw data from Kafka, convert it to a JSON string, and send it to another Kafka topic. The code is as follows:
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 5 seconds so Kafka offsets are committed consistently.
see.enableCheckpointing(5000);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "log_flink_convert_test");
String topic = "glbg-analitic";

// Source: read raw log lines from Kafka.
// ParameterTool.fromArgs(args) could supply these settings at runtime instead.
DataStream<String> messageStream =
        see.addSource(new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties));

// Keep only the log lines we care about, then convert each one to a JSON string.
DataStream<String> stringDataStream = messageStream
        .filter(s -> s.contains("/_ubc.gif?"))
        .map(new MapFunction<String, String>() {
            // Takes one element from the input stream and transforms it into exactly one element.
            @Override
            public String map(String value) throws Exception {
                return GsonUtil.toJson(NginxLogConvertUtil.getNginxLogParameters(value));
            }
        });

// Sink: write the converted JSON to the target topic.
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
        "localhost:9092",       // broker list
        "test_log_json_2",      // target topic
        new SimpleStringSchema());
myProducer.setWriteTimestampToKafka(true);
stringDataStream.addSink(myProducer);

see.execute("nginx_log_convert");
To keep things simple, the Kafka settings are hard-coded. They can be passed in dynamically using the ParameterTool.fromArgs(args) class; see https://data-artisans.com/blog/kafka-flink-a-practical-how-to for reference.
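For example, a minimal sketch of the dynamic-parameter approach (the argument names here are illustrative, not from the original code):

// Submit with e.g.:
//   --topic glbg-analitic --bootstrap.servers localhost:9092 --group.id log_flink_convert_test
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = see.addSource(new FlinkKafkaConsumer011<>(
        parameterTool.getRequired("topic"),   // fails fast if --topic is missing
        new SimpleStringSchema(),
        parameterTool.getProperties()));      // forwards all arguments as consumer Properties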
As for how, after the consumer reads a message, to send the converted message on to another Kafka topic: I puzzled over this step for a long time and dug through a lot of material without finding an answer. Then, after dinner, it suddenly clicked: I was still thinking in the traditional Java application style, not in terms of stream processing.
In fact, once the Flink consumer has read a message and converted it to JSON, what you have is a new stream, and writing to the other Kafka topic happens through that new stream: the DataStream&lt;String&gt; stringDataStream in the code above.
A rough sketch of the flow:
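Kafka topic "glbg-analitic"
        │   FlinkKafkaConsumer011 (source)
        ▼
DataStream&lt;String&gt; messageStream
        │   filter + map (produces a new stream)
        ▼
DataStream&lt;String&gt; stringDataStream
        │   FlinkKafkaProducer011 (sink)
        ▼
Kafka topic "test_log_json_2"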
The code has not been cleaned up and pushed to GitHub yet; for now, here are the Maven dependencies:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.5.0</flink.version>
    <slf4j.version>1.7.7</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<dependencies>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-wikiedits -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- This dependency is required to actually execute jobs. It is currently pulled in by
             flink-streaming-java, but we explicitly depend on it to safeguard against future changes. -->
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- explicitly add a standard logging framework, as Flink does not have
         a hard dependency on one specific framework by default -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-text</artifactId>
        <version>1.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
    <dependency>
        <groupId>commons-beanutils</groupId>
        <artifactId>commons-beanutils</artifactId>
        <version>1.9.3</version>
    </dependency>
</dependencies>