I will walk through some sample code snippets, analyze the behavior of the transformations above, and look at the different plans Spark creates.
Let's dive into some real-world examples to help your Spark jobs run faster in your data lake.
Your client is reading a large number of small Parquet files totalling 10.2 GB. They need to merge them into a smaller number of files and store them as Parquet with the same partitioning structure. How would you achieve this?
The answer: repartition() combined with partitionBy().
#We're reading Parquet files, so we can assume the input split
#size will be 128 MB.
df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books/")
#Let's verify how many partitions have been created
df.rdd.getNumPartitions()
82
#Now let's check how many distinct 'year' values we have in this
#data set, to foresee how many partitions will be created on disk
#later in this code.
df.select('year').distinct().count()
21
#Now we perform a 'RepartitionByExpression' by specifying both the
#number of in-memory partitions and the partitioning key.
repartExpr = df.repartition(90, 'year')
repartExpr.rdd.getNumPartitions()
90
repartExpr.write.mode('overwrite').partitionBy("year").parquet("s3://riyaad-sbx/spark-labs/repartitionByExpression/")
Here we write the ‘repartExpr’ dataframe, after having performed a ‘RepartitionByExpression’, using partitionBy on the writer, which results in the directory structure being written in Hive style on the column ‘year’. Let's take a look at the output file count:
$ aws s3 ls --summarize --human-readable s3://riyaad-sbx/spark-labs/repartitionByExpression/
PRE year=1995/
PRE year=1996/
PRE year=1997/
PRE year=1998/
PRE year=1999/
PRE year=2000/
PRE year=2001/
PRE year=2002/
PRE year=2003/
PRE year=2004/
PRE year=2005/
PRE year=2006/
PRE year=2007/
PRE year=2008/
PRE year=2009/
PRE year=2010/
PRE year=2011/
PRE year=2012/
PRE year=2013/
PRE year=2014/
PRE year=2015/
Total Objects: 21
Total Size: 10.2 GiB
Using the above code, the client performs a repartition by expression, increasing the in-memory partitions to 90 with ‘year’ as the partitioning expression.
Repartitioning the data set on a key can help with data skew. If you have an evenly distributed column in your data set, repartitioning on that column distributes the partitions equally across your cluster. Beware, though: if the column is not evenly distributed across your data, this can introduce additional skew, resulting in straggler executors where some tasks write for a long time while others finish quickly.
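Before choosing a repartition key, it helps to check how evenly its values are distributed. Here is a minimal sketch against the same dataframe (what counts as “skewed” is a judgment call for your workload):
#Count rows per 'year' to spot heavily skewed values before
#repartitioning on that column. A key with a few dominant values
#will produce a few very large partitions and straggler tasks.
from pyspark.sql import functions as F
df.groupBy('year').count().orderBy(F.desc('count')).show()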
Data will be shuffled across executors, meaning each distinct “year” value ends up in a single partition. As we saw in the code above, we have 21 distinct years, so we know that 21 tasks will output data, with each year occupying one partition. The number of output partitions in this case will be 21.
Why? When Spark uses the HashPartitioner, rows are split across partitions based on the hash of the expressions, and all rows where the expressions evaluate to the same value are guaranteed to land in the same partition. When we perform a repartition by expression and then write with partitionBy, Spark makes use of the HashPartitioner.
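We can confirm this behavior with spark_partition_id(), which tags each row with the in-memory partition it landed in. A quick sketch against the ‘repartExpr’ dataframe above:
from pyspark.sql import functions as F
#Each distinct 'year' should map to exactly one of the 90 partitions,
#so the count of distinct partition ids per year should be 1 ...
repartExpr.withColumn('pid', F.spark_partition_id()) \
    .groupBy('year') \
    .agg(F.countDistinct('pid').alias('partitions_per_year')) \
    .show(21)
#... and the number of non-empty partitions should be at most 21.
repartExpr.withColumn('pid', F.spark_partition_id()) \
    .select('pid').distinct().count()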
The client’s business requirements have changed: they now need to partition their data in Hive-style format on the keys “marketplace” and “star_rating” so that their analytics team can query the data more efficiently. Additionally, they need one output file per partition to avoid a small-file problem down the line.
Let's take a look at the following sample code:
#We're reading Parquet files, so we can assume the input split
#size will be 128 MB.
df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books/")
#Let's verify how many partitions have been created
df.rdd.getNumPartitions()
82
#Now let's check how many distinct 'marketplace' and 'star_rating'
#values we have in this data set, to foresee how many partitions
#will be created on disk later in this code.
df.select("marketplace").distinct().show()
+-----------+
|marketplace|
+-----------+
| DE|
| US|
| UK|
| FR|
| JP|
+-----------+
df.select("star_rating").distinct().show()
+-----------+
|star_rating|
+-----------+
| 1|
| 3|
| 4|
| 5|
| 2|
+-----------+
Let's go ahead and repartition the dataframe by expression, now that we know the distinct values for “marketplace” and “star_rating”, and then write the data in Hive style to S3:
df_repartition = df.repartition(28, 'marketplace', 'star_rating')
df_repartition.write.mode('overwrite')\
.partitionBy("marketplace", "star_rating")\
.parquet("s3://riyaad-sbx/spark-labs/repartitionByExpressionHash/")
Our output structure should look like (I’ve purposefully truncated the output):
PRE marketplace=DE/
star_rating=1/part-1
star_rating=2/part-1
star_rating=3/part-1
star_rating=4/part-1
star_rating=5/part-1
PRE marketplace=FR/
star_rating=1/part-1
star_rating=2/part-1
star_rating=3/part-1
star_rating=4/part-1
star_rating=5/part-1
PRE marketplace=JP/
star_rating=1/part-1
star_rating=2/part-1
star_rating=3/part-1
star_rating=4/part-1
star_rating=5/part-1
PRE marketplace=UK/
star_rating=1/part-1
star_rating=2/part-1
star_rating=3/part-1
star_rating=4/part-1
star_rating=5/part-1
PRE marketplace=US/
star_rating=1/part-1
star_rating=2/part-1
star_rating=3/part-1
star_rating=4/part-1
star_rating=5/part-1
In each star_rating partition, we find one output file, as expected. In this example, the rows have been split across partitions based on the repartition by expression on ‘marketplace’ and ‘star_rating’, so rows sharing the same key combination reside in the same partition. When the write occurred, each write task received its own partition and produced a single file. You can use the SQL tab in the Spark UI to confirm the output and behavior.
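We could also confirm this in code by tagging each row with its in-memory partition id and checking that every (‘marketplace’, ‘star_rating’) combination resolves to a single partition, which is exactly what yields one file per Hive-style directory. A rough sketch:
from pyspark.sql import functions as F
#Every (marketplace, star_rating) pair should resolve to exactly one
#of the 28 in-memory partitions, so each Hive-style directory is
#written by a single task and therefore contains a single file.
df_repartition.withColumn('pid', F.spark_partition_id()) \
    .groupBy('marketplace', 'star_rating') \
    .agg(F.countDistinct('pid').alias('partitions_per_combo')) \
    .show(25)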
I hope this blog and the information in it were insightful.
References:
[1] Sample Datasets used: s3://amazon-reviews-pds/parquet/
[2] What is AWS Glue: https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html