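Flink 1.7 is quite capable and ships with a SQL Client: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sqlClient.html It is still in an early stage of development and cannot be used in production, but it is very helpful for development and debugging. If it becomes production-ready, it will greatly reduce the development effort for real-time computing.
After migrating my code to the 1.7 API, running it always failed with the following error: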
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.
Reason: No context matches.
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 12 more
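I searched on Google and found very little material. A few people had hit similar problems, and the suggested fixes included putting the Flink Kafka and JSON jars into the lib directory of the Flink installation, and listing the table factory in a services file under META-INF. Neither worked for me.
So I debugged locally and stepped into the filterByContext method of the class org.apache.flink.table.factories.TableFactoryService: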
/**
  * Filters for factories with matching context.
  *
  * @return all matching factories
  */
private def filterByContext[T](
    factoryClass: Class[T],
    properties: Map[String, String],
    foundFactories: Seq[TableFactory],
    classFactories: Seq[TableFactory])
  : Seq[TableFactory] = {

  val matchingFactories = classFactories.filter { factory =>
    val requestedContext = normalizeContext(factory)

    val plainContext = mutable.Map[String, String]()
    plainContext ++= requestedContext
    // we remove the version for now until we have the first backwards compatibility case
    // with the version we can provide mappings in case the format changes
    plainContext.remove(CONNECTOR_PROPERTY_VERSION)
    plainContext.remove(FORMAT_PROPERTY_VERSION)
    plainContext.remove(METADATA_PROPERTY_VERSION)
    plainContext.remove(STATISTICS_PROPERTY_VERSION)

    // check if required context is met
    plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
  }

  if (matchingFactories.isEmpty) {
    throw new NoMatchingTableFactoryException(
      "No context matches.",
      factoryClass,
      foundFactories,
      properties)
  }

  matchingFactories
}
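Note the classFactories.filter call: it compares plainContext with properties and checks whether every entry of the former is present in the latter with the same value. Inspecting the values in the debugger showed that plainContext contains an entry with key connector.type and value kafka, while properties has no such key. The match therefore fails, no table source factory is found, and the table source cannot be created.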
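The check can be reproduced outside Flink. The following is a minimal standalone Java sketch of the same "every required entry must be present with an equal value" rule, not Flink code: the class name ContextMatchSketch, the method contextMatches, and the sample property connector.topic are illustrative assumptions; only the connector.type = kafka entry comes from the debugging session above.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the context check in filterByContext; this is not Flink code.
final class ContextMatchSketch {

    // Java equivalent of:
    //   plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
    static boolean contextMatches(Map<String, String> requiredContext,
                                  Map<String, String> properties) {
        for (Map.Entry<String, String> e : requiredContext.entrySet()) {
            // A missing key yields null, which never equals the required value.
            if (!e.getValue().equals(properties.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Context required by the Kafka factory, as observed in the debugger.
        Map<String, String> required = new HashMap<>();
        required.put("connector.type", "kafka");

        // Hypothetical properties that lack connector.type.
        Map<String, String> props = new HashMap<>();
        props.put("connector.topic", "test-input");
        System.out.println(contextMatches(required, props)); // false

        // Once connector.type is supplied, the factory matches.
        props.put("connector.type", "kafka");
        System.out.println(contextMatches(required, props)); // true
    }
}
```

A single missing key is enough to reject a factory, which is why the exception only reports "No context matches." without saying which property was absent.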
I then checked the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html Its code example looks like this:
tableEnvironment
  // declare the external system to connect to
  .connect(
    new Kafka()
      .version("0.10")
      .topic("test-input")
      .startFromEarliest()
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
  )

  // declare a format for this system
  .withFormat(
    new Avro()
      .avroSchema(
        "{" +
        "  \"namespace\": \"org.myorganization\"," +
        "  \"type\": \"record\"," +
        "  \"name\": \"UserMessage\"," +
        "    \"fields\": [" +
        "      {\"name\": \"timestamp\", \"type\": \"string\"}," +
        "      {\"name\": \"user\", \"type\": \"long\"}," +
        "      {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
        "    ]" +
        "}"
      )
  )

  // declare the schema of the table
  .withSchema(
    new Schema()
      .field("rowtime", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
          .timestampsFromField("timestamp")
          .watermarksPeriodicBounded(60000)
        )
      .field("user", Types.LONG)
      .field("message", Types.STRING)
  )

  // specify the update-mode for streaming tables
  .inAppendMode()

  // register as source, sink, or both and under a name
  .registerTableSource("MyUserTable");
However, the YAML configuration on the same page does include a connector.type setting:
# declare the external system to connect to
connector:
  type: kafka
  version: "0.10"
  topic: test-input
  startup-mode: earliest-offset
  properties:
    - key: zookeeper.connect
      value: localhost:2181
    - key: bootstrap.servers
      value: localhost:9092
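This is a bit baffling: the code names the connector implementation class, yet the connector type is not recognized automatically. So the fix was to add this parameter to the launch command of my jar: --connector.type kafka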
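How a job turns launch arguments into properties depends on its own code. As a rough sketch, assuming the job reads "--key value" pairs from its main arguments, a guard like the one below makes a missing connector.type fail early with a clear message instead of a NoMatchingTableFactoryException. The class LaunchArgsSketch and both of its methods are hypothetical helpers written for this illustration; in a real Flink job, ParameterTool.fromArgs is commonly used for the parsing step.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Flink.
final class LaunchArgsSketch {

    // Parses arguments of the form "--key value" into an ordered map.
    static Map<String, String> parseArgs(String[] args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Arguments must be --key value pairs");
        }
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            // Strip the leading "--" so the key matches the property name.
            props.put(args[i].substring(2), args[i + 1]);
        }
        return props;
    }

    // Fails fast when the property needed for factory matching is absent.
    static String requireConnectorType(Map<String, String> props) {
        String type = props.get("connector.type");
        if (type == null) {
            throw new IllegalStateException(
                "Missing launch argument: --connector.type <type>, e.g. --connector.type kafka");
        }
        return type;
    }

    public static void main(String[] args) {
        Map<String, String> props = parseArgs(new String[] {"--connector.type", "kafka"});
        System.out.println(requireConnectorType(props)); // kafka
    }
}
```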
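In summary: with the Flink 1.7 connect API, the connector type was not recognized automatically in my case and had to be specified manually in the startup arguments, for example --connector.type kafka when connecting to Kafka. Otherwise no table source factory implementation is found, and the job fails with org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath.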