from pyspark.sql.functions import *
from pyspark.sql import SparkSession
# Build a local Spark session and load the base and delta employee snapshots.
spark = (
    SparkSession.builder
    .master('local')
    .appName('scdType2')
    .getOrCreate()
)

# Shared CSV reader options: infer column types, treat the first row as the
# header, and trim whitespace on both sides of every field.
csv_options = dict(
    inferSchema=True,
    header=True,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
)

# Current (base) snapshot of the employee table.
emp_df = spark.read.csv('C:\\Users\\Prabhath\\Downloads\\employee.csv', **csv_options).alias('emp_df')
emp_df.show()

# Incoming delta: changed and brand-new employee rows.
emp_delta_df = spark.read.csv('C:\\Users\\Prabhath\\Downloads\\employee_delta.csv', **csv_options).alias('emp_delta_df')
emp_delta_df.show()
# Inactive (expired) records: emp_ids present in BOTH snapshots — the base
# row is superseded by the delta row, so keep the base version flagged inactive
# and close it out with an end_time of "now".
emp_inactive = emp_df.join(emp_delta_df, emp_df.emp_id == emp_delta_df.emp_id, "inner")\
    .select("emp_df.*")\
    .withColumn('is_active', lit(False))\
    .withColumn('start_time', current_timestamp())\
    .withColumn('end_time', current_timestamp())
# NOTE(review): in a full SCD2 pipeline the expired row would retain its
# original start_time; here both timestamps are stamped "now" because no
# prior history table is loaded in this demo.
print('### inactive records - common in both - emp_df deprecated, so take only those')
emp_inactive.show(truncate=False)
# Active records: emp_ids present in BOTH snapshots — take the delta (latest)
# version and leave it open-ended via the conventional high-date sentinel.
# Fraction pattern fixed to 'SSS' (3 digits) to match the '.000' in the
# literal; 'SSSS' expects 4 fractional digits and fails (or yields null)
# under Spark 3's strict datetime parser.
emp_active = emp_df.join(emp_delta_df, emp_df.emp_id == emp_delta_df.emp_id, "inner")\
    .select("emp_delta_df.*")\
    .withColumn('is_active', lit(True))\
    .withColumn('start_time', current_timestamp())\
    .withColumn('end_time', to_timestamp(lit('9999-12-31 00:00:00.000'), 'yyyy-MM-dd HH:mm:ss.SSS'))
print('### active records - common in both - emp_delta_df latest, so take only those')
emp_active.show(truncate=False)
# Unchanged records: emp_ids only in the base snapshot (left anti join drops
# every id that appears in the delta) — they stay active and open-ended.
# Fraction pattern fixed to 'SSS' to match the 3-digit '.000' in the literal.
emp_no_change_df = emp_df.join(emp_delta_df, emp_df.emp_id == emp_delta_df.emp_id, "left_anti")\
    .select("emp_df.*")\
    .withColumn('is_active', lit(True))\
    .withColumn('start_time', current_timestamp())\
    .withColumn('end_time', to_timestamp(lit('9999-12-31 00:00:00.000'), 'yyyy-MM-dd HH:mm:ss.SSS'))
print('### not changed records - left anti - emp_df join emp_delta_df')
emp_no_change_df.show(truncate=False)
# New records: emp_ids only in the delta snapshot (left anti join from the
# delta side drops ids already in the base) — inserted as active, open-ended.
# Fraction pattern fixed to 'SSS' to match the 3-digit '.000' in the literal.
emp_new_df = emp_delta_df.join(emp_df, emp_df.emp_id == emp_delta_df.emp_id, "left_anti")\
    .select("emp_delta_df.*")\
    .withColumn('is_active', lit(True))\
    .withColumn('start_time', current_timestamp())\
    .withColumn('end_time', to_timestamp(lit('9999-12-31 00:00:00.000'), 'yyyy-MM-dd HH:mm:ss.SSS'))
