#............................................................................
## Loading and Storing Data
#............................................................................
#Create a Spark Session
from pyspark.sql import SparkSession

#local[2] runs Spark locally with 2 worker threads (not 2 partitions)
SpSession = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("learnSpark") \
    .config("spark.executor.memory", "1g") \
    .config("spark.cores.max", "2") \
    .getOrCreate()
#Get the Spark Context from Spark Session
SpContext = SpSession.sparkContext
print(SpContext)

#Create an RDD by loading from a file
tweetsRDD = SpContext.textFile("movietweets.csv")
print(tweetsRDD)

#Action - count the number of tweets
print(tweetsRDD.count())

#Show the top 5 records
print(tweetsRDD.take(5))

#Transform data - change to upper case
ucRDD = tweetsRDD.map(lambda x: x.upper())
print(ucRDD.take(5))
#Load from a collection
collData = SpContext.parallelize([4, 3, 8, 5, 8])
collData.collect()   #brings the entire RDD to the driver node - could be expensive
#Load the file. Lazy initialization
autoData = SpContext.textFile("auto-data.csv")
autoData.cache()

print(autoData.count())   #the file is actually loaded only now; 198
print(autoData.first())   #prints the first line
print(autoData.take(5))   #returns a list of 5 elements
#Save to a local file. First collect the RDD to the driver
#and then save as a local file.
autoDataFile = open("auto-data-saved.csv", "w")
autoDataFile.write("\n".join(autoData.collect()))
autoDataFile.close()
#............................................................................
## Transformations
#............................................................................

#Map and create a new RDD
tsvData = autoData.map(lambda x: x.replace(",", "\t"))
print(tsvData.take(5))

#Filter and create a new RDD
toyotaData = autoData.filter(lambda x: "toyota" in x)
print(toyotaData)          #prints the RDD object
print(toyotaData.count())  #32

#FlatMap
words = toyotaData.flatMap(lambda line: line.split(","))
print(words.count())   #384 words in the 32 toyota lines
print(words.take(20))  #first 20 of the 384 words
print('----')

#Distinct
for numbData in collData.distinct().collect():
    print(numbData)
print('----')

#Set operations
words1 = SpContext.parallelize(["hello", "war", "peace", "world"])
words2 = SpContext.parallelize(["war", "peace", "universe"])

for unions in words1.union(words2).distinct().collect():
    print(unions)
print('----')

for intersects in words1.intersection(words2).collect():
    print(intersects)
print('----')

#Using functions for transformation
#Cleanse and transform an RDD
def cleanseRDD(autoStr):
    if isinstance(autoStr, int):
        return autoStr
    attList = autoStr.split(",")
    #Convert doors to a number string
    if attList[3] == "two":
        attList[3] = "2"
    else:
        attList[3] = "4"
    #Convert drive to uppercase
    attList[5] = attList[5].upper()
    return ",".join(attList)

cleanedData = autoData.map(cleanseRDD)
print(cleanedData.collect())   #returns a list

#............................................................................
## Actions
#............................................................................

#reduce - compute the sum
collData.collect()   #[4, 3, 8, 5, 8]
print(collData.reduce(lambda x, y: x + y))   #28
print('----')

#Find the shortest line with reduce()
print(autoData.reduce(lambda x, y: x if len(x) < len(y) else y))
print('----')

#Use a function to perform reduce
def getMPG(autoStr):
    if isinstance(autoStr, int):
        return autoStr
    attList = autoStr.split(",")
    if attList[9].isdigit():
        return int(attList[9])
    else:
        return 0

#Find average MPG-City for all cars (count - 1 accounts for the header line)
print(autoData.reduce(lambda x, y: getMPG(x) + getMPG(y)) / (autoData.count() - 1.0))
#............................................................................
## Working with Key/Value RDDs
#............................................................................

#Create a key/value RDD of auto brand and horsepower
cylData = autoData.map(lambda x: (x.split(",")[0], x.split(",")[7]))
print(cylData.count())   #198
print('----')
print(cylData.take(5))   #first 5 key-value pairs
print('----')
print(cylData.keys().collect())   #all keys in the key-value RDD cylData
print('----')

#Remove the header row
header = cylData.first()
print(header)
cylHPData = cylData.filter(lambda line: line != header)   #data without the header
print(cylHPData.count())   #197
print('----')

#Find average HP by brand
#Add a count of 1 to each record, then reduce to find totals of HP and counts
addOne = cylHPData.mapValues(lambda x: (x, 1))
print(addOne.collect())   #e.g. ('bmw', ('182', 1)), ('mercedes-benz', ('123', 1)), ...
print('----')

brandValues = addOne \
    .reduceByKey(lambda x, y: (int(x[0]) + int(y[0]), x[1] + y[1]))
print(brandValues.collect())   #e.g. ('mercedes-benz', (1170, 8)), ('mitsubishi', (1353, 13)), ...
print('----')

#Find the average by dividing total HP by the count
print(brandValues.mapValues(lambda x: int(x[0]) / int(x[1])).collect())
#e.g. ('dodge', 84), ('mercury', 175), ('jaguar', 204), ('toyota', 92), ...
print('----')
#............................................................................
## Advanced Spark : Accumulators & Broadcast Variables
#............................................................................

#Function that splits the line and also counts sedans and hatchbacks
#Speed optimization

#Initialize accumulators
sedanCount = SpContext.accumulator(0)
hatchbackCount = SpContext.accumulator(0)

#Set broadcast variables
sedanText = SpContext.broadcast("sedan")
hatchbackText = SpContext.broadcast("hatchback")

def splitLines(line):
    global sedanCount
    global hatchbackCount

    #Use the broadcast variables for comparison and update the accumulators
    if sedanText.value in line:
        sedanCount += 1
    if hatchbackText.value in line:
        hatchbackCount += 1
    return line.split(",")

#Do the map
splitData = autoData.map(splitLines)

#Run an action to force the (lazy) map to execute
print(splitData.count())
print('----')
print(sedanCount, hatchbackCount)
print('----')

#............................................................................
## Advanced Spark : Partitions
#............................................................................

print(collData.getNumPartitions())

#Specify the number of partitions
collData = SpContext.parallelize([3, 5, 4, 7, 4], 4)
print(collData.cache())
print(collData.count())
print('----')
print(collData.getNumPartitions())
print('----')

#localhost:4040 shows the Spark UI for the current instance
RDD
- Transformation - map
- Action - reduce, take
- Lazy evaluation - transformations are not executed until an action is called (see the sketch below)
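
A minimal sketch of laziness, reusing the SpContext created above (the collection and values are made up for illustration):

nums = SpContext.parallelize([1, 2, 3, 4])
doubled = nums.map(lambda n: n * 2)           #transformation - nothing runs yet
print(doubled.take(2))                        #action - triggers execution: [2, 4]
print(doubled.reduce(lambda a, b: a + b))     #action - 2+4+6+8 = 20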
Persist()
- percentofDelayedFlights = flightsParsed.filter(lambda x: x.delay > 0).count() / float(flightsParsed.count())
- Here flightsParsed is computed twice; instead we can force the RDD to be materialized once using persist() (see the sketch after this list)
- flightsParsed.persist() # Cached, huge performance saving
- Any RDD you use over and over again should be persist()-ed to improve performance
- flightsParsed.unpersist() # UnCache, free-up memory
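
A minimal sketch of the pattern above, assuming flightsParsed is an RDD of parsed flight records with a numeric delay field:

flightsParsed.persist()                                                #keep the RDD in memory once computed
delayedCount = flightsParsed.filter(lambda x: x.delay > 0).count()     #first action materializes and caches the RDD
percentofDelayedFlights = delayedCount / float(flightsParsed.count())  #second action reuses the cached data
flightsParsed.unpersist()                                              #free the memory when done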
aggregate()
- totalDistance = flightsParsed.map(lambda x: x.distance).reduce(lambda x, y: x + y)
- avgDistance = totalDistance / flightsParsed.count()
- Instead of two actions (reduce & count), we can do it in a single pass using aggregate() (see the sketch below)
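
A minimal sketch of aggregate(), again assuming flightsParsed records carry a numeric distance field. The zero value holds a (running sum, running count) pair; seqOp folds each record into a partition's pair and combOp merges the pairs across partitions:

sumCount = flightsParsed.aggregate(
    (0, 0),                                                        #zero value: (sum, count)
    lambda acc, flight: (acc[0] + flight.distance, acc[1] + 1),    #seqOp: fold one record within a partition
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))     #combOp: merge partition results
avgDistance = sumCount[0] / float(sumCount[1])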
Freq Dist by Hours
- flightsParsed.map(lambda x: int(x.dep_delay / 60)).countByValue()  # bucket each delay into hours, then count flights per bucket (see the sketch below)
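
A minimal sketch of the same idea with made-up delay values (in minutes), showing what countByValue() returns:

delays = SpContext.parallelize([5, 70, 130, 45, 65])
print(delays.map(lambda m: int(m / 60)).countByValue())
#defaultdict(<class 'int'>, {0: 2, 1: 2, 2: 1})  -- hour bucket -> number of flights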