Schemas in Spark DataFrames
Written by Zeyu Yan, Ph.D., Head of Data Science at Nvicta AI
Data Science in Drilling is a multi-episode series written by the technical team members at Nvicta AI. Nvicta AI is a startup that helps drilling service companies increase their value offering by providing them with advanced AI and automation technologies and services. The goal of this Data Science in Drilling series is to give both data engineers and drilling engineers insight into state-of-the-art techniques that combine drilling engineering and data science.
Today we will continue our tutorial series on PySpark.
As always, the first step is to import the necessary dependencies and create a Spark Session:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import Row
spark = SparkSession.builder.getOrCreate()
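If your environment does not already provide a configured session, the builder can also take an explicit application name and master URL. Here is a minimal sketch; the app name and local master are just illustrative choices, not required settings:
# Optional: configure the session explicitly (names here are illustrative)
spark = (
    SparkSession.builder
    .appName("schema-tutorial")
    .master("local[*]")  # run locally using all available cores
    .getOrCreate()
)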
Create DataFrame with Schema
Let's first define some data:
data = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1)
]
Although Spark is smart enough to infer the data types directly from the data, we often want to set the schema ourselves. Let's define the schema for the data as follows:
schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("middle_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])
For StructField, the first argument defines the name of the field, the second argument defines the type of the field, and the last argument determines whether the field can be null. Now we are ready to create a PySpark DataFrame using the data and schema:
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
The result of the printSchema function is:
root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
Let's also take a look at the DataFrame we just created:
df.show(truncate=False)
The result is:

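The nullable flag matters when Spark verifies the local data against the schema. As a hypothetical sketch, if salary were declared non-nullable and a row contained a missing salary, createDataFrame would raise an error during schema verification:
strict_schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("salary", IntegerType(), False)  # salary must not be null
])
# The second row would fail verification because its salary is None:
# spark.createDataFrame([("James", 3000), ("Jen", None)], schema=strict_schema)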
Use Nested Schemas
Take a look at the following data:
dataStruct = [
    (("James", "", "Smith"), "36636", "M", "3000"),
    (("Michael", "Rose", ""), "40288", "M", "4000"),
    (("Robert", "", "Williams"), "42114", "M", "4000"),
    (("Maria", "Anne", "Jones"), "39192", "F", "4000"),
    (("Jen", "Mary", "Brown"), "", "F", "-1")
]
The name field is obviously a tuple in this case, and we also want to define the data type of each field inside the tuple. We can use a nested schema, defined as follows:
schemaStruct = StructType([
    StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
        StructField('lastname', StringType(), True)
    ])),
    StructField('dob', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('salary', StringType(), True)
])
Let's create the DataFrame and take a look at both the schema and the DataFrame itself:
df = spark.createDataFrame(data=dataStruct, schema=schemaStruct)
df.printSchema()
df.show(truncate=False)
The results are:

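One convenient property of a nested schema is that the sub-fields can be addressed with dot notation. For example, a quick sketch that pulls the individual name parts back out:
# Select sub-fields of the nested name struct using dot notation
df.select("name.firstname", "name.lastname", "salary").show(truncate=False)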
Create DataFrame using Row Class
We went over one example of how to create a PySpark DataFrame in the previous episode. Here is another example. First, let's create some data using the Row class:
Person = Row("name", "lang", "state")
data = [
    Person("Michael Jordan", ["Java", "Scala", "C++"], "CA"),
    Person("Lebron James", ["Spark", "Java", "C++"], "NJ"),
    Person("Kobe Bryant", ["CSharp", "VB"], "NV")
]
data
The result is:

Now let's create a DataFrame from the data:
df = spark.createDataFrame(data)
df.printSchema()
df.show()
The results are:

If we want different column names when creating the DataFrame, we can achieve this as follows:
# Change column names
columns = ["name", "languagesAtSchool", "currentState"]
df = spark.createDataFrame(data).toDF(*columns)
df.printSchema()
The result is:
root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)
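As a side note, Row also accepts keyword arguments, so the same kind of DataFrame can be built without defining the Person template first. A minimal sketch:
# Build rows with keyword arguments; column names come from the keywords
data_kw = [
    Row(name="Michael Jordan", languagesAtSchool=["Java", "Scala", "C++"], currentState="CA"),
    Row(name="Lebron James", languagesAtSchool=["Spark", "Java", "C++"], currentState="NJ")
]
spark.createDataFrame(data_kw).printSchema()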
Column Operations
Let's create a simple DataFrame:
data = [(100, 2, 1), (200, 3, 4), (300, 4, 4)]
df = spark.createDataFrame(data).toDF("col1", "col2", "col3")
df.show()
The result is:

Now we want the row-wise sum of col1 and col2:
df.select(df.col1 + df.col2).show()
The result is:

We can perform a series of other arithmetic operations in the same way:
df.select(df.col1 - df.col2).show()
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()
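These select calls return only the computed column. If we want to keep the original columns and append the result instead, withColumn works the same way (the column name total below is just an example):
# Append the row-wise sum as a new column while keeping col1, col2 and col3
df.withColumn("total", df.col1 + df.col2).show()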
Now we want to compare every row of col2 with col3:
df.select(df.col2 > df.col3).show()
The result is a column of booleans:

This is how we can get the value of the first row in the result column:
df.select(df.col2 > df.col3).collect()[0]["(col2 > col3)"]
The result is:
True
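Relying on the auto-generated column name is a bit fragile. Giving the comparison an explicit alias (covered in more detail in the next section) makes the lookup more readable; the name col2_gt_col3 is just illustrative:
# Alias the boolean expression so the column has a predictable name
df.select((df.col2 > df.col3).alias("col2_gt_col3")).collect()[0]["col2_gt_col3"]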
Alias
When querying a DataFrame, we can rename the returned columns using the alias function. Let's first create a new DataFrame:
data = [
    ("James", "Bond", "100", None),
    ("Ann", "Varsa", "200", "F"),
    ("Tom Cruise", "XXX", "400", ""),
    ("Tom Brand", None, "400", "M")
]
columns = ["fname", "lname", "id", "gender"]
df = spark.createDataFrame(data, columns)
Now query the DataFrame and rename the fname and lname columns:
df.select(df.fname.alias("first_name"), df.lname.alias("last_name")).show()
The resulting DataFrame is:

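alias also works on computed expressions, not just raw columns. Here is a short sketch that combines fname and lname into a single full_name column using concat_ws from pyspark.sql.functions:
from pyspark.sql.functions import concat_ws

# Concatenate first and last name (nulls are skipped) and alias the result
df.select(concat_ws(" ", df.fname, df.lname).alias("full_name"), df.gender).show()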
Conclusions
In this article, we covered some more PySpark skills. We will cover Spark in much greater detail in upcoming episodes of this tutorial series. Stay tuned!
Get in Touch
Thank you for reading! Please let us know if you like this series or if you have any critiques. If this series was helpful to you, please follow us and share it with your friends.
If you or your company needs any help with projects related to drilling automation and optimization, AI, and data science, please get in touch with us at Nvicta AI. We are here to help. Cheers!