Is a table registered with registerTempTable (createOrReplaceTempView in Spark 2.+) cached?

Using Zeppelin, I register a DataFrame in my Scala code after heavy computation, and then within %pyspark I want to access it and filter it further.

Will it use a memory-cached version of the table? Or will it be rebuilt each time?
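For concreteness, a minimal sketch of that handoff (the names and the filter are made up; it assumes both paragraphs share one SparkSession, which is the default in Zeppelin's spark interpreter group):

%spark
// Scala paragraph: heavy computation, then expose the result as a temp view
val df = spark.read.parquet("/data/events").groupBy("user").count()
df.createOrReplaceTempView("my_table")

%pyspark
# Python paragraph: the view is visible through the shared SparkSession
spark.table("my_table").filter("count > 10").show()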


1 Answer

Registered tables are not cached in memory.

The createOrReplaceTempView method (registerTempTable before Spark 2.0) just creates or replaces a view of the given DataFrame with its query plan.

Only if you create a permanent view will it convert the query plan to a canonicalized SQL string and store it as the view text in the metastore.
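For that permanent-view case, a hedged sketch (the table name events is made up; permanent views need a metastore-backed catalog and cannot be defined over temp views):

scala> spark.sql("CREATE OR REPLACE VIEW recent_events AS SELECT * FROM events WHERE year >= 2016")

scala> spark.table("recent_events").count() // the stored SQL text is re-expanded into a plan on every read; nothing is cached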

You'll need to cache your DataFrame explicitly, e.g.:

df.createOrReplaceTempView("my_table")  # df.registerTempTable("my_table") before Spark 2.0
spark.catalog.cacheTable("my_table")    # sqlContext.cacheTable("my_table") before Spark 2.0
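Note that catalog-level caching is lazy: the in-memory table is only materialized by the first action that scans it. A short sketch of checking this, using the standard catalog calls:

spark.table("my_table").count()     # first action materializes the cache
spark.catalog.isCached("my_table")  # now returns True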

EDIT:

Let's illustrate this with an example:

Using cacheTable:

scala> val df = Seq(("1",2),("b",3)).toDF
// df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> sc.getPersistentRDDs
// res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.createOrReplaceTempView("my_table")

scala> sc.getPersistentRDDs
// res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> spark.catalog.cacheTable("my_table") // sqlContext.cacheTable("...") before Spark 2.0

scala> sc.getPersistentRDDs
// res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(2 -> In-memory table my_table MapPartitionsRDD[2] at cacheTable at <console>:26)
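The same thing can be done from SQL; as far as I know, CACHE TABLE has been eager since Spark 2.0 (prepend LAZY to defer materialization to the first scan):

scala> spark.sql("CACHE TABLE my_table") // eager: materializes the cache immediately

scala> spark.sql("CACHE LAZY TABLE my_table") // lazy, like spark.catalog.cacheTable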

Now the same example using cache followed by createOrReplaceTempView (cache.registerTempTable before Spark 2.0):

scala> sc.getPersistentRDDs
// res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> val df = Seq(("1",2),("b",3)).toDF
// df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.createOrReplaceTempView("my_table")

scala> sc.getPersistentRDDs
// res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.cache.createOrReplaceTempView("my_table")

scala> sc.getPersistentRDDs
// res6: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = 
// Map(2 -> ConvertToUnsafe
// +- LocalTableScan [_1#0,_2#1], [[1,2],[b,3]]
//  MapPartitionsRDD[2] at cache at <console>:28)
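In either variant, release the memory once the heavy result is no longer needed (a sketch matching the examples above):

scala> spark.catalog.uncacheTable("my_table") // for the cacheTable variant

scala> df.unpersist() // for the df.cache variant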
