Apr 18, 2023

PySpark - How to find consecutive values in a PySpark DataFrame column and replace them

# PySpark - How to find consecutive values in a PySpark DataFrame column and replace them

from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local')\
    .appName('scdType2')\
    .getOrCreate()
data1 = [
    (1, 'ABC'),
    (2, 'ABC'),
    (3, 'ABC'),
    (4, 'XYZ'),
    (5, 'PQR'),
    (6, 'PQR')
]
columns1 = ['id', 'dept']

dataDF = spark.createDataFrame(data=data1, schema=columns1)
dataDF.show()

winSpec = Window.partitionBy('dept').orderBy('id')

# First attempt: number rows within each dept; 'result' is just a copy of dept
dataDF.withColumn('row_num', row_number().over(winSpec))\
    .withColumn('result', col("dept"))\
    .show(truncate=False)

# +---+----+-------+------+
# |id |dept|row_num|result|
# +---+----+-------+------+
# |5 |PQR |1 |PQR |
# |6 |PQR |2 |PQR |
# |4 |XYZ |1 |XYZ |
# |1 |ABC |1 |ABC |
# |2 |ABC |2 |ABC |
# |3 |ABC |3 |ABC |
# +---+----+-------+------+


# Second attempt: suffix _<row_num> on every row after the first within each dept
dataDF.withColumn('row_num', row_number().over(winSpec))\
    .withColumn('result', when(col("row_num") > 1, concat_ws('_', "dept", "row_num"))
                .otherwise(col("dept")))\
    .show(truncate=False)

# +---+----+-------+------+
# |id |dept|row_num|result|
# +---+----+-------+------+
# |5 |PQR |1 |PQR |
# |6 |PQR |2 |PQR_2 |
# |4 |XYZ |1 |XYZ |
# |1 |ABC |1 |ABC |
# |2 |ABC |2 |ABC_2 |
# |3 |ABC |3 |ABC_3 |
# +---+----+-------+------+

# But we are supposed to get the output below:
# add the _1/_2/... suffix only when a dept value has duplicates.
# Use lag/lead to detect that a duplicate is present
# (a small lag() sketch follows the expected output).
# +---+----+-------+------+
# |id |dept|row_num|result|
# +---+----+-------+------+
# |5 |PQR |1 |PQR_1 |
# |6 |PQR |2 |PQR_2 |
# |4 |XYZ |1 |XYZ |
# |1 |ABC |1 |ABC_1 |
# |2 |ABC |2 |ABC_2 |
# |3 |ABC |3 |ABC_3 |
# +---+----+-------+------+
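
# Minimal sketch of how lag() spots a consecutive duplicate: comparing
# each dept with the previous row's dept (ordered by id) gives null for
# the first row, true for a repeat, and false where the value changes.
dataDF.withColumn("prev_dept", lag("dept").over(Window.orderBy("id")))\
    .withColumn("is_dup_of_prev", col("dept") == col("prev_dept"))\
    .orderBy("id").show()

# +---+----+---------+--------------+
# | id|dept|prev_dept|is_dup_of_prev|
# +---+----+---------+--------------+
# |  1| ABC|     null|          null|
# |  2| ABC|      ABC|          true|
# |  3| ABC|      ABC|          true|
# |  4| XYZ|      ABC|         false|
# |  5| PQR|      XYZ|         false|
# |  6| PQR|      PQR|          true|
# +---+----+---------+--------------+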

# Window over all rows ordered by id: lets lag() compare each row with the previous one
wind_id = Window.orderBy('id')
# Ordered window per dept: drives row_number(); ordering by id makes the numbering deterministic
wind_by_dept = Window.partitionBy("dept").orderBy("id")
# Unordered window per dept: count() spans the whole partition instead of a running frame
wind_dept_all = Window.partitionBy("dept")

# Suffix a row when it duplicates the previous row, or when it is the
# first row of a dept that occurs more than once
condition = col("chk_duplicate") | ((col("chk_length") > 1) & (col("row_num") == 1))

new_df = dataDF.withColumn("chk_duplicate", col("dept") == lag("dept").over(wind_id))\
    .withColumn("row_num", row_number().over(wind_by_dept))\
    .withColumn("chk_length", count("dept").over(wind_dept_all))\
    .withColumn("dept_updated",
                when(condition, concat_ws("_", "dept", "row_num"))
                .otherwise(col("dept")))

new_df.orderBy("id").show()
# +---+----+-------------+-------+----------+------------+
# | id|dept|chk_duplicate|row_num|chk_length|dept_updated|
# +---+----+-------------+-------+----------+------------+
# | 1| ABC| null| 1| 3| ABC_1|
# | 2| ABC| true| 2| 3| ABC_2|
# | 3| ABC| true| 3| 3| ABC_3|
# | 4| XYZ| false| 1| 1| XYZ|
# | 5| PQR| false| 1| 2| PQR_1|
# | 6| PQR| true| 2| 2| PQR_2|
# +---+----+-------------+-------+----------+------------+
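
# Design note / sketch: if every row of a repeated dept should be
# suffixed whether or not the repeats are adjacent, chk_duplicate is
# not strictly needed - chk_length > 1 alone marks all rows of any
# dept that occurs more than once (same result on this data):
dataDF.withColumn("row_num", row_number().over(wind_by_dept))\
    .withColumn("chk_length", count("dept").over(wind_dept_all))\
    .withColumn("dept_updated",
                when(col("chk_length") > 1, concat_ws("_", "dept", "row_num"))
                .otherwise(col("dept")))\
    .orderBy("id").select("id", "dept", "dept_updated").show()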

# Note: new_df.select(*dataDF.columns, 'dept_updated') fails on older
# Python versions with "SyntaxError: only named arguments may follow
# *expression"; putting the wildcard last works everywhere (Python 3.5+
# also accepts the wildcard-first form - see the sketch after the output).
new_df.select('dept_updated', *dataDF.columns).orderBy("id").show()

# +------------+---+----+
# |dept_updated| id|dept|
# +------------+---+----+
# | ABC_1| 1| ABC|
# | ABC_2| 2| ABC|
# | ABC_3| 3| ABC|
# | XYZ| 4| XYZ|
# | PQR_1| 5| PQR|
# | PQR_2| 6| PQR|
# +------------+---+----+
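
# On Python 3.5+ the wildcard-first form also works (PEP 448 allows
# positional arguments after *expression), which keeps the original
# column order:
new_df.select(*dataDF.columns, 'dept_updated').orderBy("id").show()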

#or
new_df.select('id', 'dept', 'dept_updated').orderBy("id").show()
# +---+----+------------+
# | id|dept|dept_updated|
# +---+----+------------+
# | 1| ABC| ABC_1|
# | 2| ABC| ABC_2|
# | 3| ABC| ABC_3|
# | 4| XYZ| XYZ|
# | 5| PQR| PQR_1|
# | 6| PQR| PQR_2|
# +---+----+------------+
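
# Finally, to actually replace the value as the title says, keep only
# the suffixed column and rename it back to dept (a minimal sketch):
final_df = new_df.select("id", col("dept_updated").alias("dept")).orderBy("id")
final_df.show()
# +---+-----+
# | id| dept|
# +---+-----+
# |  1|ABC_1|
# |  2|ABC_2|
# |  3|ABC_3|
# |  4|  XYZ|
# |  5|PQR_1|
# |  6|PQR_2|
# +---+-----+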


