Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
I want to receive JSON strings from MQTT and parse them into a DataFrame df. How can I do it?

This is an example of a JSON message that I send to the MQTT queue in order to process it in Spark:

{
"id": 1,
"timestamp": 1532609003,
"distances": [2,5,7,8]
}

This is my code:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[4]") \
    .getOrCreate()

# Custom Structured Streaming receiver
reader = spark \
    .readStream \
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") \
    .option("topic", "uwb/distances") \
    .option("brokerUrl", "tcp://127.0.0.1:1883") \
    .load() \
    .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")


df = spark.read.json(reader.select("value").rdd)

# Start running the query that prints the running counts to the console
query = df \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()

But this code fails:

py4j.protocol.Py4JJavaError: An error occurred while calling o45.javaToPython.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
mqtt

I tried adding start as follows:

df = spark.read.json(reader.select("value").rdd) \
    .writeStream \
    .format("console") \
    .start()

But got the same error. My goal is to get a DataFrame df that I can further pass through ETL processes.

UPDATE:

The thread marked as an answer has not helped me solve the problem. First, it gives a solution for Scala, while I am using PySpark. Second, I tested the solution proposed in the answer and it returned an empty json column:

reader = spark \
    .readStream \
    .schema(spark.read.json("mqtt_schema.json").schema) \
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") \
    .option("topic", "uwb/distances") \
    .option("brokerUrl", "tcp://127.0.0.1:1883") \
    .load() \
    .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")

from pyspark.sql.functions import from_json, col

json_schema = spark.read.json("mqtt_schema.json").schema
df = reader.withColumn("json", from_json(col("value"), json_schema))

query = df \
    .writeStream \
    .format("console") \
    .start()
1 Answer

You have to use from_json or an equivalent method. If the structure of the document looks like the one in the question:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, LongType, ArrayType

schema = StructType([
    StructField("id", LongType()),
    StructField("timestamp", LongType()),
    StructField("distances", ArrayType(LongType()))
])


ds = spark.readStream.load(...)

# Parse the raw payload into a struct column; note the result must be
# assigned (or chained into a writeStream) to be used downstream.
ds = ds.withColumn("value", from_json(col("value").cast("string"), schema))
