Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I have a problem when I use Spark Streaming to read from Cassandra.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

As shown in the link above, I use

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)

to select the data from Cassandra, but Spark Streaming seems to run the query only once, whereas I want it to keep querying at an interval of 10 seconds.

My code is as follows. Any help would be appreciated.

Thanks!

import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue


object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("scala_streaming_test")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val ssc = new StreamingContext(conf, Seconds(10))

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

    //rdd.collect().foreach(println)

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()

    val dstream = ssc.queueStream(rddQueue)

    dstream.print()

    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
  }
}


1 Answer

You can create a ConstantInputDStream with the CassandraRDD as input. ConstantInputDStream will provide the same RDD on each streaming interval, and by executing an action on that RDD you will trigger a materialization of the RDD lineage, leading to executing the query on Cassandra every time.

Make sure that the data being queried does not grow without bound. Otherwise each query takes longer than the last, and once a query takes longer than the streaming interval the job falls behind and becomes unstable.

Something like this should do the trick (using your code as starting point):

import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(conf, Seconds(10))

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

val dstream = new ConstantInputDStream(ssc, cassandraRDD)

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output
    println(rdd.collect.mkString("\n"))
}
ssc.start()
ssc.awaitTermination()
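
One way to act on the caveat about unbounded data is to restrict each poll to the rows written during the last interval. The following is only a sketch under stated assumptions, not a tested recipe: it assumes the users table has a timestamp column named updated_at (hypothetical, not part of the question) that Cassandra allows in a WHERE clause, for example because it is a clustering column. Because a ConstantInputDStream always replays the same RDD, the sketch uses it merely as a timer and builds a fresh query inside foreachRDD from the batch time.

```scala
import java.util.Date

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import com.datastax.spark.connector._

object BoundedPollingApp {

  // Time window (from, to] covered by the batch that ends at batchMs.
  def window(batchMs: Long, intervalMs: Long): (Date, Date) =
    (new Date(batchMs - intervalMs), new Date(batchMs))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("scala_streaming_test")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val interval = Seconds(10)
    val ssc = new StreamingContext(conf, interval)

    // A one-element RDD that is replayed every interval; it only acts as a tick.
    val ticks = new ConstantInputDStream(ssc, ssc.sparkContext.parallelize(Seq(0)))

    ticks.foreachRDD { (tick, batchTime) =>
      val (from, to) = window(batchTime.milliseconds, interval.milliseconds)

      // Build a new query on the driver for each batch.
      // ASSUMPTION: "updated_at" is a hypothetical timestamp column that
      // Cassandra permits in this WHERE clause (e.g. a clustering column).
      val rows = tick.sparkContext
        .cassandraTable("mykeyspace", "users")
        .select("fname", "lname")
        .where("lname = ? AND updated_at > ? AND updated_at <= ?", "yu", from, to)

      // collect is the action that actually runs the query against Cassandra.
      println(rows.collect().mkString("\n"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this shape each batch reads at most one interval's worth of rows, so the query cost stays roughly constant over time. Whether the WHERE clause is accepted depends entirely on the real table schema, so adapt the predicate to the actual partition and clustering keys.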
