Unit testing Apache Spark Structured Streaming jobs using MemoryStream is a non-trivial task. Sadly, the official Spark documentation still lacks a section on testing. In this post, I will therefore show you how to start writing unit tests for Spark Structured Streaming.
What is MemoryStream?
A Source that produces value stored in memory as they are added by the user. This Source is intended for use in unit tests as it can only replay data when the object is still available.
MemoryStream is one of the streaming sources available in Apache Spark. This source allows us to add and store data in memory, which is very convenient for unit testing. The official docs emphasize this, along with a warning that data can be replayed only when the object is still available.
MemoryStream takes a type parameter, which gives our unit tests much-desired type safety. The API of this abstraction is rich but not overwhelming; check it out for a full reference.
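To give a feel for that API, here is a minimal sketch (the names and the sample string are mine, not part of the job built below) that creates a MemoryStream[String], exposes it as a streaming Dataset and adds a record:

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[1]").getOrCreate()
implicit val sqlCtx: SQLContext = spark.sqlContext
import spark.implicits._

val stream = MemoryStream[String]      // typed in-memory source
val ds = stream.toDS()                 // streaming Dataset[String]
val offset = stream.addData("hello")   // returns the offset of the batch just added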
Before writing any unit test, let's create a sample job to test.
Writing Spark Structured Streaming job
The job I will be using for testing has a simple role: read data from a Kafka source and write it to a MongoDB database. In your case, the set of transformations and aggregations will probably be much richer, but the principles stay the same.
First, I read from the Kafka data source and extract the value column. I am also applying a prebuilt rsvpStruct schema, but that is specific to my code sample.
import org.apache.spark.sql.functions.from_json
import spark.implicits._

// Read the raw records from the "rsvp" Kafka topic
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "rsvp")
  .load()

// Kafka delivers the payload as bytes, so cast it to a string
val rsvpJsonDf = df.selectExpr("CAST(value as STRING)")

// Parse the JSON payload with the prebuilt schema
val rsvpStruct = Schema.getRsvpStructSchema
val rsvpDf = rsvpJsonDf.select(from_json($"value", rsvpStruct).as("rsvp"))
Next, I output the parsed data into a Mongo collection. That is done in a foreachBatch sink, which is invoked for every micro-batch of the streaming query and allows us to perform an arbitrary operation on it. In my case, I am using the MongoDB Spark connector to persist the batches.
import org.apache.spark.sql.DataFrame

rsvpDf.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/raw")
  .foreachBatch({ (batchDf: DataFrame, batchId: Long) =>
    // Flatten the parsed struct and persist each micro-batch with the MongoDB Spark connector
    val outputDf = batchDf.select("rsvp.*")
    outputDf.write
      .format("mongo")
      .mode("append")
      .save()
  })
  .start()
  .awaitTermination()
Now that we have the sample job created, let's start writing a unit test using the MemoryStream class.
Unit testing Spark using MemoryStream
The preliminary step in writing a unit test is to have some sample data the tests can use. In my case, I use a stringified JSON message that contains the information I normally receive from my Kafka data source. You should use whatever data fits your use case.
val sampleRsvp = """{"venue":{"venue_name":"Capitello Wines" ...
Next, inside the test case, we create our SparkSession. Nothing special has to be set here, just the typical config you use.
import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class SaveRawDataTest extends AnyFunSuite {

  test("should ingest raw RSVP data and read the parsed JSON correctly") {
    val spark = SparkSession.builder()
      .appName("Save Raw Data Test")
      .master("local[1]")
      .getOrCreate()
    ...
Going further, we define our MemoryStream with a String type parameter. I use String because that's the type of my sample data. To create the MemoryStream, two things are needed: an implicit sqlContext, which can be obtained from the SparkSession, and the Spark implicits import, which brings in the needed type encoders. Lastly, I convert the stream to a Dataset.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val sqlCtx: SQLContext = spark.sqlContext
import spark.implicits._

val events = MemoryStream[String]
val sessions = events.toDS
Converting to a Dataset allows us to make the first assertion: we can check that the Dataset is indeed a streaming one. You can do this using the following code.
assert(sessions.isStreaming, "sessions must be a streaming Dataset")
Next, I add the transformations that were included in the sample job. In this case, however, I will not be checking whether the data has been saved in Mongo; I am only interested in whether the raw stringified JSON has been parsed correctly and I can run SQL queries on it. The transformations I use are:
import org.apache.spark.sql.functions.from_json

// Same parsing step as in the job; rsvpStruct again comes from Schema.getRsvpStructSchema
val transformedSessions = sessions.select(from_json($"value", rsvpStruct).as("rsvp"))

val streamingQuery = transformedSessions
  .writeStream
  .format("memory")
  .queryName("rawRsvp")
  .outputMode("append")
  .start()
It is important to use the memory format, so that the actual data can be queried in a further step to make the assertions. It is also useful to name the results using the queryName option.
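To make that concrete, once the query above is started, the memory sink registers an in-memory table under the query name, so the results can be read back like this (just a sketch; rawRsvp matches the queryName used above):

val viaSql   = spark.sql("select * from rawRsvp")  // query the in-memory table with SQL
val viaTable = spark.table("rawRsvp")              // or read it through the table API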
Lastly, I add my sample data to the MemoryStream instance, process it and commit the offset, as shown below:
import org.apache.spark.sql.execution.streaming.LongOffset

// Add the sample data as a single batch, let the query process it, then commit the offset
val batch = sampleRsvp
val currentOffset = events.addData(batch)
streamingQuery.processAllAvailable()
events.commit(currentOffset.asInstanceOf[LongOffset])
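If your test needs more than one micro-batch, the same cycle can simply be repeated; a minimal sketch, assuming a second hypothetical sample called sampleRsvp2:

// Feed another batch, let the query pick it up, then commit its offset
val nextOffset = events.addData(sampleRsvp2)
streamingQuery.processAllAvailable()
events.commit(nextOffset.asInstanceOf[LongOffset])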
The very last step is to query the committed data and run some assertions on it. In my case, I run a SQL query to get the event_name property and assert against that.
val rsvpEventName = spark.sql("select rsvp.event.event_name from rawRsvp")
  .collect()
  .map(_.getAs[String](0))
  .head

assert(rsvpEventName == "Eugene ISSA & Technology Association of Oregon December Cyber Security Meetup")
Summary
I hope you have found this post useful. If so, don't hesitate to like or share it. You can also follow me on social media if you fancy 🙂