May 13, 2023

PySpark union() vs unionAll() vs unionByName()


  • union() merges two DataFrames that have the same structure/schema.

    • It returns a new DataFrame containing all rows from both DataFrames, duplicates included.

    • It matches columns by position, not by name, so the column names of the second DataFrame are ignored.

    • #Merge with duplicates (by default)

      • unionDF = df.union(df2)

      • unionDF.show(truncate=False)

    • #Merge without duplicates

      • unionDF = df.union(df2).distinct()

      • unionDF.show(truncate=False)

  • unionAll() was deprecated in Spark 2.0.0 in favor of union(); since Spark 3.0 it is no longer deprecated and is simply an alias for union().

    • unionAll() below yields the same result as union() (it is an alias)

    • unionDF = df.unionAll(df2)

    • unionDF.show(truncate=False)

  • unionByName()

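    • It matches columns by name, not by position

    • By default both DataFrames must have the same set of columns; since Spark 3.1, allowMissingColumns=True relaxes this and fills the missing columns with nulls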

    • df1 = spark.createDataFrame([[1,1,1],[2,2,2],[3,3,3]], ['col1', 'col2', 'col3'])

    • df2 = spark.createDataFrame([[33,33,33],[11,11,11],[22,22,22]], ['col3', 'col1', 'col2'])

    • df3 = df1.unionByName(df2)

    • >>> df3.show()
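The sample data above uses the same value in every column of a row, so it cannot show the difference between positional and name-based matching. The following is a minimal plain-Python model (no Spark required) of the two behaviors. The function names, column names, and row values are hypothetical and chosen only for illustration, and the model ignores Spark's type checks.

```python
# Plain-Python model of the two union styles; rows are tuples, columns are name lists.
# All names here are illustrative assumptions, not part of the PySpark API.
cols1 = ["id", "name", "city"]
rows1 = [(1, "Ann", "Oslo")]

cols2 = ["city", "id", "name"]  # same columns as cols1, different order
rows2 = [("Rome", 2, "Bob")]


def union_by_position(rows_a, rows_b):
    """Model of union(): append rows as-is; the right side's column names are ignored."""
    return rows_a + rows_b


def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Model of unionByName(): reorder each right-hand row to the left-hand column order."""
    if sorted(cols_a) != sorted(cols_b):
        raise ValueError("column names differ")
    index = [cols_b.index(c) for c in cols_a]
    return rows_a + [tuple(row[i] for i in index) for row in rows_b]


by_position = union_by_position(rows1, rows2)
by_name = union_by_name(cols1, rows1, cols2, rows2)

print(by_position)  # second row keeps its own order, so "Rome" lands under "id"
print(by_name)      # second row is rearranged to (id, name, city)
```

In the positional result the city value ends up in the id column, which is the kind of silent misalignment that unionByName() avoids.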




Example unionByName()

data1 = [("James","Smith","USA","CA"),

 ("Michael","Rose","USA","NY"),

 ("Robert","William","USA","CA"),

 ("Maria","Jones","USA","FL")

    ]


columns1 = ["firstname","lastname","country","state"]

df1 = spark.createDataFrame(data = data1, schema = columns1)

df1.show(truncate=False)


>>> df1.show()

+---------+--------+-------+-----+

|firstname|lastname|country|state|

+---------+--------+-------+-----+

|    James|   Smith|    USA|   CA|

|  Michael|    Rose|    USA|   NY|

|   Robert| William|    USA|   CA|

|    Maria|   Jones|    USA|   FL|

+---------+--------+-------+-----+


# age is an additional column in df2

data2 = [("Anand","Ranganath","India","Tamilnadu", "50"),

 ("Chiranjeevi","Konidela","India","AP", "60"),

    ]


columns2 = ["firstname","lastname","country","state", "age"]

df2 = spark.createDataFrame(data = data2, schema = columns2)

df2.show(truncate=False)


>>> df2.show()

+-----------+---------+-------+---------+---+

|  firstname| lastname|country|    state|age|

+-----------+---------+-------+---------+---+

|      Anand|Ranganath|  India|Tamilnadu| 50|

|Chiranjeevi| Konidela|  India|       AP| 60|

+-----------+---------+-------+---------+---+


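# Add df2's extra column(s) [age] to df1, filled with nulls

from pyspark.sql.functions import lit

for name in df2.columns:

    if name not in df1.columns:

        # Cast so the null column takes df2's type instead of NullType
        df1 = df1.withColumn(name, lit(None).cast(df2.schema[name].dataType))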



# If the column order differs between df1 and df2, reorder df2 to match df1 before the positional union

df1.union(df2.select(df1.columns))


>>> df1.union(df2).show()

+-----------+---------+-------+---------+----+

|  firstname| lastname|country|    state| age|

+-----------+---------+-------+---------+----+

|      James|    Smith|    USA|       CA|null|

|    Michael|     Rose|    USA|       NY|null|

|     Robert|  William|    USA|       CA|null|

|      Maria|    Jones|    USA|       FL|null|

|      Anand|Ranganath|  India|Tamilnadu|  50|

|Chiranjeevi| Konidela|  India|       AP|  60|

+-----------+---------+-------+---------+----+
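The manual approach above has two steps: find the columns that exist only in df2, then put both sides in one common order before the positional union. As a rough sketch, the planning part can be written as a small pure-Python helper that works on column-name lists such as df1.columns and df2.columns. The helper name alignment_plan is hypothetical, and it assumes df2 contains every column of df1, which is what df2.select(df1.columns) also requires.

```python
def alignment_plan(left_cols, right_cols):
    """Return (columns to add to the left side as nulls, common column order).

    Hypothetical helper: operates only on lists of column names.
    """
    right_only = [c for c in right_cols if c not in left_cols]
    left_only = [c for c in left_cols if c not in right_cols]
    if left_only:
        # select(left_cols) on the right side would fail for these columns
        raise ValueError(f"right side lacks columns: {left_only}")
    return right_only, list(left_cols) + right_only


columns1 = ["firstname", "lastname", "country", "state"]
columns2 = ["firstname", "lastname", "country", "state", "age"]

to_add, final_order = alignment_plan(columns1, columns2)
print(to_add)       # ['age']
print(final_order)  # ['firstname', 'lastname', 'country', 'state', 'age']
```

With real DataFrames, to_add would drive the withColumn loop and final_order would be passed to select() on both sides.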


>>> df1.unionByName(df2, allowMissingColumns = True).show()

+-----------+---------+-------+---------+----+

|  firstname| lastname|country|    state| age|

+-----------+---------+-------+---------+----+

|      James|    Smith|    USA|       CA|null|

|    Michael|     Rose|    USA|       NY|null|

|     Robert|  William|    USA|       CA|null|

|      Maria|    Jones|    USA|       FL|null|

|      Anand|Ranganath|  India|Tamilnadu|  50|

|Chiranjeevi| Konidela|  India|       AP|  60|

+-----------+---------+-------+---------+----+
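To make the effect of allowMissingColumns concrete, here is a small plain-Python model of the behavior, not Spark's implementation. Rows are dicts keyed by column name. The function name union_by_name_model and its allow_missing parameter are assumptions made for this sketch, and the sample rows are a cut-down version of the data above.

```python
def union_by_name_model(cols_a, rows_a, cols_b, rows_b, allow_missing=False):
    """Model of unionByName(): returns (columns, rows as tuples).

    Hypothetical helper; allow_missing stands in for allowMissingColumns.
    """
    if set(cols_a) != set(cols_b) and not allow_missing:
        raise ValueError("column sets differ; pass allow_missing=True")
    # Left-hand columns come first, then columns found only on the right
    out_cols = list(cols_a) + [c for c in cols_b if c not in cols_a]
    # dict.get returns None for a column the row does not have (the "null")
    out_rows = [tuple(row.get(c) for c in out_cols) for row in rows_a + rows_b]
    return out_cols, out_rows


cols_a = ["firstname", "state"]
rows_a = [{"firstname": "James", "state": "CA"}]
cols_b = ["firstname", "state", "age"]
rows_b = [{"firstname": "Anand", "state": "Tamilnadu", "age": "50"}]

out_cols, out_rows = union_by_name_model(cols_a, rows_a, cols_b, rows_b, allow_missing=True)
print(out_cols)  # ['firstname', 'state', 'age']
print(out_rows)  # [('James', 'CA', None), ('Anand', 'Tamilnadu', '50')]
```

Calling the model without allow_missing=True on these inputs raises an error, mirroring the fact that unionByName() rejects mismatched column sets unless allowMissingColumns is enabled.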


