-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMyMapReduce.java
More file actions
236 lines (198 loc) · 7.8 KB
/
MyMapReduce.java
File metadata and controls
236 lines (198 loc) · 7.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import java.io.*;
import java.util.*;
public class MyMapReduce
{
// stvaranje novih listi
List buckets = new ArrayList();
List intermediateresults = new ArrayList();
List values = new ArrayList();
public void init()
{
// u listu values dodaj imena dokumenata
// --> nadodati vlastiti kod
values.add("doc1.txt");
values.add("doc2.txt");
System.out.println("**STEP 1 START**-> Running Conversion into Buckets**");
System.out.println();
List b = step1ConvertIntoBuckets(values, 2);
System.out.println("************STEP 1 COMPLETE*************");
System.out.println();
System.out.println();
System.out.println("**STEP 2 START**->Running **Map Function** concurrently for all Buckets");
System.out.println();
List res = step2RunMapFunctionForAllBuckets(b);
System.out.println("************STEP 2 COMPLETE*************");
System.out.println();
System.out.println();
System.out.println("**STEP 3 START**->Running **Reduce Function** for collating Intermediate Results and Printing Results");
System.out.println();
step3RunReduceFunctionForAllBuckets(res);
System.out.println("************STEP 3 COMPLETE*************");
}
// Funkcija: dijeli imena primljenih dokumenata u buckete
// Prima: listu u kojoj se nalaze imena dokumenata koje je potrebno podijeliti u buckete, te broj bucketa (2)
// Vraca: ispunjene buckete
public List step1ConvertIntoBuckets(List list, int numberofbuckets)
{
int n = list.size(); // 2 (jer imamo dva dokumenta)
int m = n / numberofbuckets; // 2/2 = 1 --> u svaki bucket trebamo staviti po jedan dokument
int rem = n % numberofbuckets; // 2%2 = 0 --> nema ostatka, pa ce nam svi dokumenti stati u predvidjene buckete (tj. ne treba nam dodatni bucket)
int count = 0;
System.out.println("BUCKETS");
for (int j = 1; j <= numberofbuckets; j++)
{
List temp = new ArrayList();
for (int i = 1; i <= m; i++)
{
// dohvati ime prvog dokumenta i spremi ga u temp
// --> nadodati vlastiti kod
temp.add((String)values.get(count));
count++;
}
// u buckets ubaci ime prvog dokumenta, pa drugog
// --> nadodati vlastiti kod
buckets.add(temp);
temp = new ArrayList();
}
if (rem != 0) // ako ne mozemo podatke jednoliko podijeliti u buckete, ostatak stavi u novi bucket
{
List temp = new ArrayList();
for (int i = 1; i <= rem; i++)
{
temp.add((String)values.get(count));
count++;
}
buckets.add(temp);
}
System.out.println();
System.out.println(buckets);
System.out.println();
return buckets;
}
// Funkcija: Broji koliko ima odredjenih rijeci u svakom bucketu (npr. rijec FESB se u 1. bucketu pojavljuje 12 puta, rijec SPLIT 5, itd.)
// Prima: listu bucketa (tj. imena dvaju dokumenata koja su podijeljena u 2 bucketa)
// Vraca: hash tablice za svaki dokument/bucket (u njima pise koliko se puta pojavljuje koja rijec)
public List step2RunMapFunctionForAllBuckets(List list)
{
for (int i = 0; i < list.size(); i++) // za svaki bucket
{
List elementList = (ArrayList)list.get(i);
// kreiranje nove instance klase StartThread i pozivanje start metode
// --> nadodati vlastiti kod
new StartThread(elementList).start();
}
try
{
Thread.currentThread().sleep(1000); // cekamo dok se sve ne sinkronizira (za svaki slucaj)
}
catch(Exception e)
{
}
return intermediateresults; // tu su hash tablice za svaki dokument/bucket
}
// ne prikazuj upozorenja, programer je zelio ovakav kod
@SuppressWarnings("unchecked")
// Funkcija: kombinira rezultate hash funkcija za sve buckete (ukratko, zbraja koliko se puta odredjene rijeci ponavljaju u svim bucketima)
// Prima: hash tablice za svaki dokument/bucket
// Vraca: nista
public void step3RunReduceFunctionForAllBuckets(List list)
{
int sum = 0;
Hashtable <String,Integer > wordMap = new Hashtable <String,Integer>(); // stvori hash tablicu u kojoj ce biti ukupni rezultati
wordMap.putAll((Hashtable <String,Integer >) list.get(0)); // u wordMap stavi prvu hash tablicu jer se ona nema s cime duplicirati (sve rijeci u njoj su jedinstvene)
for (int i = 1; i < list.size(); i++) // za sve ostale buckete
{
Hashtable <String,Integer > newMap = (Hashtable <String,Integer >) list.get(i); // dohvati drugu hash tablicu
for (String key : newMap.keySet()) // za svaki key iz keyset-a kojeg dohvatimo iz druge hash tablice
{
if (wordMap.containsKey(key)) // ako ukupna hash tablica sadrzi taj key
{
int val = wordMap.get(key).intValue() + newMap.get(key).intValue(); // zbroji broj ponavljanja te rijeci
wordMap.put(key, val); // update-aj ukupnu hash tablicu
}
else // ako ukupna hash tablica ne sadrzi taj key
{
wordMap.put(key, newMap.get(key)); // dodaj novu rijec u ukupnu hash tablicu
}
}
System.out.println();
System.out.println("Konacni rezultati: ");
for (String key : wordMap.keySet())
{
System.out.println(key + ":" + wordMap.get(key));
}
}
System.out.println();
}
// Funkcija: kreira hash tablice za svaki bucket
// u svakoj hash tablici nalaze se rijeci iz bucketa, te broj puta koliko se ponavljaju u tom bucketu
// Update se globalna lista intermediateresults u kojoj su hash tablice za sve buckete
class StartThread extends Thread
{
private List tempList = new ArrayList();
int brojr; // broj rijeci
public StartThread(List list) // dobije ime (prvog, pa drugog) dokumenta; ovo se pozove kod kreiranja instance klase
{
tempList = list;
System.out.println("Thread " + tempList);
}
public void run() // metoda start poziva run funkciju
{
brojr = 0;
// Hashtable (Hashtable <String,Integer>) mapira keys u values
// --> nadodati vlastiti kod (kreirajte hash tablicu imena wordMap)
Hashtable <String,Integer> wordMap = new Hashtable <String,Integer>();
for (int i = 0; i < tempList.size(); i++) // za sve elemente jednog bucketa (petlja nam treba za slucaj da u bucketu imamo vise dokumenata)
{
// ucitavanje dokumenata
File inFile = new File((String)tempList.get(i));
try
{
BufferedReader br = new BufferedReader(new FileReader(inFile));
// ready() metoda nam kaze jesu li podaci dostupni (npr. je li ih server preko soketa vec poslao)
while (br.ready())
{
// definicija: StringTokenizer(String str, String delimiters)
// delimiteri se ne bi trebali vracati kao tokeni - njih moramo zanemariti prilikom parsiranja teksta
// --> nadodati vlastiti kod ( StringTokenizer line = new StringTokenizer(--> nadopisite sto dolazi unutar ovih zagrada <--); )
StringTokenizer line = new StringTokenizer (br.readLine(), " ,.;:()");
while (line.hasMoreTokens())
{
String temp = line.nextToken(); // ucitaj slijedeci toket (tj. rijec)
brojr++;
// ako se u hash tablici vec nalazi ova rijec
if (wordMap.containsKey(temp))
{
int val = wordMap.get(temp).intValue();
val++;
wordMap.put(temp, val);
// --> nadodati vlastiti kod. Smjernice:
// dohvati tu rijec iz hash tablice, te broj puta koliko se je ponovila
// uvecaj broj puta koliko se je ta rijec ponovila
// update-aj podatke u hash tablici
}
else // rijec ne postoji u hash tablici, pa je trebamo nadodati
{
wordMap.put(temp, 1);
// --> nadodati vlastiti kod
}
}
}
}
catch (Exception e)
{
// u slucaju greske, ispisi slijed funkcija koje su rezultirale pozivanjem funkcije koja je generirala gresku
e.printStackTrace();
}
String str = (String)tempList.get(i);
// kriticni odsjecak
synchronized(this)
{
// u listu intermediateresults dodaj hash tablicu za trenutni bucket
// --> nadodati vlastiti kod
intermediateresults.add(wordMap);
}
}
}
}
}