I have following code that wants to write the data generated with datagen to a file, but when I run the application, no target directory is created, and no data is written.
When I add env.execute()
at the end of the code, it complains that No operators defined in streaming topology. Cannot execute.
I would ask how to make the application work, thanks.
test("insert into table") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl =
"""
create temporary table abc(
name STRING,
age INT
) with (
'connector' = 'datagen'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
val sql =
"""
select * from abc
""".stripMargin(' ')
val sinkDDL =
s"""
create temporary table xyz(
name STRING,
age INT
) with (
'connector' = 'filesystem',
'path' = 'D:\${System.currentTimeMillis()}-csv' ,
'format' = 'csv'
)
""".stripMargin(' ')
tenv.executeSql(sinkDDL)
val insertInSQL =
"""
insert into xyz
select name, age from abc
""".stripMargin(' ')
tenv.executeSql(insertInSQL)
// env.execute()
}