
Spark - Map, Reduce and Filter

In this first post about Spark, I'll show some examples that use the PySpark Map function combined with Reduce, ReduceByKey and Filter to produce new RDDs and, finally, the output.
First of all, some definitions:

Map: Transforms a series of elements by applying a function individually to each element in the series.
Input: a one-parameter function that returns a new value.
Return: a new RDD obtained by applying the function to each element of this RDD.
Filter: Tests each element in the series individually against a condition.
Input: a one-parameter function that returns True or False.
Return: a new RDD containing only the elements that satisfy the condition.
Reduce: Applies a function to pairs of elements in the series, combining them until a single value remains.
Input: a two-parameter function that combines its two arguments.
Return: a single new value.

This link explains the PySpark API clearly using graphics.
And here you can find more information about lambda functions.
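
As a quick warm-up before the log examples, here is a minimal sketch of the three operations on a toy RDD; it assumes only that a SparkContext named sc is already available, as it is in the PySpark shell.

# A toy RDD of numbers, assuming a SparkContext named sc (as in the PySpark shell).
numbers = sc.parallelize([1, 2, 3, 4, 5])

squared = numbers.map(lambda x: x * x)          # Map: new RDD [1, 4, 9, 16, 25]
evens = squared.filter(lambda x: x % 2 == 0)    # Filter: new RDD [4, 16]
total = squared.reduce(lambda a, b: a + b)      # Reduce: single value 55

print 'Squares: %s' % squared.collect()
print 'Even squares: %s' % evens.collect()
print 'Sum of squares: %d' % total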

Here are some examples:

How to calculate the average, the min and the max:
We can compute the statistics by applying a map to the access_logs RDD. The lambda function we pass to map extracts the content_size field from each Row, so the map produces a new RDD containing only the content sizes (one element for each Row object in access_logs). To compute the minimum and maximum we can use the min() and max() functions on the new RDD. To compute the average we use reduce with a lambda function that sums its two inputs, which represent two elements of the new RDD being reduced together. The result of reduce() is the total content size in the log, which we then divide by the number of requests obtained with count() on the new RDD.

content_sizes = access_logs.map(lambda log: log.content_size).cache()
print 'Content Size Avg: %i, Min: %i, Max: %s' % (
    content_sizes.reduce(lambda a, b : a + b) / content_sizes.count(),
    content_sizes.min(),
    content_sizes.max())
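
The access_logs RDD above comes from parsing an Apache access log into Row objects. Purely as a hypothetical stand-in (not the real parser), a small RDD with the same shape could be built like this to try the snippet:

from pyspark.sql import Row

# Hypothetical sample data standing in for the parsed Apache access log;
# the real access_logs RDD would come from parsing the log file itself.
sample_rows = [
    Row(host='in24.inetnebr.com', endpoint='/index.html', response_code=200, content_size=1839),
    Row(host='uplherc.upl.com', endpoint='/images/a.gif', response_code=404, content_size=0),
    Row(host='in24.inetnebr.com', endpoint='/about.html', response_code=200, content_size=3027),
]
access_logs = sc.parallelize(sample_rows).cache()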


First example of using Map, ReduceByKey and Filter:

# First we create a new RDD by using a lambda function to extract the host field from the
# access_logs RDD, producing a pair tuple of (host, 1), which will let us count how many
# records were created by a particular host's requests.
hostCountPairTuple = access_logs.map(lambda log: (log.host, 1))

# Using the new RDD, we perform a reduceByKey with a lambda function that adds the two values.
hostSum = hostCountPairTuple.reduceByKey(lambda a, b: a + b)

# We then keep only the hosts whose access count (the second element of each pair) is greater than 10.
hostMoreThan10 = hostSum.filter(lambda s: s[1] > 10)

hostsPick20 = (hostMoreThan10
               .map(lambda s: s[0])  # we extract the host name (the first element of each pair)
               .take(20))            # and take 20 elements from the resulting RDD
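
A note on the design choice: reduceByKey combines the values for each key locally on every partition before any data is shuffled across the network, so for this kind of per-key counting it is generally much cheaper than collecting all the values with groupByKey and summing them afterwards.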


Second example of using Map, ReduceByKey and Filter:
What are the top ten endpoints that did not return code 200? Create a sorted list containing the top ten endpoints and the number of times each was accessed with a non-200 return code. Think about the steps you need to perform: determine which endpoints did not return a 200 code, count those endpoints uniquely, and sort the list.

not200 = (access_logs
          .map(lambda log: (log.endpoint, log.response_code))
          .filter(lambda s: s[1] != 200)
          .map(lambda log: (log[0], 1))
          .reduceByKey(lambda a, b : a + b))

topTenErrURLs = not200.takeOrdered(10, key=lambda x : -x[1])
print 'Top Ten failed URLs: %s' % topTenErrURLs
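
takeOrdered(10, key=...) returns the ten smallest elements according to the key, so negating the count gives the ten most frequent endpoints without sorting the whole RDD. As an alternative sketch, the same result could be obtained with an explicit sort (typically more expensive):

# Equivalent but usually more expensive: sort the whole RDD by count, then take 10.
topTenErrURLsAlt = not200.sortBy(lambda x: -x[1]).take(10)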

Example using Map and ReduceByKey:
How many unique hosts are there in the entire log?

hosts = (access_logs
        .map(lambda log: (log.host,1))
        .reduceByKey(lambda a, b : a + b))

uniqueHostCount = hosts.count()
print 'Unique hosts: %d' % uniqueHostCount
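
An alternative sketch that gives the same number without building per-host totals: extract the host field and apply distinct() before counting.

# Alternative: count the unique hosts directly with distinct().
uniqueHostCountAlt = access_logs.map(lambda log: log.host).distinct().count()
print 'Unique hosts (distinct): %d' % uniqueHostCountAlt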

More complex example using Map:
If I have a structure like this

[('b000jz4hqo', ['clickart', '950', '000', 'premier', 'image', 'pack', 'dvd', 'rom', 'broderbund']), ('b0006zf55o', ['ca', 'international', 'arcserve', 'lap', 'desktop', 'oem', '30pk', 'oem', 'arcserve', 'backup', 'v11', '1', 'win', '30u', 'laptops', 'desktops', 'computer', 'associates']), ('b00004tkvy', ['noah', 'ark', 'activity', 'center', 'jewel', 'case', 'ages', '3', '8', 'victory', 'multimedia']), 
....etc....]

where "b000jz4hqo" is the ID and the list of values into the "[]" are the tokens, which is the ID with the largest number of tokens?
I'll create the function "findBiggestRecord" and this code.

def findBiggestRecord(vendorRDD):
    """ Find the record with the largest number of tokens.
    Args:
        vendorRDD (RDD of (recordId, tokens)): input pair tuple of record ID and tokens
    Returns:
        list: a list with one pair tuple of record ID and number of tokens
    """
    # Main part of code
    mappa = vendorRDD.map(lambda (id, list): (id, len(list))).takeOrdered(1, lambda s: -1 * s[1])

    return mappa

biggestRecordAmazon = findBiggestRecord(amazonRecToToken)

The result will look like this

[('b000o24l3q', 1547)]

But if I want to use the value returned in biggestRecordAmazon in this print statement (note that it indexes into the returned list of tuples and calls len() on the second element)

print 'The Amazon record with ID "%s" has the most tokens (%s)' % (biggestRecordAmazon[0][0],
                                                                   len(biggestRecordAmazon[0][1]))

Then I have to change the "main part of code" to

mappa = vendorRDD.map(lambda (id,list): (id,list,len(list))).takeOrdered(1, lambda s: -1 * s[2])

where I added a new column containing all the tokens and ordered on the third element (the token count).
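
One side note: the lambda (id, list): ... form relies on tuple parameter unpacking, which was removed in Python 3 (PEP 3113). If this code is run under Python 3, the same map has to unpack the pair explicitly, for example:

# Python 3 variant: tuple parameters are no longer allowed in lambdas,
# so the pair is received as a single argument and indexed explicitly.
mappa = vendorRDD.map(lambda rec: (rec[0], rec[1], len(rec[1]))).takeOrdered(1, lambda s: -1 * s[2])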