Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

I have the following code, which is supposed to write data generated by the datagen connector to a file. When I run the application, however, no target directory is created and no data is written.

When I add env.execute() at the end of the code, it fails with the error "No operators defined in streaming topology. Cannot execute."

How can I make this application work? Thanks.

test("insert into table") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)
    val ddl =
      """
      create temporary table abc(
       name STRING,
       age INT
      ) with (
        'connector' = 'datagen'
      )
      """.stripMargin(' ')

    tenv.executeSql(ddl)

    val sql =
      """
        select * from abc
      """.stripMargin(' ')

    val sinkDDL =
      s"""
      create temporary table xyz(
       name STRING,
       age INT
      ) with (
       'connector' = 'filesystem',
       'path' = 'D:\${System.currentTimeMillis()}-csv' ,
       'format' = 'csv'

      )
      """.stripMargin(' ')

    tenv.executeSql(sinkDDL)

    val insertInSQL =
      """
      insert into xyz
      select name, age from abc
      """.stripMargin(' ')

    tenv.executeSql(insertInSQL)


//    env.execute()


  }


1 Answer

I think you need a UDF (user-defined function) in the table pipeline; see

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/functions/udfs.html#table-functions

Following the example there, you could write a function and use it in your SQL pipeline, so that it acts as the "operator" mentioned in your error message.
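
Another likely cause, independent of UDFs: executeSql on an INSERT statement submits the Flink job itself and returns right away, so a test method can finish before anything is written. That would also explain the env.execute() error, since the whole pipeline is defined and submitted through the table environment and the StreamExecutionEnvironment has no DataStream operators of its own. Below is a minimal sketch of that idea, not a definitive fix. It assumes Flink 1.12 or later (for TableResult.await() and the datagen option 'number-of-rows'), with the table planner, Scala bridge, and flink-csv on the classpath. The object name DatagenToCsv, the temp directory prefix, and the row count are hypothetical choices for illustration. The path is passed as a file URI, which sidesteps the backslash in 'D:\${...}'.

```scala
import java.nio.file.{Files, Path}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Hypothetical object name, used only for this sketch.
object DatagenToCsv {

  /** Runs a bounded datagen-to-CSV job, waits for it, and returns the output directory. */
  def run(): Path = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // The streaming file sink finalizes part files on checkpoints.
    env.enableCheckpointing(1000L)
    val tenv = StreamTableEnvironment.create(env)

    // Assumption: 'number-of-rows' makes the datagen source bounded so the job can finish.
    tenv.executeSql(
      """
        |CREATE TEMPORARY TABLE abc (
        |  name STRING,
        |  age INT
        |) WITH (
        |  'connector' = 'datagen',
        |  'number-of-rows' = '10'
        |)
        |""".stripMargin)

    // A file URI built from a temp directory, so no hand-escaped Windows path is needed.
    val outDir = Files.createTempDirectory("flink-demo").resolve("xyz-csv")
    tenv.executeSql(
      s"""
         |CREATE TEMPORARY TABLE xyz (
         |  name STRING,
         |  age INT
         |) WITH (
         |  'connector' = 'filesystem',
         |  'path' = '${outDir.toUri}',
         |  'format' = 'csv'
         |)
         |""".stripMargin)

    // executeSql submits the INSERT job; await() blocks until that job has finished.
    tenv.executeSql("INSERT INTO xyz SELECT name, age FROM abc").await()
    outDir
  }

  def main(args: Array[String]): Unit =
    println(s"Output written under: ${run()}")
}
```

With this shape there is no call to env.execute() at all. If the source stays unbounded (no 'number-of-rows'), await() will block for as long as the job runs, and how soon finished part files show up depends on the checkpoint interval and the Flink version.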

