Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I want to get data from socket and put it to kafka topic that my flink program can read data from topic and process it. I can do that on one node. But I want to have a kafka cluster with at least three different nodes(different IP address) and poll data from socket to distribute it among nodes.I do not know how to do this and change this code. My simple program is in following:

public class WordCount {

   public static void main(String[] args) throws Exception {

    kafka_test objKafka=new kafka_test();
  // Checking input parameters
    final ParameterTool params = ParameterTool.fromArgs(args);
    int myport = 9999;
    String hostname = "localhost";
 // set up the execution environment
    final StreamExecutionEnvironment env = 
  StreamExecutionEnvironment.getExecutionEnvironment();


 // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    DataStream<String> stream = env.socketTextStream(hostname,myport);

    stream.addSink(objKafka.createStringProducer("testFlink", 
    "localhost:9092"));

    DataStream<String> text = 
    env.addSource(objKafka.createStringConsumerForTopic("testFlink", 
    "localhost:9092", "test"));
    DataStream<Tuple2<String, Long>> counts = text
     .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                @Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) 
   {
          // normalize and split the line
             String[] words = value.toLowerCase().split("\W+");

                    // emit the pairs
             for (String word : words) {
                  if (!word.isEmpty()) {
                     out.collect(new Tuple2<String, Long>(word, 1L));
                        }
                    }
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);
          // emit result
        if (params.has("output")) {
           counts.writeAsText(params.get("output"));
          } else {
          System.out.println("Printing result to stdout. Use --output 
          to specify output path.");
          counts.print();
         }
    // execute program
    env.execute("Streaming WordCount");

    }//main
   }

  public class kafka_test {
  public FlinkKafkaConsumer<String> createStringConsumerForTopic(
        String topic, String kafkaAddress, String kafkaGroup) {
  //        ************************** KAFKA Properties ******        
     Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id", kafkaGroup);
    FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
            topic, new SimpleStringSchema(), props);
    myconsumer.setStartFromLatest();     

    return myconsumer;
  }

  public FlinkKafkaProducer<String> createStringProducer(
        String topic, String kafkaAddress) {

        return new FlinkKafkaProducer<>(kafkaAddress,
            topic, new SimpleStringSchema());
     }
  }

Would you please guide me how to broadcast a socket stream data between different kafka nodes?

Any help would be appreciated.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
181 views
Welcome To Ask or Share your Answers For Others

1 Answer

I think your code is correct. Kafka will take care of the "distribution" of the data. How data will be distributed among Kafka brokers will depend on the topic configuration.

Check the answer here to better understand Kafka topics and partitions.

Lets say you have 3 Kafka brokers. Then if you create your topic with 3 replicas and 3 partitions

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic

This will cause that your topic will have 3 partitions and each partition will be stored 3 times in your cluster. With 3 brokers you will get stored 1 partition and 2 replicas on each broker.

Then you just have to create your Kafka Sink

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "broker1:9092,broker2:9092,broker3:9092",
        "my-topic",
        new SimpleStringSchema());

stream.addSink(myProducer);

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...