Today we’ll briefly show how to join a static dataset in Spark with a streaming “live” dataset, known as a DStream. This is helpful in a number of scenarios: for example, when you have a live stream of data from Kafka (or RabbitMQ, Kinesis, etc.) that you want to join with tabular data you queried from a database (or a Hive table, a file, or anything else you can normally load into Spark).
Let’s dive right in with an overview of the code that’ll do the trick. But first… let’s assume we have some transaction data streaming in from Kafka that contains a timestamp, a customer id, a transaction id (unique to this transaction), and a transaction amount. Let’s also assume we have a dataset of line-item details for each transaction in a file, Hive table, or other database that we want to join with this stream (using transaction_id).
But how? With the transform() method in the DStream class, which, per the Spark documentation, will “return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.”
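To build intuition for what transform() does before we touch Spark, here's a plain-Scala analogy (no Spark dependency): each inner Seq stands in for one RDD in the stream, and a hypothetical transformBatches helper plays the role of DStream.transform(), applying an arbitrary collection-to-collection function to every micro-batch. The names and values here are illustrative only.

```scala
// Analogy only: a DStream is (conceptually) a sequence of micro-batches,
// each of which is an RDD. transform() applies an RDD-to-RDD function to
// every micro-batch. Here, plain Seqs stand in for RDDs.
object TransformAnalogy {
  // Hypothetical stand-in for DStream.transform()
  def transformBatches[A, B](batches: Seq[Seq[A]])(f: Seq[A] => Seq[B]): Seq[Seq[B]] =
    batches.map(f)

  // Two "micro-batches" of data
  val microBatches: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4))

  // Apply an arbitrary per-batch operation (here: doubling) to every batch
  val doubled: Seq[Seq[Int]] = transformBatches(microBatches)(batch => batch.map(_ * 2))
}
```

The join we build below is exactly this pattern: the per-batch function we pass to transform() happens to be a join against a static RDD.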
Here’s one way to do it in a few short steps:
// create a case class to represent a Transaction (from streaming)
case class Transaction(
  timestamp: Long,
  customer_id: String,
  transaction_id: String,
  amount: Double
)
// create a case class to represent a TransactionDetail (from static)
// (the line-item field is illustrative; use whatever fields your source provides)
case class TransactionDetail(
  transaction_id: String,
  line_items: Seq[String]
)
// Pull in a static dataset by whatever means you prefer
// For this example, this object has the shape Dataset[(String, TransactionDetail)] --> (transaction_id, rest_of_the_data)
val transaction_details = ... // some static data that returns in this form: Dataset[(String, TransactionDetail)]
// where String is the transaction_id and the latter is the data in whatever form you prefer (string, dataframe, case class, etc)
// simply use a map like the one below to get your data into this structure
// the only thing that matters is that the column you wish to join on sits in the first position of the tuple, since pair joins implicitly key on that value
// Pull in a DStream of some streaming data
val live_transactions = ... // some streaming data from Kafka, Kinesis, etc
// Create a new DStream where each element is a pair: first, the column you want to join on (transaction_id); second, the whole data object (the Transaction itself)
// which gives it this structure: DStream[(transaction_id, Transaction)]
// you'll need transaction_details to be in a similar format: (transaction_id, TransactionDetail)
val transactions = live_transactions.map(x => (x.transaction_id, x)) // if your stream yields raw records instead of Transaction objects, construct a Transaction(...) here
// Create an RDD from your static dataset (if it's not an RDD already, and instead a dataset or dataframe)
val transaction_details_rdd = transaction_details.rdd
// Take the new DStream and use the transform() command to join the static dataset to it (using transaction_id)
val complete_transaction_data = transactions.transform(live_transaction =>
  live_transaction.join(transaction_details_rdd) // <-- you can use leftOuterJoin, rightOuterJoin and fullOuterJoin as well
)
The result is a DStream of (transaction_id, (Transaction, TransactionDetail)) pairs: the transaction_id key, the live transaction record (from Kafka), and the matching transaction detail record (from whatever static source you used). The join matched records on the transaction_id sitting in the first position of both the keyed stream and the static pair RDD.
It’s that simple. Thanks, transform()!