Flink 1.7 is quite powerful and now ships a SQL Client: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sqlClient.html. It is still at an early development stage and not suitable for production, but it is already very useful for development and debugging; once it is production-ready, it should greatly reduce the development effort for real-time computation.
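(For reference, the client ships with the Flink distribution and is started with ./bin/sql-client.sh embedded, per the linked page.)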

After migrating my code to the 1.7 API, running the job always fails with the following error:

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Reason: No context matches.

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory

	at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
	at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
	at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
	at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
	at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
	... 12 more

Googling turned up precious little. Others have hit similar problems; the suggested fixes include dropping the flink-kafka and flink-json jars into the Flink installation's lib directory and registering the table factory in a META-INF services file, but neither worked for me:

https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto
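For context, the META-INF/services fix those answers describe is standard Java ServiceLoader registration: the connector jar must carry a file named META-INF/services/org.apache.flink.table.factories.TableFactory listing its factory classes. For the 0.10 Kafka connector it looks roughly like this (class name as shipped in the 1.7 connector jar; verify against your connector version):

# META-INF/services/org.apache.flink.table.factories.TableFactory
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory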

Debugging locally, I stepped into the filterByContext method of the class org.apache.flink.table.factories.TableFactoryService:

  /**
    * Filters for factories with matching context.
    *
    * @return all matching factories
    */
  private def filterByContext[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val matchingFactories = classFactories.filter { factory =>
      val requestedContext = normalizeContext(factory)

      val plainContext = mutable.Map[String, String]()
      plainContext ++= requestedContext
      // we remove the version for now until we have the first backwards compatibility case
      // with the version we can provide mappings in case the format changes
      plainContext.remove(CONNECTOR_PROPERTY_VERSION)
      plainContext.remove(FORMAT_PROPERTY_VERSION)
      plainContext.remove(METADATA_PROPERTY_VERSION)
      plainContext.remove(STATISTICS_PROPERTY_VERSION)

      // check if required context is met
      plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
    }

    if (matchingFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        "No context matches.",
        factoryClass,
        foundFactories,
        properties)
    }

    matchingFactories
  }

Note the classFactories.filter call: it compares each factory's plainContext against the supplied properties, requiring every entry of the former to be present, with an equal value, in the latter. Inspecting the values showed that plainContext contains a connector.type entry with the value kafka, while properties does not contain that key at all, so no factory matches, no table source factory is found, and the source cannot be created.
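The check boils down to simple map containment. A minimal Java sketch of it (not Flink code; the class name and sample values here are illustrative only):

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the containment check in filterByContext:
// every key/value pair the factory requires must appear verbatim in
// the properties derived from the descriptor / job configuration.
public class ContextMatchSketch {

    static boolean matches(Map<String, String> requiredContext, Map<String, String> properties) {
        return requiredContext.entrySet().stream()
                .allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
    }

    public static void main(String[] args) {
        Map<String, String> required = new HashMap<>();
        required.put("connector.type", "kafka");  // required by the Kafka factory

        Map<String, String> supplied = new HashMap<>();
        supplied.put("update-mode", "append");    // my job's properties: no connector.type

        System.out.println(matches(required, supplied)); // false -> "No context matches."
    }
}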

Checking the official documentation, https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html, I found that although its code example is written like this:

tableEnvironment
  // declare the external system to connect to
  .connect(
    new Kafka()
      .version("0.10")
      .topic("test-input")
      .startFromEarliest()
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
  )

  // declare a format for this system
  .withFormat(
    new Avro()
      .avroSchema(
        "{" +
        "  \"namespace\": \"org.myorganization\"," +
        "  \"type\": \"record\"," +
        "  \"name\": \"UserMessage\"," +
        "    \"fields\": [" +
        "      {\"name\": \"timestamp\", \"type\": \"string\"}," +
        "      {\"name\": \"user\", \"type\": \"long\"}," +
        "      {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
        "    ]" +
        "}" +
      )
  )

  // declare the schema of the table
  .withSchema(
    new Schema()
      .field("rowtime", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
          .timestampsFromField("timestamp")
          .watermarksPeriodicBounded(60000)
        )
      .field("user", Types.LONG)
      .field("message", Types.STRING)
  )

  // specify the update-mode for streaming tables
  .inAppendMode()

  // register as source, sink, or both and under a name
  .registerTableSource("MyUserTable");

the corresponding YAML configuration nevertheless spells out the connector.type setting (the type key under connector):

    # declare the external system to connect to
    connector:
      type: kafka
      version: "0.10"
      topic: test-input
      startup-mode: earliest-offset
      properties:
        - key: zookeeper.connect
          value: localhost:2181
        - key: bootstrap.servers
          value: localhost:9092
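Both routes are normalized into the same flat string-to-string property map before factory matching. For the YAML above, the flattened entries look roughly like this (reconstructed from the key names seen while debugging; the exact names are defined by the Kafka connector's validator):

connector.type=kafka
connector.version=0.10
connector.topic=test-input
connector.startup-mode=earliest-offset
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=localhost:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=localhost:9092

It is exactly the connector.type entry in this map that filterByContext could not find in my job's properties.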

This is a bit baffling: the code explicitly instantiates the Kafka connector class, yet the connector type is not derived from it automatically. So the fix was to add that property to my jar's launch command: --connector.type kafka
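How the flag travels from the launch command into the property map is job-specific; the fix above implies the program arguments end up merged into the table properties. A minimal sketch of reading such a flag, assuming the job parses its arguments with Flink's ParameterTool (the class name here is hypothetical):

import org.apache.flink.api.java.utils.ParameterTool;

// Hypothetical sketch: parse "--connector.type kafka" from the launch
// command so it can be merged into the table properties that
// TableFactoryService matches against.
public class LaunchArgsSketch {
    public static void main(String[] args) {
        // e.g. flink run myjob.jar --connector.type kafka
        ParameterTool params = ParameterTool.fromArgs(args);
        System.out.println(params.get("connector.type")); // -> kafka
    }
}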


Summary

Flink 1.7's connect API does not pick up the connector type automatically; it must be supplied manually as a launch argument, e.g. --connector.type kafka when connecting to Kafka. Otherwise no table source factory implementation can be found and the job fails with org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath.