Let’s look at one way to get Cassandra data into Hadoop.

Cassandra is a great open-source solution for accessing data at web scale, thanks in no small part to its low-latency performance. And if you’re a power user of Cassandra, there’s a high probability you’ll want to analyze the data it contains to create reports, apply machine learning, or just do some good old-fashioned digging.

However, Cassandra can prove difficult to use as an analytical warehouse, especially if you’re using it to serve data in production around the clock. But one approach you can take is quite simple: copy the data to Hadoop (HDFS).

But how? There are a few feasible approaches, but the easiest to implement and scale, in my experience, is to run an Apache Spark application on YARN in your Hadoop cluster that pulls the data from Cassandra and writes it into HDFS.

And it all hinges on a piece of software from DataStax, a commercial Cassandra vendor, called the Spark Cassandra Connector. It’s a thing of beauty, so my hat is off to the engineering team that made it available to the community. With that said…

What is the DataStax Spark Cassandra Connector?

To quote their very detailed documentation:

This library lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries in your Spark applications.

https://github.com/datastax/spark-cassandra-connector

Our use case is to offload Cassandra data to HDFS, so we benefit directly from “exposing Cassandra tables as Spark RDDs”: once the data is in Spark, writing it to HDFS is quite trivial.
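
To make that concrete, here is a minimal sketch of the RDD route, assuming a table named some_keyspace.some_table with non-null text columns user_id, key and value (hypothetical names), a reachable contact point, and an illustrative HDFS output path. The toTsv helper is mine, not part of the connector.

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object CassandraRddToHdfs {

  /** Joins column values into one tab-separated line (illustrative helper). */
  def toTsv(fields: Seq[String]): String = fields.mkString("\t")

  def main(args: Array[String]): Unit = {
    // Host name is a placeholder; point it at one of your own Cassandra nodes.
    val conf = new SparkConf()
      .setAppName("Cassandra RDD to HDFS")
      .set("spark.cassandra.connection.host", "cassandra001.yourcompany.com")
    val sc = new SparkContext(conf)

    // Expose the Cassandra table as an RDD of CassandraRow.
    val rows = sc.cassandraTable("some_keyspace", "some_table")

    // Turn each row into a line of text and write it to HDFS.
    // getString assumes the column is not null for any row.
    rows
      .map(row => toTsv(Seq(row.getString("user_id"), row.getString("key"), row.getString("value"))))
      .saveAsTextFile("/some/location/in/hdfs/as_text/")

    sc.stop()
  }
}
```

Note that saveAsTextFile fails if the output directory already exists, so point it at a fresh path on each run.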

What Versions of Spark, Cassandra, Java and Scala Are Supported?

It’s worth noting the connector’s version compatibility chart, which lists the Spark, Cassandra, Java, and Scala versions that each connector release supports.

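Connector | Spark         | Cassandra        | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions
----------|---------------|------------------|-----------------------|----------------------|-------------------------
2.4       | 2.4           | 2.1.5*, 2.2, 3.0 | 3.0                   | 8                    | 2.11
2.3       | 2.3           | 2.1.5*, 2.2, 3.0 | 3.0                   | 8                    | 2.11
2.0       | 2.0, 2.1, 2.2 | 2.1.5*, 2.2, 3.0 | 3.0                   | 8                    | 2.10, 2.11
1.6       | 1.6           | 2.1.5*, 2.2, 3.0 | 3.0                   | 7                    | 2.10, 2.11
1.5       | 1.5, 1.6      | 2.1.5*, 2.2, 3.0 | 3.0                   | 7                    | 2.10, 2.11
1.4       | 1.4           | 2.1.5*           | 2.1                   | 7                    | 2.10, 2.11
1.3       | 1.3           | 2.1.5*           | 2.1                   | 7                    | 2.10, 2.11
1.2       | 1.2           | 2.1, 2.0         | 2.1                   | 7                    | 2.10, 2.11
1.1       | 1.1, 1.0      | 2.1, 2.0         | 2.1                   | 7                    | 2.10, 2.11
1.0       | 1.0, 0.9      | 2.0              | 2.0                   | 7                    | 2.10, 2.11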

So, for the sake of this example, let’s say we’re going with version 2.0 of the connector, which pairs with Spark 2.0 through 2.2 and Scala 2.10 or 2.11.

How to Use the Connector in a Spark App

Start by adding the connector as a dependency in your project’s build.sbt file:

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.9"

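For context, a fuller build.sbt might look roughly like the sketch below. The project name, Scala version and Spark version are my assumptions, chosen to match the connector 2.0 row of the compatibility chart; adjust them to whatever your cluster actually runs.

```scala
// build.sbt (sketch; name and version numbers are illustrative assumptions)
name := "cassandra-loader"
version := "0.1.0"
scalaVersion := "2.11.12"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  // Spark itself is supplied by the cluster at runtime, hence "provided".
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
  // The connector is not part of Spark, so it has to ship with the application.
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.9"
)
```

Because the connector is not marked as provided, it needs to reach the executors somehow, for example bundled into an assembly jar or pulled in with the --packages option of spark-submit.
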
Then use it through the spark.read API, as below:

import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.cassandra._

object CassandraLoader extends Serializable {

  /** Mirrors the columns of the Cassandra table being loaded. */
  case class MyCassandraTable(user_id: String, `type`: Int, key: String, value: String)

  def main(args: Array[String]): Unit = { // scalastyle:off method.length

    /**************************************************************************************
     * INITIATE SPARK SESSION
     * This session will be used to ingest data from a Cassandra cluster.
     *************************************************************************************/
    val spark = SparkSession
      .builder()
      .config("hive.merge.orcfile.stripe.level", "false")
      .appName("Cassandra Data Loader")
      .enableHiveSupport()
      .getOrCreate()

    // Implicits allow us to use .as[CaseClass]
    import spark.implicits._

    /**************************************************************************************
     * SETUP CASSANDRA CONNECTION
     * These settings determine which environment, keyspace and table to download.
     *************************************************************************************/
    val user = Map("spark.cassandra.auth.username" -> "some_username")
    val pwd = Map("spark.cassandra.auth.password" -> "some_password")

    // Set up an entry point into your Cassandra cluster from Spark
    val hosts = "cassandra001.yourcompany.com,cassandra002.yourcompany.com,cassandra003.yourcompany.com"
    val port = "9042"

    // Set the Cassandra connection configuration on the Spark session
    spark.setCassandraConf(
      CassandraConnectorConf.KeepAliveMillisParam.option(1000) ++
        CassandraConnectorConf.ConnectionHostParam.option(hosts) ++
        CassandraConnectorConf.ReadTimeoutParam.option(240000) ++
        CassandraConnectorConf.ConnectionPortParam.option(port) ++
        user ++ pwd)

    // Specify which keyspace.table to consume from Cassandra
    val table = Map("keyspace" -> "some_keyspace", "table" -> "some_table_in_that_keyspace")

    /**************************************************************************************
     * CONSUME DATA FROM CASSANDRA
     * Use the connector, via the format() method, to pull the data and write it.
     *************************************************************************************/
    val data = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(table)
      .load()
      .as[MyCassandraTable]

    // Write to HDFS as Snappy-compressed ORC, replacing any previous output
    data
      .write
      .option("orc.compress", "snappy")
      .mode(SaveMode.Overwrite)
      .orc("/some/location/in/hdfs/")
  }
}
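
If your Cassandra cluster is serving production traffic, you may not want to scan the whole table every time. As a rough sketch, not something from the original job, the same read can be narrowed with select and filter, which the connector can push down to Cassandra where the schema allows it (for example, a filter on the partition key). The column names, the filter value, the output path, and the assumption that user_id is the partition key are all hypothetical here, and the connection host is assumed to be supplied separately (for example via --conf spark.cassandra.connection.host=...).

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object CassandraSubsetLoader {

  // Same keyspace/table placeholders as the main example.
  val tableOptions: Map[String, String] =
    Map("keyspace" -> "some_keyspace", "table" -> "some_table_in_that_keyspace")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Cassandra Subset Loader")
      .getOrCreate()

    import spark.implicits._

    val subset = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(tableOptions)
      .load()
      .select("user_id", "key", "value")       // read only the columns you need
      .filter($"user_id" === "some_user_id")   // hypothetical partition-key filter

    // Print the physical plan so you can check what was actually pushed down.
    subset.explain()

    subset
      .write
      .mode(SaveMode.Overwrite)
      .orc("/some/location/in/hdfs/subset/")
  }
}
```

Whether a given filter is pushed down depends on the table's primary key and indexes, so it is worth checking the plan printed by explain() before relying on it.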

Then run the application however you like; I tend to launch it with a spark-submit command.

I’ve noticed this need for Cassandra to Hadoop loading across the community for some time now, and now that I’ve got a recommended method, I’m happy to share it with you all. If you have any questions, please drop them in a comment below this post. Cheers!
