-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjoin_reducer.py
More file actions
32 lines (26 loc) · 896 Bytes
/
join_reducer.py
File metadata and controls
32 lines (26 loc) · 896 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/usr/bin/python3
import sys
import re
import ast
def _parse_record(line):
    """Split one reducer input line into ``(key, data_marker, row)``.

    The line is expected to look like ``<key>\\t<payload>`` where the payload
    is the Python literal of a 2-tuple ``(data_marker, row)`` and ``row`` is
    a sequence of strings. The row is returned already joined with commas.

    Raises:
        ValueError: if the line has no tab separator, or if the payload is
            not a valid literal / not a 2-item sequence (raised by
            ``ast.literal_eval`` or by the tuple unpacking).
    """
    fields = line.split("\t")
    if len(fields) < 2:
        raise ValueError(
            "malformed reducer input (expected '<key>\\t<payload>'): %r" % line
        )
    key, payload = fields[0], fields[1]
    # literal_eval only accepts Python literals, so the payload is never
    # executed as code.
    data_marker, row = ast.literal_eval(payload.strip())
    return key, data_marker, ",".join(row)


def join_records(lines):
    """Yield the per-key cross product of "d1" rows and "d2" rows.

    Args:
        lines: iterable of text lines in the format accepted by
            ``_parse_record``. Blank (whitespace-only) lines are skipped.

    Yields:
        One string per (d1 row, d2 row) pair that shares a key, formatted as
        ``"<d1 row>,<d2 row>"`` with each side stripped of surrounding
        whitespace. Keys are emitted in first-seen order; rows within a key
        keep their input order. Rows whose marker is neither "d1" nor "d2"
        are ignored.

    All input is consumed and grouped before the first result is produced.
    """
    groups = {}
    for line in lines:
        if not line.strip():
            # A stray empty line carries no record; skipping it avoids
            # aborting the whole reduce task.
            continue
        key, data_marker, row = _parse_record(line)
        left_rows, right_rows = groups.setdefault(key, ([], []))
        if data_marker == "d1":
            left_rows.append(row)
        elif data_marker == "d2":
            right_rows.append(row)

    for left_rows, right_rows in groups.values():
        for row1 in left_rows:
            for row2 in right_rows:
                yield row1.strip() + "," + row2.strip()


if __name__ == "__main__":
    # Reducer entry point: read tagged rows from stdin, write joined rows
    # to stdout, one per line.
    for joined_row in join_records(sys.stdin):
        print(joined_row)