Introduction

Since DataFrames are composed of named columns, Spark offers many options for performing operations on individual or multiple columns. This section introduces converting columns to a different data type, adding calculated columns, renaming columns, and dropping columns from a DataFrame.

Note

This section uses a PySpark and Spark Scala DataFrame called df created from the Parquet files found at the GitHub repository https://github.com/Teradata/kylo/tree/master/samples/sample-data. In Section 1.3 we demonstrated copying this data to HDFS.

df = spark.read.parquet("hdfs://user/your_user_name/sample-data/parquet/userdata1.parquet")

Columns

As discussed before, columns in Spark DataFrames are always named. To list the column names of a DataFrame use the columns property. In PySpark, df.columns returns a list of all the column names. In Spark Scala, df.columns returns an array of column names.
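
The PySpark and Spark Scala code is as follows:

df.columns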

PySpark output is as follows:

Out[1]: 
['registration_dttm',
 'id',
 'first_name',
 'last_name',
 'email',
 'gender',
 'ip_address',
 'cc',
 'country',
 'birthdate',
 'salary',
 'title',
 'comments'] 

Spark Scala output is as follows:

res1: Array[String] = Array(registration_dttm, id, first_name, last_name, email, gender, ip_address, cc, country, birthdate, salary, title, comments) 

Since the output of df.columns is a plain Python list or Scala array, all of the usual list and array methods and operations apply to the result.

For example, in PySpark we can select one element or many from the list of DataFrame columns:

df.columns[2]
df.columns[2:5]
Out[2]: 'first_name'
Out[3]: ['first_name', 'last_name', 'email']

In Scala, use the slice() method to extract elements of an array by index:

df.columns(1)
df.columns.slice(1, 4)
res2: String = id
res3: Array[String] = Array(id, first_name, last_name)

Note

In both PySpark and Spark Scala, df.columns is equivalent to returning the column names from the schema of a DataFrame. In PySpark:

df.schema.names

In Spark Scala:

df.schema.fieldNames

Column Data Types

So far we have seen two ways to return information about the data types of the columns in a DataFrame: df.printSchema() and df.schema. A third option is the dtypes property, which returns a Python list or Scala array of tuples containing the name and data type of each column.

The PySpark and Spark Scala code is as follows:

df.dtypes

PySpark output is as follows:

Out[4]: 
[('registration_dttm', 'timestamp'),
 ('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('email', 'string'),
 ('gender', 'string'),
 ('ip_address', 'string'),
 ('cc', 'string'),
 ('country', 'string'),
 ('birthdate', 'string'),
 ('salary', 'double'),
 ('title', 'string'),
 ('comments', 'string')]

Spark Scala output is as follows:

res4: Array[(String, String)] = Array((registration_dttm,TimestampType), (id,IntegerType), (first_name,StringType), (last_name,StringType), (email,StringType), (gender,StringType), (ip_address,StringType), (cc,StringType), (country,StringType), (birthdate,StringType), (salary,DoubleType), (title,StringType), (comments,StringType))

Understanding Data Types

Now that we have covered multiple ways to view the data types of columns in a DataFrame, let’s explore how to convert a column’s data type from one type to another.

In Spark, converting the data type of a column uses the cast() method. The cast() method is called on a specific DataFrame column and accepts two different classes of parameters that correspond to a particular data type: string or DataType.

If choosing a string then cast() accepts: string, boolean, byte, short, int, long, float, double, decimal, date, timestamp.

If choosing a DataType then cast() supports: StringType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType, DateType, TimestampType.

When specifying a DataType, be sure to import the type classes before converting the data type. In PySpark: from pyspark.sql.types import *; in Spark Scala: import org.apache.spark.sql.types._
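
For example, the following PySpark statements both convert the "salary" column to a string; the first passes the data type as a string and the second passes a DataType class:

from pyspark.sql.types import StringType

df.select(df.salary.cast("string"))
df.select(df.salary.cast(StringType()))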

Before we convert the data types of any columns, let’s display what an integer column looks like. In this PySpark example, we select an integer column and use the take() method to return the first five rows as Spark Row objects. Notice we aren’t casting any columns yet.

PySpark code is as follows:

df.select(df.id).take(5)

Running the preceding code displays the following:

Out[5]: [Row(id=1), Row(id=2), Row(id=3), Row(id=4), Row(id=5)]

Notice that the values in the “id” column are integers with no quotes. When we convert this integer column to a string, the output will include quotes.

PySpark code is as follows:

string_id_df = df.select(df.id.cast("string"))
string_id_df.take(5)

Running the preceding code displays the following:

Out[6]: [Row(id='1'), Row(id='2'), Row(id='3'), Row(id='4'), Row(id='5')]

Now the values are wrapped in quotes, which means they are strings. We can check this by viewing the schema of our new DataFrame.

PySpark code is as follows:

string_id_df.schema

Running the preceding code displays the following:

Out[7]: StructType(List(StructField(id,StringType,true)))

Exercise 15: Change the data type of a DataFrame column

In this exercise we work with a column that has a data type of timestamp. In computers and programming languages, dates and datetimes are really just numbers. This is called “Unix time”: the number of seconds since January 1, 1970, not counting leap seconds. We will convert this timestamp column into a number to confirm that dates are actually numbers.

  1. In Scala, select the “registration_dttm” column which has a data type of timestamp. Then print the first three rows and display the schema with the following code:
val reg_df = df.select(df("registration_dttm"))
reg_df.show(3)
reg_df.printSchema
+-------------------+
|  registration_dttm|
+-------------------+
|2016-02-03 07:55:29|
|2016-02-03 17:04:03|
|2016-02-03 01:09:31|
+-------------------+
only showing top 3 rows

root
 |-- registration_dttm: timestamp (nullable = true)

reg_df: org.apache.spark.sql.DataFrame = [registration_dttm: timestamp]
  2. Convert the column “registration_dttm” from timestamp to long (an integer type with more available digits) and display the results with the following code:
import spark.implicits._

val long_reg_df = df.select($"registration_dttm".cast("long"))
long_reg_df.show(3)
long_reg_df.printSchema
+-----------------+
|registration_dttm|
+-----------------+
|       1454486129|
|       1454519043|
|       1454461771|
+-----------------+
only showing top 3 rows

root
 |-- registration_dttm: long (nullable = true)

long_reg_df: org.apache.spark.sql.DataFrame = [registration_dttm: bigint]

Casting from one data type to another is a powerful operation. This exercise showed the fundamentals of how to convert a DataFrame column from one data type to another.


At the same time we cast the data type, we can also change the column name using the alias() method. It takes a single parameter: the new column name. The alias() method can be called directly on the column or after casting, which makes it very easy to change the column type and column name in one operation.

The following PySpark code converts the “id” column to a string and, in the same operation, changes the name of the column to "new_id":

df.select(df.id.cast("string").alias('new_id')).show(5)
+------+
|new_id|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
+------+
only showing top 5 rows

The following Spark Scala code converts the "registration_dttm" column to a data type of long and then changes the column name to "seconds_since_1970".

df.select($"registration_dttm".cast("long").alias("seconds_since_1970")).show(5)
+------------------+
|seconds_since_1970|
+------------------+
|        1454486129|
|        1454519043|
|        1454461771|
|        1454459781|
|        1454475931|
+------------------+
only showing top 5 rows

Drop Columns

The operation of removing columns from a DataFrame is very easy: call the drop() method. The preferred parameters to pass to drop() are the column name strings you would like to remove. If there is more than one column name, separate each with a comma.

The following PySpark and Scala code removes single and multiple columns from a DataFrame:

df.drop("ip_address")
df.drop("comments", "registration_dttm")

The drop() method also accepts a column reference selected by attribute or index rather than a name string. However, it only accepts one such reference at a time, so to remove more than one column while selecting columns by attribute or index, chain multiple drop() methods together.

In PySpark, chain multiple drop() methods together when selecting columns by index or attribute with the following code:

df.drop(df.first_name)
df.drop(df.first_name).drop(df.title)

In Spark Scala, chain multiple drop() methods together when selecting columns by index or with the $-notation, as in the following code:

df.drop($"salary")
df.drop($"comments").drop($"salary")

Rename Columns

To rename a column on an existing DataFrame, use the withColumnRenamed() method. It takes two string parameters: the first is the existing column name you would like to rename, and the second is the new column name.

The following PySpark code prints the columns of a DataFrame then renames one of the columns:

print(df.columns)
df_renamed = df.withColumnRenamed("title", "role")
print(df_renamed.columns)

The columns are printed again to show that the column name was changed as shown in the following output:

['registration_dttm', 'id', 'first_name', 'last_name', 'email', 'gender', 'ip_address', 'cc', 'country', 'birthdate', 'salary', 'title', 'comments']
['registration_dttm', 'id', 'first_name', 'last_name', 'email', 'gender', 'ip_address', 'cc', 'country', 'birthdate', 'salary', 'role', 'comments']

The following Spark Scala code prints the columns of a DataFrame, renames one of the columns, and then prints the columns again to show that the column name was changed. In Scala, arrays cannot be printed directly the way Python lists can. To display the contents of a Scala array, call the mkString() method with a string parameter such as a space; mkString() joins the elements of the array, inserting the string parameter between each element:

println(df.columns.mkString(" "))
val df_renamed = df.withColumnRenamed("birthdate", "Date_of_Birth")
println(df_renamed.columns.mkString(" "))
registration_dttm id first_name last_name email gender ip_address cc country birthdate salary title comments
registration_dttm id first_name last_name email gender ip_address cc country Date_of_Birth salary title comments
df_renamed: org.apache.spark.sql.DataFrame = [registration_dttm: timestamp, id: int ... 11 more fields]

Add Columns

The Spark method withColumn() can accomplish several things. Primarily it is used to add new columns to a DataFrame. But it can also replace existing columns in a DataFrame. 

withColumn() accepts two parameters. The first parameter is a string of the column name to add or replace. The second parameter is an expression that constructs the data of the new column. 

Exercise 16: Adding a new number column

  1. In our DataFrame df there is an existing column “id” that has a data type of integer. Construct a new column by adding the integer ten to the original “id” column. The first parameter, "append_id", is the name of the new column we are creating; the second parameter, written here in PySpark, selects the original column and adds the integer ten. Display the contents to see the results of the calculated column with the following code:
df.withColumn("append_id", df["id"] + 10).select("id", "append_id").show(5)
+---+---------+
| id|append_id|
+---+---------+
|  1|       11|
|  2|       12|
|  3|       13|
|  4|       14|
|  5|       15|
+---+---------+
only showing top 5 rows
  2. In Spark Scala, add a new column that produces the person’s monthly income. We can accomplish this by dividing the column "salary" by twelve. Select the original column and the new calculated column so we can see the values of both columns with the following code:
df.withColumn("monthly_salary", $"salary" / 12).select("first_name", "salary", "monthly_salary").show(5)
+----------+---------+------------------+
|first_name|   salary|    monthly_salary|
+----------+---------+------------------+
|    Amanda| 49756.53|         4146.3775|
|    Albert|150280.17|12523.347500000002|
|    Evelyn|144972.51|12081.042500000001|
|    Denise| 90263.05| 7521.920833333334|
|    Carlos|     null|              null|
+----------+---------+------------------+
only showing top 5 rows

Spark automatically takes care of null values: dividing a null salary produces a null monthly_salary.

  3. To round the values of the new calculated column "monthly_salary", wrap the expression in Spark’s round() function. The round() function’s first parameter is the expression and the second, optional parameter is the number of decimal places to round to. If no digits are specified, it rounds to the nearest whole integer. The following is in Spark Scala, but the round() function works the same in PySpark:
import org.apache.spark.sql.functions.round

df.withColumn("monthly_salary", round($"salary" / 12, 2))
  .select("first_name", "salary", "monthly_salary")
  .show(5)

PySpark example:

from pyspark.sql.functions import round

df.withColumn("monthly_salary", round(df["salary"] / 12, 2))
  .select("first_name", "salary", "monthly_salary")
  .show(5)

Running the preceding code displays the following:

+----------+---------+--------------+
|first_name|   salary|monthly_salary|
+----------+---------+--------------+
|    Amanda| 49756.53|       4146.38|
|    Albert|150280.17|      12523.35|
|    Evelyn|144972.51|      12081.04|
|    Denise| 90263.05|       7521.92|
|    Carlos|     null|          null|
+----------+---------+--------------+
only showing top 5 rows

Creating new columns with a constant value

Creating a new column with a constant value (string, number, boolean) may seem straightforward. Most would think the following code would fill every cell in the column with the integer four:

df.withColumn("constant_value", 4).show(5)

But the preceding code will fail with an error because the withColumn() method only works with columns. The number 4 is not a column, so it fails. The solution is the lit() function, which “creates a Column of literal value”. Since it is a function, it must be imported from Spark’s functions module. The following is the code for Spark Scala:

import org.apache.spark.sql.functions.lit
df.withColumn("constant_value", lit(4)).show(5)

The lit() function can create integer, string, or boolean columns. The following is the code for PySpark:

from pyspark.sql.functions import lit
server_df = df.withColumn("constant_string", lit("generated by server")).select("id", "constant_string")
server_df.show(5)
server_df.printSchema()

Running the preceding code displays the following:

+---+-------------------+
| id|    constant_string|
+---+-------------------+
|  1|generated by server|
|  2|generated by server|
|  3|generated by server|
|  4|generated by server|
|  5|generated by server|
+---+-------------------+
only showing top 5 rows

root
 |-- id: integer (nullable = true)
 |-- constant_string: string (nullable = false)


Spark has many built-in functions that support a wide assortment of operations. Another way to select or choose a column, which we have not covered before, is the col() function. The col() function selects a DataFrame column just like PySpark’s df["column_name"] or Scala’s df("column_name"), by taking a quoted string of the column name. But col() is a Spark function and must be imported before use.

PySpark:  from pyspark.sql.functions import col

Spark Scala:  import org.apache.spark.sql.functions.col
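
For example, the following PySpark statements select the same "first_name" column:

from pyspark.sql.functions import col

df.select(df["first_name"])
df.select(col("first_name"))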

Two other Spark functions are upper() and lower(). They convert strings to all uppercase or all lowercase. Let’s see them in action.

Exercise 17: Adding a new string column

  1. In PySpark, use the col() and upper() functions to add a new string column that converts all the text to uppercase, as shown in the following code:
from pyspark.sql.functions import col, lower, upper

df.withColumn("FIRST_NAME_UPPER", upper(col("first_name"))).select("id", "first_name", "FIRST_NAME_UPPER").show(3)
+---+----------+----------------+
| id|first_name|FIRST_NAME_UPPER|
+---+----------+----------------+
|  1|    Amanda|          AMANDA|
|  2|    Albert|          ALBERT|
|  3|    Evelyn|          EVELYN|
+---+----------+----------------+
only showing top 3 rows
  2. In Scala, use the col() and lower() functions to create a new string column with all lowercase letters, as shown in the following code:
import spark.implicits._
import org.apache.spark.sql.functions.{col, lower, upper}

df.withColumn("last_name_lower", lower(col("last_name"))).select("id", "last_name", "last_name_lower").show(3)
+---+---------+---------------+
| id|last_name|last_name_lower|
+---+---------+---------------+
|  1|   Jordan|         jordan|
|  2|  Freeman|        freeman|
|  3|   Morgan|         morgan|
+---+---------+---------------+
only showing top 3 rows

Most times, when creating a new column or replacing an existing column, you will also convert it to a new data type. We can call cast() inside withColumn() to create or replace a column while also converting it to a new data type. Remember, the cast() method supports specifying the data type either as a string or as a DataType.

If choosing a string then cast() accepts: string, boolean, byte, short, int, long, float, double, decimal, date, timestamp

If choosing a DataType then cast() supports: StringType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType, DateType, TimestampType.

Exercise 18: Creating a new column and casting the data type

  1. In PySpark, create a new date column "registration_date" from the timestamp column "registration_dttm" with the following code:
from pyspark.sql.types import DateType

registration_df = df.withColumn("registration_date", df.registration_dttm.cast(DateType())).select("id", "registration_dttm", "registration_date")

registration_df.show(3)
registration_df.printSchema()
+---+-------------------+-----------------+
| id|  registration_dttm|registration_date|
+---+-------------------+-----------------+
|  1|2016-02-03 07:55:29|       2016-02-03|
|  2|2016-02-03 17:04:03|       2016-02-03|
|  3|2016-02-03 01:09:31|       2016-02-03|
+---+-------------------+-----------------+
only showing top 3 rows

root
 |-- id: integer (nullable = true)
 |-- registration_dttm: timestamp (nullable = true)
 |-- registration_date: date (nullable = true) 
  2. The withColumn() method is also a great way to change the data type of an existing column. In the first parameter, specify an existing column name; then any expression or cast in the second parameter will replace the original column. Use the following Spark Scala code:
import spark.implicits._
import org.apache.spark.sql.types.DateType

val new_df = df.withColumn("registration_dttm", $"registration_dttm".cast(DateType))
new_df.select("registration_dttm", "first_name", "last_name").show(3)

new_df.printSchema

In this example, instead of adding a column, we replace the registration datetime column and convert it to a date data type, and we get the following output:

+-----------------+----------+---------+
|registration_dttm|first_name|last_name|
+-----------------+----------+---------+
|       2016-02-03|    Amanda|   Jordan|
|       2016-02-03|    Albert|  Freeman|
|       2016-02-03|    Evelyn|   Morgan|
+-----------------+----------+---------+
only showing top 3 rows

root
 |-- registration_dttm: date (nullable = true)
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- country: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- title: string (nullable = true)
 |-- comments: string (nullable = true)

import spark.implicits._
import org.apache.spark.sql.types.DateType
new_df: org.apache.spark.sql.DataFrame = [registration_dttm: date, id: int ... 11 more fields]

This exercise showed how to combine adding or replacing a column with the withColumn() method and casting the data type at the same time.

Implications of Adding Columns

Why does the following Spark Scala operation (or its PySpark equivalent) fail with an org.apache.spark.sql.AnalysisException?

import spark.implicits._

df.withColumn("monthly_salary", $"salary" / 12)
  .select(df("first_name"), df("salary"), df("monthly_salary"))
  .show(5)

The operation appears to add a calculated column “monthly_salary” and then select three columns.

Still not sure? Here is a version that works so that you can see the difference:

import spark.implicits._

df.withColumn("monthly_salary", $"salary" / 12)
  .select("first_name", "salary", "monthly_salary")
  .show(5)

The reason the first example fails is df("monthly_salary"): "monthly_salary" is a new calculated column and doesn’t exist in the original df DataFrame, so df("monthly_salary") cannot be resolved. To add a new column and select it in the same operation, you must select the column using its column name in quotes or with the col() function.
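
For example, this equivalent PySpark version also works, because the new calculated column is referenced by name with col() and resolved against the result of withColumn() rather than the original df:

from pyspark.sql.functions import col

# col("monthly_salary") is resolved against the DataFrame returned by withColumn()
df.withColumn("monthly_salary", df["salary"] / 12) \
  .select(col("first_name"), col("salary"), col("monthly_salary")) \
  .show(5)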

