# PySpark - find consecutive/duplicate values in a DataFrame column and
# rename them by appending their occurrence number (e.g. ABC -> ABC_2).
#
# Explicit imports instead of `from pyspark.sql.functions import *`:
# the wildcard form shadows Python builtins (sum, max, round, ...) and
# hides which functions the script actually uses. Only these six are used.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, count, lag, row_number, when
from pyspark.sql.window import Window

# Local single-JVM session, sufficient for this demo script.
spark = SparkSession.builder.master('local')\
    .appName('scdType2')\
    .getOrCreate()
# Sample data: `dept` repeats on consecutive ids (ABC x3, PQR x2),
# while XYZ occurs exactly once.
data1 = [
    (1, 'ABC'), (2, 'ABC'), (3, 'ABC'),
    (4, 'XYZ'),
    (5, 'PQR'), (6, 'PQR'),
]
columns1 = ['id', 'dept']

dataDF = spark.createDataFrame(data=data1, schema=columns1)
dataDF.show()
# Attempt 1: number the rows within each dept, but `result` is just a
# straight copy of dept — nothing gets renamed yet.
winSpec = Window.partitionBy('dept').orderBy('id')
numbered = dataDF.withColumn('row_num', row_number().over(winSpec))
numbered.withColumn('result', col("dept")).show(truncate=False)
# Output:
# +---+----+-------+------+
# |id |dept|row_num|result|
# +---+----+-------+------+
# |5  |PQR |1      |PQR   |
# |6  |PQR |2      |PQR   |
# |4  |XYZ |1      |XYZ   |
# |1  |ABC |1      |ABC   |
# |2  |ABC |2      |ABC   |
# |3  |ABC |3      |ABC   |
# +---+----+-------+------+
# Attempt 2: append "_<row_num>" to every row after the first within its
# dept partition. Close, but not quite what we want: the FIRST occurrence
# of a duplicated dept keeps the bare name (ABC instead of ABC_1).
suffix_after_first = when(
    (col("row_num") > 1),
    concat_ws('_', "dept", "row_num"),
).otherwise(col("dept"))

dataDF.withColumn('row_num', row_number().over(winSpec))\
    .withColumn('result', suffix_after_first)\
    .show(truncate=False)
# Output:
# +---+----+-------+------+
# |id |dept|row_num|result|
# +---+----+-------+------+
# |5  |PQR |1      |PQR   |
# |6  |PQR |2      |PQR_2 |
# |4  |XYZ |1      |XYZ   |
# |1  |ABC |1      |ABC   |
# |2  |ABC |2      |ABC_2 |
# |3  |ABC |3      |ABC_3 |
# +---+----+-------+------+
# Desired output instead: suffix _1/_2/... ONLY when the dept value is
# duplicated; a unique dept (XYZ) keeps its plain name. A lag/lead check
# can detect whether a duplicate is present.
# +---+----+-------+------+
# |id |dept|row_num|result|
# +---+----+-------+------+
# |5  |PQR |1      |PQR_1 |
# |6  |PQR |2      |PQR_2 |
# |4  |XYZ |1      |XYZ   |
# |1  |ABC |1      |ABC_1 |
# |2  |ABC |2      |ABC_2 |
# |3  |ABC |3      |ABC_3 |
# +---+----+-------+------+
# Final approach: suffix a dept with its occurrence number only when that
# dept value is duplicated.
#   chk_duplicate - row has the same dept as the previous row (by id)
#   row_num       - occurrence number of this dept value, in id order
#   chk_length    - total number of rows sharing this dept
wind_id = Window.orderBy('id')
# BUG FIX: the original used Window.partitionBy("dept").orderBy("dept"),
# which sorts each partition by its own partition key — every row ties, so
# row_number() was non-deterministic. It only *appeared* correct because
# count().over(an ordered window) uses the default RANGE frame, where all
# tied rows are peers. Order row_number by "id" instead, and take the count
# over the bare (unordered) partition so it always covers the whole group.
wind_by_dept = Window.partitionBy("dept").orderBy("id")
wind_dept_all = Window.partitionBy("dept")
# Suffix when the row repeats the previous dept, or when it is the first
# row of a dept that occurs more than once.
condition = col("chk_duplicate") | ((col("chk_length") > 1) & (col("row_num") == 1))
new_df = dataDF.withColumn("chk_duplicate", col("dept") == lag("dept").over(wind_id))\
    .withColumn("row_num", row_number().over(wind_by_dept))\
    .withColumn("chk_length", count("dept").over(wind_dept_all))\
    .withColumn("dept_updated", when(condition, concat_ws("_", *["dept", "row_num"]))
                .otherwise(col("dept")))
new_df.orderBy("id").show()
# +---+----+-------------+-------+----------+------------+
# | id|dept|chk_duplicate|row_num|chk_length|dept_updated|
# +---+----+-------------+-------+----------+------------+
# |  1| ABC|         null|      1|         3|       ABC_1|
# |  2| ABC|         true|      2|         3|       ABC_2|
# |  3| ABC|         true|      3|         3|       ABC_3|
# |  4| XYZ|        false|      1|         1|         XYZ|
# |  5| PQR|        false|      1|         2|       PQR_1|
# |  6| PQR|         true|      2|         2|       PQR_2|
# +---+----+-------------+-------+----------+------------+
# NOTE(review): `select(*dataDF.columns, 'dept_updated')` raised
# "only named arguments may follow *expression" for the author; that
# restriction is Python-2-era — PEP 448 (Python 3.5+) allows positional
# arguments after *expression. The workaround below puts the literal
# column name before the unpacked list, which works everywhere.
new_df.select('dept_updated', *dataDF.columns).orderBy("id").show()
# +------------+---+----+
# |dept_updated| id|dept|
# +------------+---+----+
# |       ABC_1|  1| ABC|
# |       ABC_2|  2| ABC|
# |       ABC_3|  3| ABC|
# |         XYZ|  4| XYZ|
# |       PQR_1|  5| PQR|
# |       PQR_2|  6| PQR|
# +------------+---+----+
# Alternatively, name every output column explicitly:
new_df.select('id', 'dept', 'dept_updated').orderBy("id").show()
# +---+----+------------+
# | id|dept|dept_updated|
# +---+----+------------+
# |  1| ABC|       ABC_1|
# |  2| ABC|       ABC_2|
# |  3| ABC|       ABC_3|
# |  4| XYZ|         XYZ|
# |  5| PQR|       PQR_1|
# |  6| PQR|       PQR_2|
# +---+----+------------+
# (blog-page footer captured during scraping; commented out so the file is
# valid Python)
# No comments:
# Post a Comment