PySpark Unions Revisited

A “notebook”-format post on using unions with PySpark.

! pip install pyspark --upgrade

Spark DataFrame “union” Revisited

As part of a customer engagement, we were involved in building an ETL pipeline to handle terabytes (~4 TB) of data.

Long story short, we had to build a pipeline that handles terabytes of data dumped from a traditional SQL database. The catch: the data contains duplicate records, for historical reasons in the upstream systems. The requirement is a pipeline that can take delta snapshots every three weeks or less and apply them to the previous dump.

We have to deal with deltas that comprise new and modified records, called delta drops, and record deletions, called delete drops.

So whenever an ingestion delta drop lands in S3, we have to compact the deltas with the old dump. The steps:

  • First, make sure the old dump tables and the delta tables have the same schema (i.e. the same set of columns); if not, apply schema correction by adding the missing columns with NULL values (see the first sketch after this list)
  • There may be multiple ingestion drops waiting to be compacted
  • So we auto-crawled all the delta drops and created a dictionary mapping each drop's prefix path to the list of tables found under it (e.g. {s3_bucket/path/to/delta1/ : [table_1, table_2], s3_bucket/path/to/delta2/ : [table_2, table_3]})
  • Now reverse the dictionary so that, given a table name, it returns the drop prefixes, e.g. {table_2 : [s3_bucket/path/to/delta1/, s3_bucket/path/to/delta2/]}; this lets us iterate through all the available tables and combine each table across the drops into one dataframe (see the second sketch after this list)
  • Now, for each table, take the corresponding data from the previous dump and the current snapshot drop and apply the schema correction
  • Delta Apply Stage: use the row_number SQL operation with a window partitioned by the primary key column and ordered (descending) by the date column, then keep the latest record. E.g.: https://stackoverflow.com/questions/45513959/pyspark-get-row-number-for-each-row-in-a-group?rq=1
  • Delete Apply Stage: do a left outer join against the delete drop on the primary key and keep only the rows where the join produced NULL, i.e. the records that were not deleted (see the last sketch after this list)
  • Store the compacted dataframe/table to a new location
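
Here is a minimal sketch of the schema-correction step, assuming a hypothetical helper align_schema and a reference dataframe carrying the target schema; missing columns are added as NULLs cast to the reference type (apply it in both directions if either side can have extra columns).

from pyspark.sql.functions import lit

def align_schema(df, reference_df):
    # Add any column present in reference_df but missing in df as a NULL
    # column cast to the reference type, then reorder to match reference_df
    for field in reference_df.schema.fields:
        if field.name not in df.columns:
            df = df.withColumn(field.name, lit(None).cast(field.dataType))
    return df.select(*reference_df.columns)

# Hypothetical usage, aligning a delta table with the old dump before a union:
# delta_df = align_schema(delta_df, old_dump_df)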
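
And a small sketch of the crawl dictionary and its reversal; the prefix paths and table names below are just the example values from above.

from collections import defaultdict

# Hypothetical crawl result: drop prefix path -> tables found under that prefix
drops_to_tables = {
    "s3_bucket/path/to/delta1/": ["table_1", "table_2"],
    "s3_bucket/path/to/delta2/": ["table_2", "table_3"],
}

# Reverse it: table name -> list of drop prefixes that contain that table
tables_to_drops = defaultdict(list)
for prefix, tables in drops_to_tables.items():
    for table in tables:
        tables_to_drops[table].append(prefix)

# tables_to_drops["table_2"] -> ["s3_bucket/path/to/delta1/", "s3_bucket/path/to/delta2/"]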
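
Finally, a sketch of the Delete Apply Stage as described above (left outer join, then keep the rows with no match); compacted_df, delete_df and the primary-key column "id" are assumed names for illustration.

from pyspark.sql.functions import col

# Keep only the primary keys from the delete drop, renamed to avoid ambiguity
deletes = delete_df.select(col("id").alias("delete_id"))

survivors = compacted_df \
    .join(deletes, compacted_df["id"] == deletes["delete_id"], "left_outer") \
    .filter(col("delete_id").isNull()) \
    .drop("delete_id")

A left anti join (compacted_df.join(delete_df, "id", "left_anti")) expresses the same thing in a single step.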

Now you can see how the union operation plays a major role in our compaction stage.

What is our general assumption about DataFrame unions? If we union two dataframes with the same columns, it should append the second dataframe/table to the first dataframe/table and create a new dataframe.

What if the columns are not in the same order in both dataframes?

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder. \
            master("local[4]"). \
            appName("shabda").getOrCreate()
sc = spark.sparkContext
spark
SparkSession - in-memory

SparkContext
Spark UI: http://192.168.0.10:4040
Version: v2.2.1
Master: local[4]
AppName: shabda

Let’s create two dataframes with the same two columns, but in a different order

df_1 = spark.createDataFrame([['a',1],['a', 2]], ['string_col', 'int_col'])
df_2 = spark.createDataFrame([[2,'b'], [1, 'b']], ['int_col', 'string_col'])
df_1.show(), df_2.show()
+----------+-------+
|string_col|int_col|
+----------+-------+
|         a|      1|
|         a|      2|
+----------+-------+

+-------+----------+
|int_col|string_col|
+-------+----------+
|      2|         b|
|      1|         b|
+-------+----------+

(None, None)
df_3 = df_1.union(df_2)
df_3.show()
+----------+-------+
|string_col|int_col|
+----------+-------+
|         a|      1|
|         a|      2|
|         2|      b|
|         1|      b|
+----------+-------+
assert sorted(df_1.columns) == sorted(df_2.columns)
df_3 = df_1.select(*df_1.columns).union(df_2.select(*df_1.columns))
df_3.show()
+----------+-------+
|string_col|int_col|
+----------+-------+
|         a|      1|
|         a|      2|
|         b|      2|
|         b|      1|
+----------+-------+

As you can see, when the column orders are not in sync between the dataframes, the union operation can mess up the data!

The fix is to do a select with the same column list on both dataframes before the union, taking the columns from one of the dataframes (or sorting the column names on both).
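
If you are on Spark 2.3 or later (this post runs on 2.2.1), unionByName resolves columns by name instead of position and avoids the manual select:

# Requires Spark 2.3+; on 2.2.x stick with the select() workaround above
df_3 = df_1.unionByName(df_2)
df_3.show()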

Here is the logic behind picking the latest record from our unioned dataframe, which is the union of the delta snapshot and the previously compacted data. Consider df_1 to be the unioned dataframe.

df_1 = spark.createDataFrame([['A',2000],['A',2002], ['A',2007], ['B',1999], ['B',2015]], ['Group', 'Date'])
df_1.show()
+-----+----+
|Group|Date|
+-----+----+
|    A|2000|
|    A|2002|
|    A|2007|
|    B|1999|
|    B|2015|
+-----+----+
from pyspark.sql.window import *
from pyspark.sql.functions import row_number
df_final = df_1.withColumn("rownum", row_number().over(Window.partitionBy("Group").orderBy(desc("Date"))))
df_final.show()
+-----+----+------+
|Group|Date|rownum|
+-----+----+------+
|    B|2015|     1|
|    B|1999|     2|
|    A|2007|     1|
|    A|2002|     2|
|    A|2000|     3|
+-----+----+------+
df_final = df_1.withColumn("rownum", row_number().over(Window.partitionBy("Group").orderBy(desc("Date")))).filter("rownum ==1").drop("rownum")
df_final.show()
+-----+----+
|Group|Date|
+-----+----+
|    B|2015|
|    A|2007|
+-----+----+