Oct 15, 2020

Python lambda filter


result_dict = {
1: {'site': u'test1.com', 'site_name': u'test1', 'is_https': True},
2: {'site': u'test2.com', 'site_name': u'test2', 'is_https': False}
}
print('-' * 30)
print(result_dict)

print('-' * 30)
https_list = dict(filter(lambda x: x[1]['is_https'], result_dict.items()))
non_https_list = dict(filter(lambda x: not x[1]['is_https'], result_dict.items()))

print(https_list)
print('-' * 30)
print(non_https_list)

Output:
------------------------------
{1: {'site_name': u'test1', 'site': u'test1.com', 'is_https': True}, 
2: {'site_name': u'test2', 'site': u'test2.com', 'is_https': False}}
------------------------------
{1: {'site_name': u'test1', 'site': u'test1.com', 'is_https': True}}
------------------------------
{2: {'site_name': u'test2', 'site': u'test2.com', 'is_https': False}}
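
The same split can also be written with dict comprehensions, which read a bit more naturally than filter + lambda. A small sketch reusing the same result_dict as above:

# Equivalent split using dict comprehensions (same result_dict as above)
https_list = {k: v for k, v in result_dict.items() if v['is_https']}
non_https_list = {k: v for k, v in result_dict.items() if not v['is_https']}

print(https_list)      # {1: {... 'is_https': True}}
print(non_https_list)  # {2: {... 'is_https': False}}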

Jun 23, 2020

DB Tools


  • DBeaver (FOSS - Free and Open Source Software)
  • DbVisualizer (free tier - read operations only)
  • PopSQL

Jun 18, 2020

Supervisor in Linux

  • Supervisor is a client/server system that allows its users to monitor and control a number of processes on UNIX-like operating systems.
  • It is started like any other program at boot time.
  • If a managed process gets killed, Supervisor brings it back up on its own.

Tried these on CentOS (Red Hat)

How to install
yum -y install supervisor

How to Start/Restart/Stop/Status
systemctl start supervisord
systemctl enable supervisord
systemctl restart supervisord
systemctl stop supervisord
systemctl status supervisord

/etc/supervisord.d/test.ini

[group:test]
programs=test1, test2

[program:test1]
command=python -u test1.py
directory=/opt/site
stdout_logfile=/tmp/prabhath.log
redirect_stderr=true

[program:test2]
command=python -u test2.py
directory=/opt/site
stdout_logfile=/tmp/prabhath.log
redirect_stderr=true

How to restart Group
supervisorctl restart test:*


/opt/site/test1.py
import time
while True:
    print('inside Test1')
    time.sleep(1)


/opt/site/test2.py
import time
while True:
    print('inside Test2')
    time.sleep(2)




Jun 16, 2020

How to keep terminal alive


To prevent connection loss, instruct the ssh client to send a sign-of-life signal to the server once in a while. Add the following to ~/.ssh/config

vim ~/.ssh/config

Host *
  ServerAliveInterval 120



May 23, 2020

AWS Elastic Search

AWS Elastic Search
  • Widely adopted open source search engine
  • Building real-time custom search engines (see the small query sketch after this list)
  • Logging and log analysis
  • Scraping and combining multiple data sources
  • Event data and metrics (e.g., application clickstream data)
  • Kibana is the UI tool for Elasticsearch
Cloud Search
  • Fully managed search solution by AWS
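
A minimal sketch of talking to an Elasticsearch domain over its REST API with requests. The endpoint, index name, and document are made-up placeholders, and real AWS Elasticsearch domains normally need request signing or other auth, which is omitted here:

import requests

# Hypothetical domain endpoint and index name (placeholders)
ES_ENDPOINT = 'https://my-domain.us-east-1.es.amazonaws.com'
INDEX = 'sites'

# Index a document
doc = {'site': 'test1.com', 'site_name': 'test1', 'is_https': True}
resp = requests.post(f'{ES_ENDPOINT}/{INDEX}/_doc', json=doc)
print(resp.json())

# Simple match query
query = {'query': {'match': {'site_name': 'test1'}}}
resp = requests.get(f'{ES_ENDPOINT}/{INDEX}/_search', json=query)
print(resp.json())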

AWS Kinesis

AWS Kinesis (3 types)
  • Kinesis Streams
  • Kinesis Firehose
  • Kinesis Analytics
  • Kinesis Streams
    • Ingest and process streaming data with "custom applications"
    • Producers put data into streams (see the put_record sketch after this list)
    • Consumers read the data and process it (a fleet of instances)
    • Shard
      • It is the basic throughput unit of a stream
      • Streaming data is distributed across shards
      • Default retention period is 1 day
      • You can keep data up to 7 days (168 hrs)
  • Kinesis Firehose
    • Capture, transform & load streaming data
    • Deliver near real-time data to "AWS destinations" like S3, RedShift, ElasticSearch, Splunk etc.
    • You don't need to write applications or manage resources
    • Using Lambda, you can transform data before delivery
    • You can compress/encrypt data, thereby saving storage cost and increasing security
    • Scales automatically
  • Kinesis Analytics
    • Run SQL queries on Kinesis Streaming data
    • Analyze and process streaming data from Kinesis Streaming/Firehose
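
A minimal producer sketch with boto3. The stream name and partition key are placeholders, and AWS credentials are assumed to be configured already:

import json
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

record = {'site': 'test1.com', 'is_https': True}
response = kinesis.put_record(
    StreamName='test-stream',                  # hypothetical stream name
    Data=json.dumps(record).encode('utf-8'),   # payload, up to 1 MB per record
    PartitionKey='test1.com'                   # determines which shard gets the record
)
print(response['ShardId'], response['SequenceNumber'])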

Kinesis Limitations
  • No of shards
    • Up to 200 by default
    • Up to 500 in some regions
    • Soft limit (AWS can raise it if you request)
  • Data retention
    • 1 day by default
    • Up to 7 days
    • Hard limit (AWS cannot lift it)
  • Write limitations
    • Shard throughput - up to 1 MB/sec per shard
    • No of transactions - 1000 records/sec per shard
  • Read limitations
    • One shard can return < 2 MB/sec
    • No of transactions - 5 read transactions/shard/sec


Kinesis Vs Kafka Vs SQS



Kinesis                                  | Kafka
-----------------------------------------|------------------------------------------
Messaging system                         | Messaging system
Stream/Shard                             | Topic/Partition
Proprietary                              | Open Source
No operational overhead                  | Heavy operational overhead
Stores up to 7 days (default 1 day)      | Can store data indefinitely
Kinesis Client Library                   | Various Kafka clients
                                         | Log compaction

Kinesis                                  | SQS
-----------------------------------------|------------------------------------------
Messaging system                         | Messaging system
AWS                                      | AWS
No operational overhead                  | No operational overhead
State tracking by client                 | State tracking by SQS
Message can be read many times           | Processed message is deleted
Unified log/stream processing            | Balancing tasks among workers



May 22, 2020

AWS RedShift

RedShift
  • Data Warehouse
  • Meant for OLAP (not OLTP); column-oriented, massively parallel scale-out architecture
  • OLAP is meant for analytics and aggregation of data
  • Leader node & compute nodes
  • It does not require/create indexes or materialised views, so it is faster and stores less data than traditional relational databases
  • Supports columnar formats like Parquet, ORC
  • But it has dist_key and sort_key (see the create-table sketch after this list)
  • dist_key
    • It is the column on which the data is distributed across nodes
    • Rows with the same value of this column are guaranteed to be on the same node
  • sort_key
    • It is the column on which data is sorted on each node
    • Only one sort key is permitted per table (a compound sort key may span multiple columns)
  • RedShift doesn't complain about duplicate data, even on a primary key
    • Advantages
      • Faster, since it need not check whether the primary key already exists
      • Performance, query optimization
    • Disadvantages
      • Chances of improper data (duplicate data)
      • It's up to the user to send proper data to RedShift; the user has to handle improper data before sending it to the RedShift cluster
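
A sketch (not from these notes) of creating a table with a dist key and sort key from Python via psycopg2, which speaks RedShift's PostgreSQL protocol. The cluster endpoint, credentials, and table definition are all made-up placeholders:

import psycopg2

# Hypothetical connection details (placeholders)
conn = psycopg2.connect(
    host='my-cluster.abc123.us-east-1.redshift.amazonaws.com',
    port=5439, dbname='dev', user='awsuser', password='...'
)

create_sql = """
CREATE TABLE IF NOT EXISTS factsales (
    sale_id    BIGINT,
    product_id INTEGER,
    sold_at    DATE,
    amount     DECIMAL(10, 2)
)
DISTKEY (product_id)   -- rows with the same product_id land on the same node
SORTKEY (sold_at);     -- data is sorted by sold_at on each node
"""

with conn, conn.cursor() as cur:
    cur.execute(create_sql)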

1) Create a RedShift cluster

2) Connect to the cluster and create tables 
   (using SQL Workbench - recommended - or any DB visualizer)
   Otherwise, use the RedShift Query Editor itself

3) Create an IAM role for Redshift with S3 Read only access

4) Attach IAM S3 role to RedShift

5) Suppose your data is in S3, load your data from S3

  copy dimproduct    -- <table name in RedShift>
  from 's3://redshift-load-queue-test/dimproduct.csv' 
  iam_role '<IAM Role created in step-3>/RedShift-S3-Role' 
  region 'us-east-1'
  format csv
  delimiter ','

6) Upload huge data as gz files
   sales1.txt.gz, sales2.txt.gz, sales3.txt.gz, sale4.txt.gz
   Instead of loading the files one by one, create one manifest file 
   manifest.txt
         {
          "entries": [
              {"url":"s3://redshift-load-queue-test/sales1.txt.gz", "mandatory":true},
              {"url":"s3://redshift-load-queue-test/sales2.txt.gz", "mandatory":true},
              {"url":"s3://redshift-load-queue-test/sales3.txt.gz", "mandatory":true},
              {"url":"s3://redshift-load-queue-test/sales4.txt.gz", "mandatory":true}
          ]   
         }

  copy factsales    
  from 's3://redshift-load-queue-test/manifest.txt'
  iam_role '<IAM Role created in step-3>'
  region 'us-east-1'
  GZIP
  delimiter '|'
  manifest


7) Copy JSON data
   -- JSON data should not be wrapped in a list
   -- Each record should be a separate JSON object

   copy dimdate
   from 's3://redshift-load-queue-prabhath/dimdate.json'
   region 'us-east-1'
   iam_role '<IAM Role created in step-3>/RedShift-S3-Role'
   json as 'auto'

8) Find out any load errors
   select * from stl_load_errors

9) Integrate Kinesis Data Firehose with RedShift as the destination
    Given the details, Firehose forms this COPY query for you

      COPY firehose_test_table (ticker_symbol, sector, change, price) 
      FROM 's3://redshift-load-queue-<>/stream2020/05/22/04/test-stream-4-2020-05-22-04-19-21-db425054-695f-4fd1-8721-fc07cdeea369.gz' 
      CREDENTIALS 'aws_iam_role=<IAM Role created in step-3>/RedShift-S3-Role' 
      JSON 'auto' gzip;
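
The COPY from step 5 can also be fired from Python. A sketch reusing the hypothetical psycopg2 connection from the create-table sketch above, followed by the stl_load_errors check from step 8:

copy_sql = """
copy dimproduct
from 's3://redshift-load-queue-test/dimproduct.csv'
iam_role '<IAM Role created in step-3>/RedShift-S3-Role'
region 'us-east-1'
format csv
delimiter ',';
"""

with conn, conn.cursor() as cur:
    cur.execute(copy_sql)

    # Inspect load errors, if any (step 8)
    cur.execute("select * from stl_load_errors order by starttime desc limit 10;")
    for row in cur.fetchall():
        print(row)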


May 20, 2020

Pandas Tutorial 3

1) Drop Nulls of one column in Pandas
Count nulls in a column:
master["playerID"].isnull().value_counts()

Output:
False    7520
True      241
Name: playerID, dtype: int64

Drop nulls of specific columns (dropna)
master_orig = master.copy()
master = master.dropna(subset=["playerID"])
master.shape

2) Drop Nulls of multiple column in Pandas
how = 'all' # if all subset cols are nulls
how = 'any' # if any of the subset cols are nulls
df.dropna(subset=col_list, how='all')  # col_list is a list of column names
master = master.dropna(subset=["firstNHL", "lastNHL"], how="all")

3)

master1 = master[master["lastNHL"] >= 1980]
master1.shape # (4627, 31)

Vs

master1 = master.loc[master["lastNHL"] >= 1980]
master1.shape # (4627, 31)

But the latter is preferred: .loc is more explicit and performs better on huge data


4) Filter columns
master.filter(columns_to_keep).head()
(or)
master = master.filter(regex="(playerID|pos|^birth)|(Name$)")


5) Find DF memory usage
df.memory_usage()

def mem_mib(df):
    mem = df.memory_usage().sum() / (1024 * 1024)
    print(f'{mem:.2f} MiB')

    
mem_mib(master) # 0.39 MiB
mem_mib(master_orig) # 1.84 MiB

6) Categorical
# A string variable consisting of only a few different values. 
# Converting such a string variable to a categorical variable will save some memory.

def make_categorical(df, col_name):
    df.loc[:, col_name] = pd.Categorical(df[col_name]) 

# to save memory
make_categorical(master, "pos")
make_categorical(master, "birthCountry")
make_categorical(master, "birthState")

7)
pd.read_pickle()

8) Joins
Default is an inner join; pass how= to change it
pd.merge(df1, df2, how='left')

# We are joining on player id of both DFs
# If the left DF has PlayerId & the right DF has plrId
pd.merge(df1, df2, left_on='PlayerId', right_on='plrId')

# We are joining on player id of both DFs
# Say the left DF has player id as its index
# Here the resultant merged DF has the index from the right DF
# The left DF index (left_index) is not carried into the merged DF
pd.merge(df1, df2, left_index=True, right_on='plrId')

# We are joining on player id of both DFs
# Say the right DF has player id as its index
# Here the resultant merged DF has the index from the left DF
# The right DF index (right_index) is not carried into the merged DF
pd.merge(df1, df2, left_on='PlayerId', right_index=True)

# We can even set DF index (set_index) and use
# left_index and right_index 
pd.merge(df1, df2.set_index("playerID", drop=True),
                            left_index=True, right_index=True).head()

# Indicator
# It creates an additional column, _merge
# Its value is one of: both, left_only, right_only
merged = pd.merge(master2, scoring, left_index=True,
                  right_on="playerID", how="right", indicator=True)

merged["_merge"].value_counts()
both          28579
right_only       37
left_only         0
Name: _merge, dtype: int64

# Filter only right_only
merged[merged["_merge"] == "right_only"].head()

# Filter only right_only or left_only
merged[(merged["_merge"] == "right_only") | (merged["_merge"] == "left_only")].sample(3)
or
merged[merged["_merge"].str.endswith("only")].sample(5)

# Validate the merge is one-to-many (1:m); raises MergeError if the left keys are not unique
try:
    pd.merge(df1, df2, left_index=True, right_on='plrId', validate="1:m").head()
except Exception as e:
    pass


9) Drop random records
df.drop(df.sample(5).index)
-------
10) Longer to Wider format (pivot)
print(df)

playerID year Goals
10320 hlavaja01 2001 7.0
10322 hlavaja01 2002 1.0
10324 hlavaja01 2003 5.0
15873 markoan01 2001 5.0
15874 markoan01 2002 13.0
15875 markoan01 2003 6.0
18899 nylanmi01 2001 15.0
18900 nylanmi01 2002 0.0
18902 nylanmi01 2003 0.0

# Longer to Wider format conversion
pivot = df.pivot(index="playerID", columns="year", values="Goals")
year 2001 2002 2003
playerID
hlavaja01 7.0 1.0 5.0
markoan01 5.0 13.0 6.0
nylanmi01 15.0 0.0 0.0

pivot = pivot.reset_index()
pivot.columns.name = None
pivot

playerID 2001 2002 2003
0 hlavaja01 7.0 1.0 5.0
1 markoan01 5.0 13.0 6.0
2 nylanmi01 15.0 0.0 0.0

11) Wide to Long format (melt)
# melt()
# Pandas melt() function is used to change the DataFrame format from wide to long.
pivot.melt(id_vars="playerID", var_name="year", value_name="goals")
playerID year goals
0 hlavaja01 2001 7.0
1 markoan01 2001 5.0
2 nylanmi01 2001 15.0
3 hlavaja01 2002 1.0
4 markoan01 2002 13.0
5 nylanmi01 2002 0.0
6 hlavaja01 2003 5.0
7 markoan01 2003 6.0
8 nylanmi01 2003 0.0

-------
Pandas Multi-level Index

1) How to set multi-index
mi = df.set_index(['playerID', 'year'])
mi.head()

2) List multi-index values 
mi.index
MultiIndex([('aaltoan01', 1997),
            ('aaltoan01', 1998),
            ('zyuzian01', 2005),
            ('zyuzian01', 2006),
            ('zyuzian01', 2007)],
           names=['playerID', 'year'], length=28616)

3) len(mi.index.levels) # 2

4) mi.index.levels[0]
Index(['aaltoan01', 'abdelju01', 'abidra01', 'abrahth01', 'actonke01',
       'adamlu01', 'adamru01'], dtype='object', name='playerID', length=4627)

5) mi.index.levels[1]
Int64Index([1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990],
           dtype='int64', name='year')

6) mi.groupby(level="year")['G'].max().head()
year
1980    68.0
1981    92.0
1982    71.0
1983    87.0
1984    73.0
Name: G, dtype: float64

7) idxmax (gives the index of the max)
mi.groupby(level="year")['G'].idxmax().head()
year
1980    (bossymi01, 1980)
1981    (gretzwa01, 1981)
1982    (gretzwa01, 1982)
1983    (gretzwa01, 1983)
1984    (gretzwa01, 1984)
Name: G, dtype: object

8) Filter based on above
mi.loc[mi.groupby(level="year")['G'].idxmax()].head()

firstName lastName pos Year Mon Day Country State City tmID GP G A Pts SOG
playerID year
bossymi01 1980 Mike Bossy R 1957.0 1.0 22.0 Canada QC Montreal NYI 79.0 68.0 51.0 119.0 315.0
gretzwa01 1981 Wayne Gretzky C 1961.0 1.0 26.0 Canada ON Brantford EDM 80.0 92.0 120.0 212.0 369.0
1982 Wayne Gretzky C 1961.0 1.0 26.0 Canada ON Brantford EDM 80.0 71.0 125.0 196.0 348.0
1983 Wayne Gretzky C 1961.0 1.0 26.0 Canada ON Brantford EDM 74.0 87.0 118.0 205.0 324.0
1984 Wayne Gretzky C 1961.0 1.0 26.0 Canada ON Brantford EDM 80.0 73.0 135.0 208.0 358.0

May 19, 2020

Python 2 Vs 3


  • input(): in Python 2, input() evaluates the input (may give int, str, ...) while raw_input() always returns str; in Python 3, input() was fixed so that it always returns str.
  • print: Python 2 accepts both print "Hi" and print("Hi"); Python 3 only accepts print("Hi").
  • Division: in Python 2, 3/2 ==> 1 (defaults to floor division, returns int); in Python 3, 3/2 ==> 1.5.
  • Strings: Python 2 strings default to ASCII; Python 3 strings default to Unicode.
    Unicode is a superset of ASCII and hence can encode more characters, including foreign ones.
  • Tuple-unpacking lambdas: Python 2 allows sorted(employees.items(), key=lambda (x, y): y['age']); in Python 3 you write sorted(employees.items(), key=lambda x: x[1]['age']).
  • Python 3 only: asyncio, f-strings.
  • It is recommended to use __future__ imports if you are planning Python 3.x support for your code.
  • Lazy ranges: xrange() in Python 2; range() in Python 3 (lazy evaluation).
  • Exception syntax: except NameError, err: in Python 2; except NameError as err: in Python 3.
  • Generators:
    Python 2:
    my_generator = (letter for letter in 'abcdefg')
    next(my_generator)     # works
    my_generator.next()    # also works
    Python 3:
    my_generator = (letter for letter in 'abcdefg')
    next(my_generator)     # .next() is gone
  • Comprehension variable leaking - Python 3.x for-loop/comprehension variables don't leak into the enclosing namespace anymore:
    Python 2.7.6:
    i = 1
    print 'before: i =', i                            # before: i = 1
    print 'comprehension: ', [i for i in range(5)]    # [0, 1, 2, 3, 4]
    print 'after: i =', i                             # after: i = 4
    Python 3.4.1:
    i = 1
    print('before: i =', i)                           # before: i = 1
    print('comprehension: ', [i for i in range(5)])   # [0, 1, 2, 3, 4]
    print('after: i =', i)                            # after: i = 1
  • range():
    Python 2:
    print range(3)          # [0, 1, 2]
    print type(range(3))    # <type 'list'>
    Python 3:
    print(range(3))         # range(0, 3)
    print(type(range(3)))   # <class 'range'>
    print(list(range(3)))   # [0, 1, 2]
  • Rounding:
    Python 2:
    round(15.5)   # 16.0
    round(16.5)   # 17.0
    Python 3 (banker's rounding - ties go to the nearest even number):
    round(15.5)   # 16
    round(16.5)   # 16

Python List Vs Array

# Arrays Vs Lists
  • Arrays need to be declared. Lists don’t
  • Arrays can store data very compactly
  • Arrays are great for numerical operations

import array

# Array (stores single data type)
array.array('i', [1, 22, 30, 44, 51]) # integer
array.array('d', [2.5, 3.2, 3.3]) # float
array.array('u', ['a', 'b', 'c']) # unicode

#List
ll = ['abc', 10, ['a', 'b', 'c'], (1,2,3)] # List can store anything 


import numpy as np

# NumPy arrays are homogeneous; mixed inputs are upcast to a common dtype
# (here everything becomes a string)
array_2 = np.array(["numbers", 3, 6, 9, 12])
print (array_2)
print(type(array_2))
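
To see the compactness claim concretely, a small sketch comparing byte sizes (exact numbers vary by platform and Python version):

import sys
import array
import numpy as np

nums = list(range(1000))
arr = array.array('i', nums)
np_arr = np.array(nums, dtype=np.int32)

print(sys.getsizeof(nums))   # size of the list object (the int objects it points to are extra)
print(sys.getsizeof(arr))    # array.array stores the ints compactly inside its buffer
print(np_arr.nbytes)         # raw buffer size: 4 bytes per int32 element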





 

Python random

import random

random.random()           ---> 0 to 1 float
random.randint(0, 100)    ---> 0 to 100 integer
random.randint(-50, 50)   ---> -50 to 50 integer
random.randint(1, 40)     ---> 1 to 40 integer
random.uniform(1, 50)     ---> 1 to 50 float


Python itertools

# Itertools
# Iterate over data structures that can be stepped over using a for-loop.
# Such data structures are also known as iterables.
# Itertools are more readable, fast, memory-efficient
# provides various functions that work on iterators to produce complex iterators.
# Infinite iterators: count, cycle, repeat
# Finite iterators: chain, compress, dropwhile

import itertools

# 1) Count - Infinite iterator
# print the first four even numbers
result = itertools.count(start = 0, step = 2)
for number in result:
    if number < 8:
        print(number)
    else:
        break

# Output:
# 0
# 2
# 4
# 6

# 2) Cycle - Infinite iterator
result = itertools.cycle('test')
counter = 0
for each in result:
    if counter > 10:
        break
    counter += 1
    print(each)

# Output:
# t
# e
# s
# t
# t
# e
# s
# t
# t
# e
# s

# 3) Repeat - Infinite iterator
result = itertools.repeat('test', 2)
for each in result:
    print(each)

# Output:
# test
# test

# 1) Chain - Finite iterator
l1 = ['aaa', 'bbb', 'ccc']
l2 = ['ddd', 'eee', 'fff']
result = itertools.chain(l1, l2)
for each in result:
    print(each)

# Output:
# aaa
# bbb
# ccc
# ddd
# eee
# fff

# 2) Compress - Finite iterator
l1 = ['aaa', 'bbb', 'ccc']
l2 = [True, False, False]
result = itertools.compress(l1, l2)
for each in result:
    print(each)

# Output:
# aaa

# 3) Dropwhile - Finite iterator
# Keeps dropping values from the iterable until it encounters the first element
# for which the predicate is false; from then on it yields every remaining element
def is_positive(n):
    return n > 0

value_list = [5, 6, -8, -4, 2]
result = list(itertools.dropwhile(is_positive, value_list))
print(result)

# Output:
# [-8, -4, 2]

# 4) groupby
# Note: groupby expects the input to be already sorted/grouped by the key
ll = [("aaa", 1), ("aaa", 2), ("bbb", 3), ("bbb", 4)]
# Key function
key_func = lambda y: y[0]

for key, group in itertools.groupby(ll, key_func):
    print(key + " :", list(group))

# Output:
# aaa : [('aaa', 1), ('aaa', 2)]
# bbb : [('bbb', 3), ('bbb', 4)]


Pandas Tutorial 2 (data formats, db, files, pickle)

import numpy as np
import pandas as pd

# nrows - just read first N rows
# usecols - read only those columns
df = pd.read_csv("input.csv", index_col="Id")
print(df.head(10))
# Name Age City
# Id
# 1 John 30 Bangalore
# 2 Doe 25 Chennai
# 3 Mary 22 Hyderabad
# 4 Tom 35 Mumbai

df = pd.read_csv("input.csv", index_col="Id", nrows=2, usecols=['Id', 'Name'])
print(df.head(10))
# Name
# Id
# 1 John
# 2 Doe

# Serialize & save to disk
df = pd.read_csv("input.csv", index_col="Id")
df.to_pickle('data_frame.pickle')

df1 = pd.read_pickle('data_frame.pickle')
print(df1.head(10))
# Name Age City
# Id
# 1 John 30 Bangalore
# 2 Doe 25 Chennai
# 3 Mary 22 Hyderabad
# 4 Tom 35 Mumbai

# DataFrame to table
# Needs a SQLAlchemy connection/engine (db_conn) and SQLAlchemy column types
from sqlalchemy.types import VARCHAR, INTEGER

table_dtype = {
    'name': VARCHAR(),
    'city': VARCHAR(),
    'state': VARCHAR(),
    'pincode': INTEGER()
}
df1.to_sql(con=db_conn, name=table_name, dtype=table_dtype,
           if_exists='replace', index=False)


# From Table to Dataframe
raw_data_sql = f""" select id, name, city, state
from event_raw_data
"""
df1 = pd.read_sql(raw_data_sql, con=db_conn, index_col="id")
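
db_conn and table_name above are not defined in these notes; a minimal sketch of wiring them up with SQLAlchemy (the connection string and table name are placeholders):

from sqlalchemy import create_engine

# Hypothetical connection string (placeholder) - e.g. a local PostgreSQL database
engine = create_engine('postgresql://user:password@localhost:5432/testdb')
db_conn = engine.connect()
table_name = 'event_raw_data'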





Pandas Tutorial 1

import numpy as np
import pandas as pd

la = np.random.rand(3)
dd = pd.Series(la)
print(dd.head(10))
# 0 0.145517
# 1 0.718904
# 2 0.448772
# dtype: float64

ll = range(3)
dd = pd.Series(ll)
print(dd.head(10))
# 0 0
# 1 1
# 2 2
# dtype: int64

dd = pd.Series(ll, index=['First', 'Second', 'Third'])
print(dd.head(10))
# First 0
# Second 1
# Third 2
# dtype: int64

print(dd["First"]) # 0
print(dd[0]) # 0

nd = np.random.rand(3,2)
df = pd.DataFrame(nd)
df.columns = ["First", "Second"]
print(df.head(10))
# First Second
# 0 0.567850 0.275874
# 1 0.839382 0.792727
# 2 0.445246 0.537417

print(df.loc[0])
# First 0.567850
# Second 0.275874
# Name: 0, dtype: float64

print(df.iloc[0,1]) #0.275874
print(df.loc[0, "Second"]) #0.275874





May 18, 2020

Spark 1 vs Spark 2

 
  • Entry point: SparkContext in Spark 1.x; SparkSession in Spark 2.x (see the sketch after this list).
  • In Spark 1.x we need to create SQLContext and HiveContext separately on top of SparkContext; in Spark 2.x a single SparkSession is enough.
  • Spark 1.x code generation spends many function calls and CPU cycles on unnecessary work; Spark 2.x uses the performance-enhanced Tungsten engine and is roughly 10x faster than Spark 1.x.
  • Streaming: Spark Streaming (RDD micro-batch concept) in 1.x; Structured Streaming (DataFrame/Dataset APIs) in 2.x.
  • Spark 2.x unified the Dataset and DataFrame APIs (Dataset adds more type safety, not available in Python); DataFrame is now just an alias for Dataset[Row].
  • Many machine learning algorithms (Gaussian Mixture Model, MaxAbsScaler, Bisecting K-Means clustering feature transformer) were added to the DataFrame-based API, and many ML algorithms were added to PySpark and SparkR as well.
  • The RDD-based API is going into maintenance mode; the DataFrame-based API has become the primary API.
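
A minimal PySpark sketch (the app name and toy data are made up) showing the 2.x entry point, with the 1.x SparkContext still reachable underneath it:

from pyspark.sql import SparkSession

# Spark 2.x entry point: one SparkSession covers SQL/Hive contexts
spark = SparkSession.builder.appName("entry-point-demo").getOrCreate()

df = spark.createDataFrame([(1, 'test1.com'), (2, 'test2.com')], ['id', 'site'])
df.show()

# The Spark 1.x entry point (SparkContext) still exists underneath the session
sc = spark.sparkContext
print(sc.parallelize(range(5)).collect())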

 

Spark Intro

  • Spark
    • Apache Spark is a unified analytics engine for large-scale data processing
    • Latest version at the time of writing: 2.4.5 (Feb 2020)
    • Speed
      • Apache Spark achieves high performance for both batch and streaming data using a state-of-the-art DAG scheduler, a query optimizer, and a physical execution engine
      • Runs up to 100x faster than Hadoop MapReduce
    • Ease of use
      • Write applications quickly in Java, Scala, Python, R and SQL
    • Generality
      • Spark SQL
      • Spark Streaming
      • MLlib
      • GraphX
    • Runs everywhere
      • Spark runs on Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud. It can access diverse data sources.
      • You can run Spark using its standalone cluster mode, on EC2, on Hadoop YARN, on Mesos, or on Kubernetes. 
      • Access data in HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive, and hundreds of other data sources.

PySpark Streaming Vs Structured Streaming

  • Spark Streaming: Spark 1.x; Structured Streaming: introduced in Spark 2.x.
  • Spark Streaming is a separate library for processing continuously flowing streaming data; Structured Streaming is built on the Spark SQL library.
  • Spark Streaming uses the DStreams API powered by Spark RDDs and works on micro batches (each batch is an RDD); Structured Streaming is based on the DataFrame and Dataset APIs, with no batch concept exposed.
  • DStreams give us the data divided into chunks (RDDs) received from the streaming source and output batches of processed data; in Structured Streaming, incoming data keeps getting appended to a DataFrame (an unbounded table).
  • SQL queries are not easy to apply on DStreams; with Structured Streaming we can easily apply SQL queries or Scala/Python operations on streaming data (see the word-count sketch below).
  • In Structured Streaming, the result produced from the unbounded table/DataFrame depends on the output mode of your operations: Complete, Append, or Update.
  • RDDs vs DataFrame/Dataset: the latter are more optimized, less time-consuming, easier to understand, and make aggregations straightforward.
  • Spark Streaming has no notion of event time; it only works with the timestamp when the data is received. Based on that ingestion timestamp it puts data into a batch even if the event was generated earlier and belonged to an earlier batch, which can mean less accurate results (effectively data loss for windowing). With event-time handling of late data, Structured Streaming outweighs Spark Streaming.
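
A minimal Structured Streaming sketch: the classic socket word count. The host/port are placeholders and need something like `nc -lk 9999` feeding text into them:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("structured-wordcount").getOrCreate()

# Unbounded table: new lines arriving on the socket keep getting appended
lines = (spark.readStream
              .format("socket")
              .option("host", "localhost")
              .option("port", 9999)
              .load())

words = lines.select(explode(split(lines.value, " ")).alias("word"))
counts = words.groupBy("word").count()

# Output mode "complete": emit the full updated result table every trigger
query = (counts.writeStream
               .outputMode("complete")
               .format("console")
               .start())
query.awaitTermination()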