Today we’ll briefly showcase how to join a static dataset in Spark with a streaming “live” dataset, known in Spark Streaming as a DStream. This is helpful in a number of scenarios: for example, when you have a live stream of data from Kafka (or RabbitMQ, etc.) that you want to join with tabular data queried from a database (or a Hive table, a file, or anything else you can normally consume into Spark).

Let’s dive right in with an overview of the code that’ll do the trick. But first… let’s assume we have some transaction data streaming in from Kafka that contains a timestamp, a customer id, a transaction id (unique to this transaction), and a transaction amount. Let’s also assume we have a dataset of line-item details for each transaction in a file, Hive table, or other database that we want to join with this stream (using transaction_id).

But how? With the transform() method in the DStream class, which, per its documentation, will “return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.”

Here’s one way to do it in a few short steps:
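A sketch of those steps in PySpark: the file path, topic name, broker address, and variable names are illustrative assumptions, not the author's actual code, and KafkaUtils here is the pre-Structured-Streaming Kafka integration:

```python
import json

def key_by_txn_id(record):
    """Key a transaction dict by its transaction_id for a pair-RDD join."""
    return (record["transaction_id"], record)

def build_joined_stream(spark, ssc):
    """Wire the static-to-stream join together (all names are assumptions)."""
    from pyspark.streaming.kafka import KafkaUtils

    # 1. Load the static line-item details and key them by transaction_id.
    details = (spark.read.json("s3a://bucket/line_items.json")
               .rdd
               .map(lambda row: (row["transaction_id"], row.asDict())))
    details.cache()  # reused on every micro-batch, so keep it in memory

    # 2. Parse each live Kafka record and key it by transaction_id too.
    transactions = (KafkaUtils.createDirectStream(
                        ssc, ["transactions"],
                        {"metadata.broker.list": "broker:9092"})
                    .map(lambda kv: json.loads(kv[1]))  # kv = (key, value)
                    .map(key_by_txn_id))

    # 3. transform() applies a plain RDD join to every micro-batch RDD.
    return transactions.transform(lambda rdd: rdd.join(details))
```

Each element of the returned DStream has the pair-RDD join shape, `(transaction_id, (transaction, detail))`.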

The result is a DStream that contains the transaction_id, the live transaction record (from Kafka), and the transaction detail record (from whatever source you used). This is because the join function used the transaction_id present in both the Kafka record (Transaction) and the static data.
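To make the nesting concrete, here is the shape of one joined element, mimicking PySpark's pair-RDD join semantics (the records below are made-up examples, not the author's data):

```python
# Two records keyed by the same transaction_id, as they would look after
# the keying step: one from the live stream, one from the static dataset.
live = ("txn-42", {"transaction_id": "txn-42", "customer_id": 7, "amount": 19.99})
detail = ("txn-42", {"transaction_id": "txn-42", "sku": "ABC-1", "qty": 2})

# rdd.join pairs values sharing a key: (key, (left_value, right_value)).
joined = (live[0], (live[1], detail[1]))
```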

It’s that simple. Thanks, transform() !

4 thoughts

  1. It doesn’t work for me. Spark doesn’t allow using other RDDs inside these functions:

    sDf ="s3a://bucket/s.json")
    sRDD = e: (e['id'], e))

    dstream = x: (x['id'], x))
    dstream = dstream.transform(lambda x: x.join(sRDD))

    PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the transformation. For more information, see SPARK-5063.


  2. Interesting. Thanks for responding.
    My guess is that the use of lambdas is producing the error, but I’m not sure. It seems like an action is being triggered within the first dataset before the join occurs. Maybe try a simple join first?
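A common workaround for SPARK-5063 in PySpark, sketched under the assumption that the static dataset is small enough to collect: broadcast it as a plain dict and do a map-side join, so no RDD is referenced from inside a worker-side function (`sc`, `sRDD`, and `dstream` below are the names from the comment above; `attach_detail` is a hypothetical helper):

```python
# Map-side join against a broadcast dict instead of rdd.join, avoiding
# any reference to another RDD inside a worker-side closure (SPARK-5063).
def attach_detail(record, detail_by_id):
    """Pair a live record with its static detail (None if no match)."""
    txn_id = record["id"]
    return (txn_id, (record, detail_by_id.get(txn_id)))

# Driver-side wiring (illustrative, assumes the static data fits in memory):
# detail_map = sc.broadcast(dict(sRDD.collect()))
# joined = x: attach_detail(x, detail_map.value))
```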

