May 13, 2023

PySpark saveAsTable vs insertInto

 

Table existence
  insertInto:  the table must exist before writing
  saveAsTable: the table is not required to exist (it is created if missing)

Column resolution
  insertInto:  ignores column names; columns are resolved by position
  saveAsTable: columns are resolved by name

Schema requirement
  insertInto:  the DataFrame schema must match the table schema
  saveAsTable: append mode requires the same schema; overwrite mode does not

Overwrite mode behavior
  insertInto:  truncates the table (or the matching partitions) and inserts the data
  saveAsTable: drops the table, re-creates it, and then inserts the data

bucketBy
  insertInto:  not supported
  saveAsTable: supported

partitionBy when the target table is already partitioned
  insertInto:  should not be used (partitioning is taken from the table definition)
  saveAsTable: must be used
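
A minimal sketch of the column-resolution difference (the table name demo_tbl and its columns are made up for illustration only):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Hypothetical two-column table used only to show positional vs named resolution
spark.sql("create table if not exists demo_tbl (id int, name string)")

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "name"])

# insertInto ignores column names and matches by position, so the DataFrame
# columns must already be in the table's column order
df.select("id", "name").write.insertInto("demo_tbl")

# saveAsTable in append mode resolves columns by name, so the column order
# in the DataFrame does not have to match the table definition
df.select("name", "id").write.mode("append").saveAsTable("demo_tbl")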


Example (Overwrite an existing partition)

Here we only want to update/overwrite the 'category' column for the US records, without overwriting the records of the other country partitions (IN/PL/BH/UK).


from pyspark.sql.functions import col, concat, count, lit

# Needed, otherwise the write fails with:
# org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column.
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

# test.txnrecords_part_all is a Hive table partitioned by the country column
df1 = spark.sql("select * from test.txnrecords_part_all")
df1.groupBy('country').agg(count('country')).show(truncate=False)

# Select only the US records, since only the US partition should be updated
df1 = spark.sql("select * from test.txnrecords_part_all where country='US'")

# Update the category column by appending '111'
df1 = df1.withColumn('category', concat(col('category'), lit('111')))
df1.show(truncate=False)

# insertInto
#  - the table must exist before writing
#  - it ignores column names and resolves columns by position
#  - it does not support bucketing
#  - if the table is partitioned, partitionBy should not be used
df1.write.mode("overwrite").insertInto("test.txnrecords_part_all", overwrite=True)

# saveAsTable (shown for comparison, left commented out)
#  - the table is not required to exist before writing (test.txnrecords_part_all_sat does not exist yet)
#  - columns are resolved by name, not by position
#  - it supports bucketing
#  - if the data should be partitioned, partitionBy must be used
# df1.write.mode("overwrite").partitionBy("country").saveAsTable("test.txnrecords_part_all_sat")
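
Since bucketBy is only available through saveAsTable, here is a minimal sketch of a bucketed write; the target table name test.txnrecords_bucketed is hypothetical:

# bucketBy/sortBy only work with saveAsTable; a bucketed insertInto write is rejected by Spark
(df1.write
    .mode("overwrite")
    .partitionBy("country")
    .bucketBy(4, "category")
    .sortBy("category")
    .saveAsTable("test.txnrecords_bucketed"))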




Before

#####

hive> select * from txnrecords_part_all where country='US' limit 5;

OK

0       06-26-2011      4007024 40.33   Exercise & Fitness      Cardio Machine Accessories      Clarksville     Tennessee       credit  S       US

1       05-26-2011      4006742 198.44  Exercise & Fitness      Weightlifting Gloves    Long Beach      California      credit  S       US

2       06-01-2011      4009775 5.58    Exercise & Fitness      Weightlifting Machine Accessories       Anaheim California      credit  S       US

3       06-05-2011      4002199 198.19  Gymnastics      Gymnastics Rings        Milwaukee       Wisconsin       credit  S       US

4       12-17-2011      4002613 98.81   Team Sports     Field Hockey    Nashville       Tennessee       credit  S       US

Time taken: 1.023 seconds, Fetched: 5 row(s)

hive> select * from txnrecords_part_all where country='UK' limit 5;

OK

0       06-26-2011      4007024 40.33   Exercise & Fitness      Cardio Machine Accessories      Clarksville     Tennessee       credit  K       UK

1       05-26-2011      4006742 198.44  Exercise & Fitness      Weightlifting Gloves    Long Beach      California      credit  K       UK

2       06-01-2011      4009775 5.58    Exercise & Fitness      Weightlifting Machine Accessories       Anaheim California      credit  K       UK

3       06-05-2011      4002199 198.19  Gymnastics      Gymnastics Rings        Milwaukee       Wisconsin       credit  K       UK

4       12-17-2011      4002613 98.81   Team Sports     Field Hockey    Nashville       Tennessee       credit  K       UK

Time taken: 0.174 seconds, Fetched: 5 row(s)




After

#####

Notice that only the US partition records were updated (the '111' suffix below); the UK partition records remain unchanged.

hive> select * from txnrecords_part_all where country='US' limit 5;

OK

0       06-26-2011      4007024 40.33   Exercise & Fitness111   Cardio Machine Accessories      Clarksville     Tennessee       credit  S       US

1       05-26-2011      4006742 198.44  Exercise & Fitness111   Weightlifting Gloves    Long Beach      California      credit  S       US

2       06-01-2011      4009775 5.58    Exercise & Fitness111   Weightlifting Machine Accessories       Anaheim California      credit  S       US

3       06-05-2011      4002199 198.19  Gymnastics111   Gymnastics Rings        Milwaukee       Wisconsin       credit  S       US

4       12-17-2011      4002613 98.81   Team Sports111  Field Hockey    Nashville       Tennessee       credit  S       US

Time taken: 0.2 seconds, Fetched: 5 row(s)


hive> select * from txnrecords_part_all where country='UK' limit 5;

OK

0       06-26-2011      4007024 40.33   Exercise & Fitness      Cardio Machine Accessories      Clarksville     Tennessee       credit  K       UK

1       05-26-2011      4006742 198.44  Exercise & Fitness      Weightlifting Gloves    Long Beach      California      credit  K       UK

2       06-01-2011      4009775 5.58    Exercise & Fitness      Weightlifting Machine Accessories       Anaheim California      credit  K       UK

3       06-05-2011      4002199 198.19  Gymnastics      Gymnastics Rings        Milwaukee       Wisconsin       credit  K       UK

4       12-17-2011      4002613 98.81   Team Sports     Field Hockey    Nashville       Tennessee       credit  K       UK

Time taken: 0.168 seconds, Fetched: 5 row(s)
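
The same before/after check can also be run from PySpark instead of the Hive shell, for example:

# Only the US partition should show the '111' suffix in the category column
spark.sql("select * from test.txnrecords_part_all where country='US'").show(5, truncate=False)
spark.sql("select * from test.txnrecords_part_all where country='UK'").show(5, truncate=False)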

