Today we’ll briefly showcase how to join a static dataset in Spark with a streaming “live” dataset, otherwise known as a DStream. This is helpful in a number of scenarios, such as when you have a live stream of data from Kafka (or RabbitMQ, or another streaming source) that you want to join with tabular data you queried from a database (or a Hive table, a file, etc.), or anything else you can normally consume into Spark.

Let’s dive right in with an overview of the code that’ll do the trick. But first, let’s assume we have some transaction data streaming in from Kafka that contains a timestamp, a customer id, a transaction id (unique to this transaction), and a transaction amount. Let’s also assume we have a dataset of line-item details for each transaction (in a file, a Hive table, or another database) that we want to join with this stream on transaction_id.

But how? With the transform() method in the DStream class, which, according to the Spark documentation, will “return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.”
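
For instance, with a hypothetical DStream[String] named lines, any ordinary RDD operation can be applied to every micro-batch inside transform():

// drop empty records in each batch using a plain RDD filter (lines is a hypothetical DStream[String])
val nonEmpty = lines.transform(rdd => rdd.filter(_.nonEmpty))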

Here’s one way to do it in a few short steps:

// create a case class to represent a Transaction (from the streaming side)
case class Transaction(
  ts: Int,
  customer_id: Int,
  transaction_id: String,
  amount: Double
)

// create a case class to represent a TransactionDetail (from the static side)
case class TransactionDetail(
  transaction_id: String,
  country: String,
  state: String,
  zip: Long,
  repeat_customer: Boolean
)

// Pull in a static dataset by whatever means you prefer.
// For this example, the object has the shape Dataset[(String, TransactionDetail)] --> (transaction_id, rest_of_the_data),
// where String is the transaction_id and the latter is the data in whatever form you prefer (string, dataframe, case class, etc).
// A simple map like the one below will get your data into this structure.
// The only thing that matters is that the column you wish to join on sits in the first position of the pair, because the join implicitly joins on that value.
val transaction_details = ... // some static data that returns in this form: Dataset[(String, TransactionDetail)]

// Pull in a DStream of some streaming data
val live_transactions = ... // some streaming data from Kafka or another streaming source

// Create a new DStream where each element is a pair: first, the column you want to join on (transaction_id); second, the whole data object (the Transaction).
// That gives it the structure DStream[(transaction_id, Transaction)].
// transaction_details needs to be in the matching format: (transaction_id, TransactionDetail).
val transactions = live_transactions.map(x => (x.transaction_id, Transaction(
  x.ts,
  x.customer_id,
  x.transaction_id,
  x.amount)
))

// Create an RDD from your static dataset (if it's not already an RDD, but a Dataset or DataFrame instead)
val transaction_details_rdd = transaction_details.rdd

// Take the new DStream and use transform() to join the static dataset to it (on transaction_id)
val complete_transaction_data = transactions.transform(live_transaction =>
  live_transaction.join(transaction_details_rdd) // <-- leftOuterJoin, rightOuterJoin and fullOuterJoin work here as well
)

The result is a DStream keyed by transaction_id, where each value pairs the live transaction record (from Kafka) with the matching transaction detail record (from whatever source you used), i.e. a DStream[(String, (Transaction, TransactionDetail))]. This works because the join keyed on the transaction_id present in both the Kafka record (Transaction) and the static data.
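
To get a feel for the shape of the joined records, here is a small sketch that prints a few pairs from each batch; the printed fields are just for illustration:

// peek at a handful of joined records per micro-batch
complete_transaction_data.foreachRDD { rdd =>
  rdd.take(5).foreach { case (txn_id, (txn, detail)) =>
    println(s"$txn_id: amount=${txn.amount}, state=${detail.state}, repeat_customer=${detail.repeat_customer}")
  }
}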

It’s that simple. Thanks, transform()!
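
For completeness, here is one hypothetical way the two inputs above could be wired up. It assumes the spark-streaming-kafka-0-10 integration, a Kafka topic named transactions whose values are comma-separated records, and a parquet directory of detail rows; the broker address, path, group id and batch interval are all placeholders, so swap in whatever sources you actually have:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val spark = SparkSession.builder.appName("transaction-join").getOrCreate()
import spark.implicits._

// static side: read detail rows and key them by transaction_id --> Dataset[(String, TransactionDetail)]
// (the path is hypothetical)
val transaction_details = spark.read.parquet("/data/transaction_details")
  .as[TransactionDetail]
  .map(d => (d.transaction_id, d))

// streaming side: a direct Kafka stream, parsed into Transaction objects
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",   // hypothetical broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "transaction-join"
)
val raw = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("transactions"), kafkaParams)
)
// assuming each message value looks like "ts,customer_id,transaction_id,amount"
val live_transactions = raw.map(_.value.split(","))
  .map(f => Transaction(f(0).toInt, f(1).toInt, f(2), f(3).toDouble))
// ...then build transactions, transform() and join as shown above, and start the context with ssc.start()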

4 thoughts

  1. It doesn’t work for me. Spark doesn’t allow using other RDDs inside the functions:

    sDf = sqlContext.read.json("s3a://bucket/s.json")
    sRDD = sDf.rdd.map(lambda e: (e['id'], e))

    dstream = dstream.map(lambda x: (x['id'], x))
    dstream = dstream.transform(lambda x: x.join(sRDD))

    PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.


  2. Interesting. Thanks for responding.
    My guess is that the use of lambdas is producing the error, but I’m not sure. It seems like an action is being triggered within the first dataset before the join occurs. Maybe try doing just a simple join first?

