paolo@bimodesign.com | +34 608 61 64 10


Spark - Join with lists

The join method works on pair RDDs (RDDs of (key, value) tuples), not on Python lists, so to merge two lists we first turn each list into an RDD. Before reading this post, you should read this one, which gives a quick introduction to some PySpark functions.
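Conceptually, join on two pair RDDs is an inner join on the key. The plain-Python sketch below mirrors that behavior on lists of (key, value) tuples so it can run without Spark. The helper inner_join and the sample data are hypothetical, and unlike Spark this version keeps the order of the left list.

```python
from collections import defaultdict


def inner_join(left, right):
    """Plain-Python sketch of what a pair-RDD join computes.

    For every key present in both lists, emit (key, (left_value, right_value))
    for each combination of values; keys found in only one list are dropped.
    """
    # Group the right-hand values by key so each lookup is cheap
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]


totals = [(1, 100), (2, 80), (3, 90)]   # hypothetical (day, total) pairs
uniques = [(1, 10), (3, 9), (4, 7)]     # hypothetical (day, unique) pairs
print(inner_join(totals, uniques))
# [(1, (100, 10)), (3, (90, 9))]
```

Days 2 and 4 appear in only one list, so they are dropped, just as an RDD join would drop them.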
We have these two lists:

(A) uniqueHostPerDay, a list of (day, number of unique hosts) pairs

[(1, 2582), (3, 3222), (4, 4190), (5, 2502), (6, 2537), (7, 4106), (8, 4406), (9, 4317), (10, 4523), (11, 4346), (12, 2864), (13, 2650), (14, 4454), (15, 4214), (16, 4340), (17, 4385), (18, 4168), (19, 2550), (20, 2560), (21, 4134), (22, 4456)]

(B) totalHostPerDay, a list of (day, total number of requests) pairs

[(1, 33996), (3, 41387), (4, 59554), (5, 31888), (6, 32416), (7, 57355), (8, 60142), (9, 60457), (10, 61245), (11, 61242), (12, 38070), (13, 36480), (14, 59873), (15, 58845), (16, 56651), (17, 58980), (18, 56244), (19, 32092), (20, 32963), (21, 55539), (22, 57758)]

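Joining (B) with (A) on the day should return these (day, (total, unique)) pairs; note that Spark does not guarantee the order of the joined output: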

[(4, (59554, 4190)), (6, (32416, 2537)), (8, (60142, 4406)), (10, (61245, 4523)), (12, (38070, 2864)), (14, (59873, 4454)), (16, (56651, 4340)), (18, (56244, 4168)), (20, (32963, 2560)), (22, (57758, 4456)), (1, (33996, 2582)), (3, (41387, 3222)), (5, (31888, 2502)), (7, (57355, 4106)), (9, (60457, 4317)), (11, (61242, 4346)), (13, (36480, 2650)), (15, (58845, 4214)), (17, (58980, 4385)), (19, (32092, 2550)), (21, (55539, 4134))]
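Because both lists are small, the expected result can be cross-checked locally. The following plain-Python sketch (not the code used in this post) looks each day of (B) up in a dict built from (A), which works because each day appears only once per list. Sorting by day makes the output deterministic, whereas Spark returns the joined pairs in partition order.

```python
# Lists (A) and (B) from above: (day, unique hosts) and (day, total requests)
unique_host_per_day = [(1, 2582), (3, 3222), (4, 4190), (5, 2502), (6, 2537),
                       (7, 4106), (8, 4406), (9, 4317), (10, 4523), (11, 4346),
                       (12, 2864), (13, 2650), (14, 4454), (15, 4214), (16, 4340),
                       (17, 4385), (18, 4168), (19, 2550), (20, 2560), (21, 4134),
                       (22, 4456)]
total_host_per_day = [(1, 33996), (3, 41387), (4, 59554), (5, 31888), (6, 32416),
                      (7, 57355), (8, 60142), (9, 60457), (10, 61245), (11, 61242),
                      (12, 38070), (13, 36480), (14, 59873), (15, 58845), (16, 56651),
                      (17, 58980), (18, 56244), (19, 32092), (20, 32963), (21, 55539),
                      (22, 57758)]

# Days are unique within each list, so a dict lookup is enough
unique_by_day = dict(unique_host_per_day)

# Same pairs as totByDay.join(uniqueByDay): (day, (total, unique)),
# keeping only days present in both lists, then sorted by day
joined = sorted(
    (day, (total, unique_by_day[day]))
    for day, total in total_host_per_day
    if day in unique_by_day
)
print(joined[:3])
# [(1, (33996, 2582)), (3, (41387, 3222)), (4, (59554, 4190))]
```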

The code is below. Notice that I need to create two RDDs, one for each list, before I can call join:

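# Assumes sc (SparkContext), access_logs and dailyHostsList (list (A))
# were created in the previous post
dayAndHostTuple = (access_logs
                   .map(lambda log: ((log.date_time.day, log.host), 1)))

# Total requests per day
groupedByDay = (dayAndHostTuple
                .map(lambda pair: pair[0])                # -> (day, host)
                .map(lambda day_host: (day_host[0], 1))   # -> (day, 1)
                .reduceByKey(lambda a, b: a + b))         # -> (day, total)

sortedByDay = groupedByDay.sortByKey().collect()

# Convert each list to a pair RDD so that join() is available
totByDay = sc.parallelize(sortedByDay)
uniqueByDay = sc.parallelize(dailyHostsList)

# (day, (total, unique)) -> (day, total // unique); // keeps the integer
# division that / performed in the original Python 2 code
avgDailyReqPerHost = (totByDay
                      .join(uniqueByDay)
                      .mapValues(lambda v: v[0] // v[1])
                      .sortByKey()
                      .cache())

avgDailyReqPerHostList = avgDailyReqPerHost.take(30)
print('Average number of daily requests per host is %s' % avgDailyReqPerHostList)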
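The same computation can be traced without Spark. In this plain-Python sketch, collections.Counter stands in for reduceByKey, distinct hosts per day are computed from the records instead of being read from dailyHostsList, and the (day, host) records are made up for illustration; avg_daily_requests_per_host is a hypothetical helper.

```python
from collections import Counter


def avg_daily_requests_per_host(records):
    """records: iterable of (day, host) pairs, one per request."""
    records = list(records)
    # Total requests per day (what groupedByDay computes with reduceByKey)
    total_by_day = Counter(day for day, _ in records)
    # Distinct hosts per day (what list (A) holds)
    unique_by_day = Counter(day for day, _ in set(records))
    # Join on day, integer-divide, and sort by day like sortByKey()
    return sorted(
        (day, total_by_day[day] // unique_by_day[day])
        for day in total_by_day.keys() & unique_by_day.keys()
    )


requests = [(1, "a.com"), (1, "a.com"), (1, "b.org"), (2, "c.net"), (2, "c.net")]
print(avg_daily_requests_per_host(requests))
# [(1, 1), (2, 2)]
```

Day 1 has three requests from two distinct hosts, so its integer average is 1; day 2 has two requests from a single host, giving 2.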