print('### new records - left anti - emp_delta_df join emp_df')
emp_new_df.show(truncate=False)
# Union the four partitions into the final SCD2 result. union() matches
# columns by POSITION, which is safe here because every branch selects the
# same source columns and appends is_active/start_time/end_time in the
# same order.
final_df = emp_inactive.union(emp_active)\
    .union(emp_no_change_df)\
    .union(emp_new_df)\
    .orderBy('emp_id', 'is_active')
# Banner was missing from the code although it appears in the captured run
# transcript below — restored for consistency with the other sections.
print('### union all (inactive, active, not_changed, changed)')
final_df.show(truncate=False)
+------+--------+---------+----------+
|emp_id|emp_name| emp_city|emp_salary|
+------+--------+---------+----------+
| 1| John| Sydney| 35000|
| 2| Peter|Melbourne| 45000|
| 3| Sam| Sydeny| 55000|
+------+--------+---------+----------+
+------+--------+---------+----------+
|emp_id|emp_name| emp_city|emp_salary|
+------+--------+---------+----------+
| 2| Peter|Melbourne| 55000|
| 4| Sam| Sydeny| 42000|
+------+--------+---------+----------+
### inactive records - common in both - emp_df deprecated, so take only those
+------+--------+---------+----------+---------+-----------------------+-----------------------+
|emp_id|emp_name|emp_city |emp_salary|is_active|start_time |end_time |
+------+--------+---------+----------+---------+-----------------------+-----------------------+
|2 |Peter |Melbourne|45000 |false |2023-04-05 21:28:26.006|2023-04-05 21:28:26.006|
+------+--------+---------+----------+---------+-----------------------+-----------------------+
### active records - common in both - emp_delta_df latest, so take only those
+------+--------+---------+----------+---------+----------------------+-------------------+
|emp_id|emp_name|emp_city |emp_salary|is_active|start_time |end_time |
+------+--------+---------+----------+---------+----------------------+-------------------+
|2 |Peter |Melbourne|55000 |true |2023-04-05 21:28:26.36|9999-12-31 00:00:00|
+------+--------+---------+----------+---------+----------------------+-------------------+
### not changed records - left anti - emp_df join emp_delta_df
+------+--------+--------+----------+---------+-----------------------+-------------------+
|emp_id|emp_name|emp_city|emp_salary|is_active|start_time |end_time |
+------+--------+--------+----------+---------+-----------------------+-------------------+
|1 |John |Sydney |35000 |true |2023-04-05 21:28:26.586|9999-12-31 00:00:00|
|3 |Sam |Sydeny |55000 |true |2023-04-05 21:28:26.586|9999-12-31 00:00:00|
+------+--------+--------+----------+---------+-----------------------+-------------------+
### new records - left anti - emp_delta_df join emp_df
+------+--------+--------+----------+---------+-----------------------+-------------------+
|emp_id|emp_name|emp_city|emp_salary|is_active|start_time |end_time |
+------+--------+--------+----------+---------+-----------------------+-------------------+
|4 |Sam |Sydeny |42000 |true |2023-04-05 21:28:26.781|9999-12-31 00:00:00|
+------+--------+--------+----------+---------+-----------------------+-------------------+
### union all (inactive, active, not_changed, changed)
+------+--------+---------+----------+---------+-----------------------+-----------------------+
|emp_id|emp_name|emp_city |emp_salary|is_active|start_time |end_time |
+------+--------+---------+----------+---------+-----------------------+-----------------------+
|1 |John |Sydney |35000 |true |2023-04-05 21:28:26.956|9999-12-31 00:00:00 |
|2 |Peter |Melbourne|45000 |false |2023-04-05 21:28:26.956|2023-04-05 21:28:26.956|
|2 |Peter |Melbourne|55000 |true |2023-04-05 21:28:26.956|9999-12-31 00:00:00 |
|3 |Sam |Sydeny |55000 |true |2023-04-05 21:28:26.956|9999-12-31 00:00:00 |
|4 |Sam |Sydeny |42000 |true |2023-04-05 21:28:26.956|9999-12-31 00:00:00 |
+------+--------+---------+----------+---------+-----------------------+-----------------------+