
Cassandra is a great open-source solution for accessing data at web scale, thanks in no small part to its low-latency performance. And if you're a power user of Cassandra, there's a high probability you'll want to analyze the data it contains to create reports, apply machine learning, or just do some good old-fashioned digging.
However, Cassandra can prove difficult to use as an analytical warehouse, especially if you're using it to serve data in production around the clock. One simple way around this is to copy the data to Hadoop (HDFS).
But how? There are a few feasible approaches, but the easiest to implement and scale, in my experience, is a Spark application running under YARN on your Hadoop cluster that pulls the data out of Cassandra and into HDFS.
And it all hinges on a piece of software from DataStax, a major distributor of Cassandra, called the Spark Cassandra Connector. It's a thing of beauty, so my hat is off to the engineering team that made it available to the community. With that said…
What is the DataStax Spark Cassandra Connector?
To quote its very detailed documentation:
> This library lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries in your Spark applications.
https://github.com/datastax/spark-cassandra-connector
Our use case is offloading Cassandra data to HDFS, so we benefit directly from "exposing Cassandra tables as Spark RDDs": writing an RDD out to HDFS is trivial.
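To illustrate the idea, here's a minimal RDD-based sketch (the host, keyspace, table, and output path are placeholders); the fuller Dataset-based example we'll actually use comes later in the post:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._ // adds cassandraTable() to SparkContext

object RddSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Cassandra RDD Sketch")
      .set("spark.cassandra.connection.host", "cassandra001.yourcompany.com") // placeholder
    val sc = new SparkContext(conf)

    // Expose the Cassandra table as an RDD of CassandraRow...
    val rows = sc.cassandraTable("some_keyspace", "some_table")

    // ...and write it out to HDFS, here simply as text.
    rows.map(_.toString).saveAsTextFile("hdfs:///some/location/in/hdfs/")
  }
}
```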
What Versions of Spark, Cassandra, Java, and Scala Are Supported?
It's worth reviewing the connector's version-to-version compatibility chart, which lays out its support for Spark, Cassandra, Java, and Scala:
| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
|---|---|---|---|---|---|
| 2.4 | 2.4 | 2.1.5*, 2.2, 3.0 | 3.0 | 8 | 2.11 |
| 2.3 | 2.3 | 2.1.5*, 2.2, 3.0 | 3.0 | 8 | 2.11 |
| **2.0** | **2.0, 2.1, 2.2** | **2.1.5*, 2.2, 3.0** | **3.0** | **8** | **2.10, 2.11** |
| 1.6 | 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
| 1.5 | 1.5, 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
| 1.4 | 1.4 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
| 1.3 | 1.3 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
| 1.2 | 1.2 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
| 1.1 | 1.1, 1.0 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
| 1.0 | 1.0, 0.9 | 2.0 | 2.0 | 7 | 2.10, 2.11 |
So, for the sake of this example, let's say we're going with the row in bold, which uses version 2.0 of the connector.
How to Use the Connector in a Spark App
Start by adding the connector as a dependency in your project's build.sbt file:
"com.datastax.spark" %% "spark-cassandra-connector" % "2.0.9"
Then simply use it via the spark.read API, as below:
```scala
import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.cassandra._

object CassandraLoader extends Serializable {

  /** Representative of the some_keyspace.some_table table. */
  case class MyCassandraTable(user_id: String, `type`: Int, key: String, value: String)

  def main(args: Array[String]) { // scalastyle:off method.length

    /**************************************************************************
     * INITIATE SPARK SESSION
     * This session will be used to ingest data from a Cassandra cluster.
     *************************************************************************/
    val spark = SparkSession
      .builder()
      .config("hive.merge.orcfile.stripe.level", "false")
      .appName("Cassandra Data Loader")
      .enableHiveSupport()
      .getOrCreate()

    // Implicits allow us to use .as[CaseClass]
    import spark.implicits._

    /**************************************************************************
     * SET UP CASSANDRA CONNECTION
     * These settings determine which environment, keyspace and table to download.
     *************************************************************************/
    val user = Map("spark.cassandra.auth.username" -> "some_username")
    val pwd = Map("spark.cassandra.auth.password" -> "some_password")

    // Set up an entry point into your Cassandra cluster from Spark
    val hosts = "cassandra001.yourcompany.com,cassandra002.yourcompany.com,cassandra003.yourcompany.com"
    val port = "9042"

    // Set the Cassandra connection configuration in the Spark session
    spark.setCassandraConf(
      CassandraConnectorConf.KeepAliveMillisParam.option(1000) ++
        CassandraConnectorConf.ConnectionHostParam.option(hosts) ++
        CassandraConnectorConf.ReadTimeoutParam.option(240000) ++
        CassandraConnectorConf.ConnectionPortParam.option(port) ++
        user ++ pwd)

    // Specify which keyspace.table to consume from Cassandra
    val table = Map("keyspace" -> "some_keyspace", "table" -> "some_table_in_that_keyspace")

    /**************************************************************************
     * CONSUME DATA FROM CASSANDRA
     * Use the connector, via the format() method, to pull the data and write it.
     *************************************************************************/
    val data = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(table)
      .load()
      .as[MyCassandraTable]

    // Write to HDFS as snappy-compressed ORC
    data
      .write
      .option("orc.compress", "snappy")
      .mode(SaveMode.Overwrite)
      .orc("/some/location/in/hdfs/")
  }
}
```
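Once the job completes, you can sanity-check the output by reading the ORC files back. A quick sketch, using the same placeholder path as above:

```scala
// Quick sanity check: read the ORC output back and count the rows.
val check = spark.read.orc("/some/location/in/hdfs/")
println(s"Loaded ${check.count()} rows from Cassandra into HDFS")
```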
Then just execute it however you like – I tend to do it via a spark-submit command, along the lines of the sketch below.
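Here's roughly what that invocation looks like; the jar path and resource settings are placeholders to adapt to your environment:

```bash
# Sketch of a spark-submit invocation on YARN (jar path is a placeholder).
# The connector must be on the classpath: either build a fat jar with
# sbt-assembly, or add:
#   --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.9
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class CassandraLoader \
  /path/to/cassandra-loader-assembly.jar
```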
I've seen this need for Cassandra-to-Hadoop loading across the community for some time now, and since I finally have a method I can recommend, I'm happy to share it with you all. If you have any questions, please drop them in a comment below this post. Cheers!
Thanks for sharing more info, but Spark is taking a long time to shuffle data, i.e. 2hrs+ for 2 billion records… can you provide some clues on tuning it? More details are at https://stackoverflow.com/questions/57684972/how-to-improve-performance-my-spark-job-here-to-load-data-into-cassandra-table
There actually should not be any shuffling if you simply read/write without any reduce/grouping operations.
https://imgur.com/Xlp3K4F
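One quick way to verify that, as a sketch: print the physical plan before the write; a straight Cassandra read into an ORC write should show no Exchange operators.

```scala
// Quick check (sketch): the physical plan of a plain read-then-write
// should contain no Exchange (shuffle) operators.
data.explain()
```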
Thanks for sharing.
I'm writing a piece about Hadoop cluster setup. Please feel free to take a look:
https://bayudwiyansatria.com/tutorial/setup-and-configure-cluster-node-hadoop-installation/
I hope you don't mind the link I've posted. Thank you very much.
Regards.
No problem. Thanks for visiting and sharing.
// Set the Cassandra connection configuration in the Spark session
This step I didn't get… is it mandatory?
I believe it's mandatory. You'll use that to pass important configuration to Spark regarding your Cassandra/Scylla cluster.
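For what it's worth, you can also supply the same settings as plain Spark conf keys when building the session, instead of calling setCassandraConf() afterwards. A sketch, with placeholder host and credentials:

```scala
// Alternative sketch: pass connector settings as ordinary Spark conf keys
// at session-build time (host and credentials are placeholders).
val spark = SparkSession
  .builder()
  .appName("Cassandra Data Loader")
  .config("spark.cassandra.connection.host", "cassandra001.yourcompany.com")
  .config("spark.cassandra.connection.port", "9042")
  .config("spark.cassandra.auth.username", "some_username")
  .config("spark.cassandra.auth.password", "some_password")
  .enableHiveSupport()
  .getOrCreate()
```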