#............................................................................
## Loading and Storing Data
#............................................................................
#Create a Spark Session
from pyspark.sql import SparkSession

#local[2] runs Spark locally with 2 worker threads
SpSession = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("learnSpark") \
    .config("spark.executor.memory", "1g") \
    .config("spark.cores.max", "2") \
    .getOrCreate()
#Get the Spark Context from Spark Session
SpContext = SpSession.sparkContext
print SpContext
#Create an RDD by loading from a file
tweetsRDD = SpContext.textFile("movietweets.csv")
print tweetsRDD
#Action - Count the number of tweets
print tweetsRDD.count()
#show top 5 records
print tweetsRDD.take(5)
#Transform Data - change to upper Case
ucRDD = tweetsRDD.map( lambda x : x.upper() )
print ucRDD.take(5)
#Load from a collection
collData = SpContext.parallelize([4,3,8,5,8])
collData.collect() #bring the entire RDD to the driver node, could be expensive
#Load the file. Lazy initialization
autoData = SpContext.textFile("auto-data.csv")
autoData.cache()
#Loads only now.
print autoData.count() #198
print autoData.first() #prints 1st line
print autoData.take(5) #gives you a list of 5 elements
#Save to a local file. First collect the RDD to the master
#and then save as local file.
autoDataFile = open("auto-data-saved.csv","w")
autoDataFile.write("\n".join(autoData.collect()))
autoDataFile.close()
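#Spark can also write an RDD out directly, without collecting it to the driver first.
#A minimal sketch; saveAsTextFile() writes a directory (the name below is just an
#assumed example) containing one part file per partition.
autoData.saveAsTextFile("auto-data-saved-rdd")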
#............................................................................
## Transformations
#............................................................................
#Map and create a new RDD
tsvData=autoData.map(lambda x : x.replace(",","\t"))
print tsvData.take(5)
#Filter and create a new RDD
toyotaData=autoData.filter(lambda x: "toyota" in x)
print toyotaData #prints the RDD object, not its contents
print toyotaData.count() #32
#FlatMap
words=toyotaData.flatMap(lambda line: line.split(","))
print words.count() #384 words in 32 lines of toyota
print words.take(20) #take the first 20 of the 384 words and print them
print '----'
#Distinct
for numbData in collData.distinct().collect():
    print(numbData)
print '----'
#Set operations
words1 = SpContext.parallelize(["hello","war","peace","world"])
words2 = SpContext.parallelize(["war","peace","universe"])
for unions in words1.union(words2).distinct().collect():
    print(unions)
print '----'
for intersects in words1.intersection(words2).collect():
    print(intersects)
print '----'
#Using functions for transformation
#cleanse and transform an RDD
def cleanseRDD(autoStr) :
    if isinstance(autoStr, int) :
        return autoStr
    attList = autoStr.split(",")
    #convert the number of doors from a word ("two"/"four") to a numeric string
    if attList[3] == "two" :
        attList[3] = "2"
    else :
        attList[3] = "4"
    #convert drive type to uppercase
    attList[5] = attList[5].upper()
    return ",".join(attList)
cleanedData=autoData.map(cleanseRDD)
print cleanedData.collect() #it returns a List
#............................................................................
## Actions
#............................................................................
#reduce - compute the sum
collData.collect() #[4,3,8,5,8]
print collData.reduce(lambda x,y: x+y) #28
print '----'
#find the shortest line - reduce() RDD function
print autoData.reduce(lambda x,y: x if len(x) < len(y) else y)
print '----'
#Use a function to perform reduce
def getMPG(autoStr) :
    if isinstance(autoStr, int) :
        return autoStr
    attList = autoStr.split(",")
    if attList[9].isdigit() :
        return int(attList[9])
    else:
        return 0
#find average MPG-City for all cars
print autoData.reduce(lambda x,y : getMPG(x) + getMPG(y)) \
/ (autoData.count()-1.0) # account for header line
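#An equivalent approach, as a minimal sketch: map each line to its MPG with the
#getMPG() helper above, then use sum(); the header line maps to 0, so it is still
#excluded only from the denominator.
print autoData.map(getMPG).sum() / (autoData.count() - 1.0)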
#............................................................................
## Working with Key/Value RDDs
#............................................................................
#create a Key Value RDD of auto Brand and Horsepower
cylData = autoData.map( lambda x: ( x.split(",")[0], \
x.split(",")[7]))
print cylData.count() #198
print '----'
print cylData.take(5) #take first 5 key-value pairs
print '----'
print cylData.keys().collect() #get all keys from key-value pair RDD cylData
print '----'
#Remove header row
header = cylData.first()
print header
cylHPData= cylData.filter(lambda line: line != header) #get data without header
print cylHPData.count() #197
print '----'
#Find average HP by Brand
#Add a count 1 to each record and then reduce to find totals of HP and counts
addOne = cylHPData.mapValues(lambda x: (x, 1))
print addOne.collect() #(u'bmw', (u'182', 1)), (u'mercedes-benz', (u'123', 1))......, (u'bmw', (u'182', 1)), (u'mercedes-benz', (u'184', 1)) ]
print '----'
brandValues= addOne \
.reduceByKey(lambda x, y: (int(x[0]) + int(y[0]), x[1] + y[1]))
print brandValues.collect() #(u'mercedes-benz', (1170, 8)), (u'mitsubishi', (1353, 13)), (u'saab', (760, 6)), (u'volkswagen', (973, 12))
print '----'
#find average by dividing HP total by count total
print brandValues.mapValues(lambda x: int(x[0])/int(x[1])). \
collect()
#[(u'dodge', 84), (u'mercury', 175), (u'jaguar', 204), (u'alfa-romero', 125), (u'nissan', 102), (u'toyota', 92), (u'plymouth', 86), (u'mazda', 86), (u'subaru', 86), (u'peugot', 99), (u'porsche', 191), (u'isuzu', 84), (u'chevrolet', 62), (u'honda', 80), (u'volvo', 128), (u'bmw', 138), (u'mercedes-benz', 146), (u'mitsubishi', 104), (u'saab', 126), (u'volkswagen', 81), (u'audi', 114)]
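#The mapValues + reduceByKey pattern above can also be written as a single
#aggregateByKey() call. A minimal sketch, assuming the cylHPData pair RDD above;
#the zero value (0, 0) holds a running (HP total, record count) per brand.
hpByBrand = cylHPData.aggregateByKey(
    (0, 0),
    lambda acc, hp: (acc[0] + int(hp), acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]))
print hpByBrand.mapValues(lambda x: x[0] / x[1]).collect()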
print '----'
#............................................................................
## Advanced Spark : Accumulators & Broadcast Variables
#............................................................................
#Function that splits each line and also counts sedans and hatchbacks.
#Accumulators gather the counts without a separate pass over the data,
#and broadcast variables ship the search strings to the workers only once.
#Initialize accumulator
sedanCount = SpContext.accumulator(0)
hatchbackCount = SpContext.accumulator(0)
#Set Broadcast variable
sedanText=SpContext.broadcast("sedan")
hatchbackText=SpContext.broadcast("hatchback")
def splitLines(line) :
    global sedanCount
    global hatchbackCount
    #Use broadcast variables for the comparison and bump the accumulators
    if sedanText.value in line:
        sedanCount += 1
    if hatchbackText.value in line:
        hatchbackCount += 1
    return line.split(",")
#do the map
splitData=autoData.map(splitLines)
#Make it execute the map (lazy execution)
print splitData.count()
print '----'
print(sedanCount.value, hatchbackCount.value) #read the accumulator values on the driver
print '----'
#............................................................................
## Advanced Spark : Partitions
#............................................................................
print collData.getNumPartitions()
#Specify no. of partitions.
collData = SpContext.parallelize([3,5,4,7,4], 4)
print collData.cache()
print collData.count()
print '----'
print collData.getNumPartitions()
print '----'
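#A minimal sketch of changing an existing RDD's partition count: repartition() can
#increase or decrease it (full shuffle), coalesce() only decreases it (no full shuffle).
print collData.repartition(2).getNumPartitions() #2
print collData.coalesce(1).getNumPartitions()    #1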
#localhost:4040 shows the Spark web UI for the current application
#RDD recap
# - Transformations - e.g. map()
# - Actions - e.g. reduce(), take()
# - Lazily evaluated - nothing runs until an action is called
#persist()
# - percentofDelayedFlights = flightsParsed.filter(lambda x: x.delay > 0).count() / float(flightsParsed.count())
# - Here flightsParsed is used twice; instead we can force the RDD to be materialized once using persist()
# - flightsParsed.persist()   #cached, huge performance saving
# - Any RDD you use over and over again should be persist()-ed to improve performance
# - flightsParsed.unpersist() #uncache, free up memory
# - (a runnable sketch using tsvData from this script follows below)
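#A minimal sketch of the same idea on an RDD from this script: tsvData is reused by
#two actions below, so persist it with an explicit storage level.
from pyspark import StorageLevel
tsvData.persist(StorageLevel.MEMORY_AND_DISK)
print tsvData.count() #first action materializes and persists the RDD
print tsvData.take(2) #served from the persisted copy
tsvData.unpersist()   #free up the memory when done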
#aggregate()
# - totalDistance = flightsParsed.map(lambda x: x.distance).reduce(lambda x, y: x + y)
# - avgDistance = totalDistance / flightsParsed.count()
# - Instead of two actions (reduce and count), we can do it in one pass with aggregate(), as sketched below
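#A minimal sketch of the one-pass version, assuming the flightsParsed RDD (with a
#numeric distance field) from the notes above; aggregate() carries a
#(runningSum, runningCount) pair through the data and merges it across partitions.
sumCount = flightsParsed.aggregate(
    (0, 0),
    lambda acc, flight: (acc[0] + flight.distance, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]))
avgDistance = sumCount[0] / float(sumCount[1])
print avgDistance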
#Frequency distribution of departure delay, bucketed by hours
# - flightsParsed.map(lambda x: int(x.dep_delay / 60)).countByValue()
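#countByValue() can also be tried on data already in this script; a minimal sketch
#counting how many records each brand has in autoData (the header row shows up as one value).
print autoData.map(lambda line: line.split(",")[0]).countByValue()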