Table of Contents
- How to Set Up PySpark 1.x
- How to Set Up PySpark 2.x
- Set Up PySpark on AWS Glue
- How to Load Data in PySpark
- Create a Glue DynamicFrame
- How to Write Data in PySpark
- How to Inspect Data in PySpark
- How to Analyze Content in PySpark
- How to Add, Remove, and Update Columns in PySpark
- How to Select and Modify Data in PySpark
- How to Group Data in PySpark
- How to Filter Data in PySpark
- How to Sort Data in PySpark
- How to Repartition Data in PySpark
- How to Perform Joins in PySpark
- How to Query Data in PySpark
How to Set Up PySpark 1.x
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext
Create a SparkContext:
sc = SparkContext()
Create a SQLContext:
sc = SparkContext()
sql_context = SQLContext(sc)
Create a HiveContext:
sc = SparkContext()
hive_context = HiveContext(sc)
How to Set Up PySpark 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
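The builder can also take an application name and configuration options before getOrCreate(); the app name and setting below are only illustrative:
spark = (SparkSession.builder
    .appName("example_app")  # illustrative application name
    .config("spark.sql.shuffle.partitions", "8")  # illustrative config option
    .getOrCreate())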
Set Up PySpark on AWS Glue
from pyspark.context import SparkContext
from awsglue.context import GlueContext
glueContext = GlueContext(SparkContext.getOrCreate())
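If you want to mix plain PySpark code into a Glue job, the GlueContext also exposes the underlying SparkSession:
spark = glueContext.spark_session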
How to Load Data in PySpark
Create a DataFrame from an RDD
Create a DataFrame using the .toDF() function:
population = [ ("Croatia", 4_058_000), ("Oregon", 4_218_000 ) ]
rdd = spark.sparkContext.parallelize(population)
cols = ["state", "population"]
df = rdd.toDF(cols)
Create a DataFrame using the createDataFrame() function:
population = [ ("Croatia", 4_058_000), ("Oregon", 4_218_000 ) ]
cols = ["state", "population"]
df = spark.createDataFrame(data=population, schema=cols)
Create a DataFrame using a combination of the createDataFrame() function and StructType schema:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
population = [ ("Croatia", 4_058_000), ("Oregon", 4_218_000) ]
pop_schema = StructType([
    StructField("state", StringType(), True),
    StructField("population", IntegerType(), True)])
df = spark.createDataFrame(data=population, schema=pop_schema)
Create a DataFrame from a Spark Data Source
Load a .csv file:
df = spark.read.csv("sport.csv", sep=";", header=True, inferSchema=True)
Read a .txt file:
df = spark.read.text("names.txt")
Read a .json file:
df = spark.read.json("fruits.json")
Read a .parquet file:
df = spark.read.load("stock_prices.parquet")
or:
df = spark.read.parquet("stock_prices.parquet")
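If you prefer the generic reader, you can name the format explicitly; the file and option below mirror the earlier CSV example:
df = spark.read.format("csv").option("header", "true").load("sport.csv")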
Create a Glue DynamicFrame
dfg = glueContext.create_dynamic_frame.from_catalog(database="example_database", table_name="example_table")
spark_df = dfg.toDF()
How to Write Data in PySpark
Write Data from a DataFrame in PySpark
df_modified.write.json("fruits_modified.jsonl", mode="overwrite")
Convert a DynamicFrame to a DataFrame and write data to AWS S3 files
dfg = glueContext.create_dynamic_frame.from_catalog(database="example_database", table_name="example_table")
Repartition into one partition and write:
df = dfg.toDF().repartition(1)
df.write.parquet("s3://glue-sample-target/outputdir/dfg")
Repartition by a column and write:
dfg.toDF().write.parquet("s3://glue-sample-target/outputdir/dfg", partitionBy=["example_column"])
Convert a DataFrame to a DynamicFrame and Write Data to AWS S3 Files
from awsglue.dynamicframe import DynamicFrame
dfg = DynamicFrame.fromDF(df, glueContext, "dfg")
glueContext.write_dynamic_frame.from_options(
    frame=dfg,
    connection_type="s3",
    connection_options={"path": "s3://glue-sample-target/outputdir/dfg"},
    format="parquet")
How to Inspect Data in PySpark
Display Content
Display DataFrame content:
df.show()
Display DataFrame schema:
df.printSchema()
Display DataFrame as a Pandas DataFrame:
df.toPandas()
Return DataFrame columns:
df.columns
Return the first n rows of a DataFrame:
df.head(n)
Return the first row of a DataFrame:
df.first()
Display DynamicFrame schema:
dfg.printSchema()
Display DynamicFrame content by converting it to a DataFrame:
dfg.toDF().show()
How to Analyze Content in PySpark
Analyze a DataFrame
Generate a basic statistical analysis of a DataFrame:
df.describe().show()
Count the number of rows inside a DataFrame:
df.count()
Count the number of distinct rows:
df.distinct().count()
Print the logical and physical plans:
df.explain()
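Passing True to explain() also prints the parsed, analyzed, and optimized logical plans:
df.explain(True)  # extended output with all plan stages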
How to Add, Remove, and Update Columns in PySpark
Add Columns
Add columns with Spark native functions:
import pyspark.sql.functions as f
new_df = df.withColumn("column_3_multiplied", 3 * f.col("column_3_original"))
Add columns with user defined functions (UDFs):
import pyspark.sql.functions as f
from pyspark.sql.types import StringType
def example_func(spots):
    if spots >= 5:
        return "enough free spots"
    else:
        return "not enough free spots"
my_udf = f.udf(example_func, StringType())
cinema_tickets = cinema.withColumn("free_spots", my_udf("spots") )
Remove Columns
Remove columns using column names:
sports = df.drop("football", "basketball")
Remove columns using chaining:
sports = sports.drop(sports.football).drop(sports.basketball)
Modify Columns
Rename columns:
df = df.withColumnRenamed("basketball", "BASKETBALL")
Remove duplicates based on data in a column:
df.drop_duplicates(subset=["basketball"])
Remove rows with missing values based on columns in the DataFrame:
df.na.drop(subset=["basketball", "football"])
Impute missing data:
df.na.fill(25)
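na.fill() also accepts a dictionary for per-column imputation; the column names below are taken from the earlier examples:
df.na.fill({"basketball": 0, "football": 0})  # fill each column with its own default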
How to Select and Modify Data in PySpark
Select Data
Select a single column:
df.select("basketball")
Select multiple columns:
df.select("basketball", "football")
Select a Boolean column indicating whether a condition is met:
df.select(df["goals"] >= 2)
Select a modified version of a column:
df.select(df["goals"] + 1)
Select Data with Conditional Arguments in PySpark
Select using a when-otherwise clause:
df.select("goals", f.when(df.goals == 0, "boring").otherwise("interesting"))
Select using like:
df.select("sport", df.sport.like("basketball"))
Select using between:
df.select(df.goals.between(1, 3))
Select using startswith or endswith:
df.select("sport", df.players.startswith("B"))
df.select(df.players.endswith("s"))
Select a substring (substr):
df.select(df.players.substr(1, 4).alias("nickname"))
How to Group Data in PySpark
Group data:
df.groupby("players").count().show()
Group and aggregate data:
from pyspark.sql.functions import max as spark_max, min as spark_min, sum as spark_sum
df.groupby("players").agg(spark_max("goals"), spark_min("goals"), spark_sum("goals").alias("total_goal_num")).show()
How to Filter Data in PySpark
df.filter(df["goals"] > 3)
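Conditions can be combined with & (and), | (or), and ~ (not), with each condition wrapped in parentheses; the column names below are assumed from the earlier examples:
df.filter((df["goals"] > 3) & (df["sport"] == "football"))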
How to Sort Data in PySpark
df.sort("goals", ascending=True).collect()
or
df.sort(df.goals.asc()).collect()
or
df.orderBy(["goals", "players"], ascending=[0, 1]).collect()
How to Repartition Data in PySpark
Create multiple partitions:
df.repartition(5).rdd.getNumPartitions()
Create a single partition:
df.coalesce(1).rdd.getNumPartitions()
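You can also repartition by a column so that rows with the same value land in the same partition; the column name below is assumed from the earlier examples:
df.repartition("goals").rdd.getNumPartitions()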
How to Perform Joins in PySpark
Perform an inner join:
df = df_1.join(df_2, on=["key"], how="inner")
Perform an inner join with conditions:
df = df_1.join(df_2, df_1.key < df_2.key, how="inner")
Perform an outer join:
df = df_1.join(df_2, on=["key"], how="outer")
Perform a left join:
df = df_1.join(df_2, on=["key"], how="left")
Perform a right join:
df = df_1.join(df_2, on=["key"], how="right")
Perform a left semi join:
df = df_1.join(df_2, on=["key"], how="left_semi")
Perform a left anti join:
df = df_1.join(df_2, on=["key"], how="left_anti")
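When the key columns have different names in the two DataFrames, pass a join expression instead of on; other_key below is a hypothetical column name:
df = df_1.join(df_2, df_1.key == df_2.other_key, how="inner")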
How to Query Data in PySpark
people.createOrReplaceTempView('people')
places.createOrReplaceTempView('places')
spark.sql(
    """
    SELECT *
    FROM people
    INNER JOIN places
    ON people.city = LOWER(places.location)
    """
).show()
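One way to check which tables and temporary views are currently registered before querying:
spark.catalog.listTables()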