Apr 5, 2023

PySpark SCD Type 2 implementation using a delta file

"""SCD Type 2 merge of an employee snapshot with a delta file using PySpark.

Partitions the rows into four sets via joins on emp_id, stamps each with
SCD bookkeeping columns (is_active, start_time, end_time), then unions:
  1. inactive  -- old version of rows that changed (in both files)
  2. active    -- new version of rows that changed (in both files)
  3. no-change -- rows only in the base snapshot
  4. new       -- rows only in the delta file
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, lit, to_timestamp

spark = SparkSession.builder.master('local') \
    .appName('scdType2') \
    .getOrCreate()

# "End of time" sentinel used as end_time for every active record.
# Pattern uses 'SSS' (three fractional digits) to match the literal's '.000';
# the original 'SSSS' pattern is a mismatch and fails under Spark 3's
# non-legacy datetime parser.
END_OF_TIME = to_timestamp(lit('9999-12-31 00:00:00.000'),
                           'yyyy-MM-dd HH:mm:ss.SSS')


def _read_employee_csv(path, alias):
    """Read a header-ed CSV with inferred schema, trimming cell whitespace,
    and alias the DataFrame so `select('<alias>.*')` works after joins."""
    df = spark.read.csv(path, inferSchema=True, header=True,
                        ignoreLeadingWhiteSpace=True,
                        ignoreTrailingWhiteSpace=True)
    return df.alias(alias)


def _with_scd_columns(df, is_active, end_time):
    """Append the SCD Type 2 bookkeeping columns to `df`.

    NOTE(review): start_time is always stamped with the run's timestamp,
    even for rows being closed out -- a full SCD2 implementation would
    preserve the row's original start_time. Kept as-is to match the
    original behavior.
    """
    return df.withColumn('is_active', lit(is_active)) \
             .withColumn('start_time', current_timestamp()) \
             .withColumn('end_time', end_time)


emp_df = _read_employee_csv('C:\\Users\\Prabhath\\Downloads\\employee.csv',
                            'emp_df')
emp_df.show()

emp_delta_df = _read_employee_csv(
    'C:\\Users\\Prabhath\\Downloads\\employee_delta.csv', 'emp_delta_df')
emp_delta_df.show()

on_emp_id = emp_df.emp_id == emp_delta_df.emp_id

# Inactive: emp_id appears in both files, so the base-snapshot version is
# superseded. Keep the OLD columns, mark inactive, close the window now.
emp_inactive = _with_scd_columns(
    emp_df.join(emp_delta_df, on_emp_id, 'inner').select('emp_df.*'),
    is_active=False, end_time=current_timestamp())
print('### inactive records - common in both - emp_df deprecated, so take only those')
emp_inactive.show(truncate=False)

# Active: same overlap, but keep the NEW (delta) columns, open-ended window.
emp_active = _with_scd_columns(
    emp_df.join(emp_delta_df, on_emp_id, 'inner').select('emp_delta_df.*'),
    is_active=True, end_time=END_OF_TIME)
print('### active records - common in both - emp_delta_df latest, so take only those')
emp_active.show(truncate=False)

# No change: emp_id only in the base snapshot (left anti-join drops overlap).
emp_no_change_df = _with_scd_columns(
    emp_df.join(emp_delta_df, on_emp_id, 'left_anti').select('emp_df.*'),
    is_active=True, end_time=END_OF_TIME)
print('### not changed records - left anti - emp_df join emp_delta_df')
emp_no_change_df.show(truncate=False)

# New: emp_id only in the delta file.
emp_new_df = _with_scd_columns(
    emp_delta_df.join(emp_df, on_emp_id, 'left_anti').select('emp_delta_df.*'),
    is_active=True, end_time=END_OF_TIME)
print('### new records - left anti - emp_delta_df join emp_df')
emp_new_df.show(truncate=False)

# Final SCD2 table: union of all four partitions (schemas are identical,
# so positional union is safe), ordered for readable display.
final_df = emp_inactive.union(emp_active) \
    .union(emp_no_change_df) \
    .union(emp_new_df) \
    .orderBy('emp_id', 'is_active')
final_df.show(truncate=False)


+------+--------+---------+----------+
|emp_id|emp_name| emp_city|emp_salary|
+------+--------+---------+----------+
|     1|    John|   Sydney|     35000|
|     2|   Peter|Melbourne|     45000|
|     3|     Sam|   Sydeny|     55000|
+------+--------+---------+----------+

+------+--------+---------+----------+
|emp_id|emp_name| emp_city|emp_salary|
+------+--------+---------+----------+
|     2|   Peter|Melbourne|     55000|
|     4|     Sam|   Sydeny|     42000|
+------+--------+---------+----------+

### inactive records - common in both - emp_df deprecated, so take only those
+------+--------+---------+----------+---------+-----------------------+-----------------------+
|emp_id|emp_name|emp_city |emp_salary|is_active|start_time             |end_time               |
+------+--------+---------+----------+---------+-----------------------+-----------------------+
|2     |Peter   |Melbourne|45000     |false    |2023-04-05 21:28:26.006|2023-04-05 21:28:26.006|
+------+--------+---------+----------+---------+-----------------------+-----------------------+

### active records - common in both - emp_delta_df latest, so take only those
+------+--------+---------+----------+---------+----------------------+-------------------+
|emp_id|emp_name|emp_city |emp_salary|is_active|start_time            |end_time           |
+------+--------+---------+----------+---------+----------------------+-------------------+
|2     |Peter   |Melbourne|55000     |true     |2023-04-05 21:28:26.36|9999-12-31 00:00:00|
+------+--------+---------+----------+---------+----------------------+-------------------+

### not changed records - left anti - emp_df join emp_delta_df
+------+--------+--------+----------+---------+-----------------------+-------------------+
|emp_id|emp_name|emp_city|emp_salary|is_active|start_time             |end_time           |
+------+--------+--------+----------+---------+-----------------------+-------------------+
|1     |John    |Sydney  |35000     |true     |2023-04-05 21:28:26.586|9999-12-31 00:00:00|
|3     |Sam     |Sydeny  |55000     |true     |2023-04-05 21:28:26.586|9999-12-31 00:00:00|
+------+--------+--------+----------+---------+-----------------------+-------------------+

### new records - left anti - emp_delta_df join emp_df
+------+--------+--------+----------+---------+-----------------------+-------------------+
|emp_id|emp_name|emp_city|emp_salary|is_active|start_time             |end_time           |
+------+--------+--------+----------+---------+-----------------------+-------------------+
|4     |Sam     |Sydeny  |42000     |true     |2023-04-05 21:28:26.781|9999-12-31 00:00:00|
+------+--------+--------+----------+---------+-----------------------+-------------------+

### union all (inactive, active, not_changed, new)
+------+--------+---------+----------+---------+-----------------------+-----------------------+ |emp_id|emp_name|emp_city |emp_salary|is_active|start_time |end_time | +------+--------+---------+----------+---------+-----------------------+-----------------------+ |1 |John |Sydney |35000 |true |2023-04-05 21:28:26.956|9999-12-31 00:00:00 | |2 |Peter |Melbourne|45000 |false |2023-04-05 21:28:26.956|2023-04-05 21:28:26.956| |2 |Peter |Melbourne|55000 |true |2023-04-05 21:28:26.956|9999-12-31 00:00:00 | |3 |Sam |Sydeny |55000 |true |2023-04-05 21:28:26.956|9999-12-31 00:00:00 | |4 |Sam |Sydeny |42000 |true |2023-04-05 21:28:26.956|9999-12-31 00:00:00 | +------+--------+---------+----------+---------+-----------------------+-----------------------+


No comments:

Post a Comment