-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathflush_cache_controller.py
More file actions
229 lines (174 loc) · 10 KB
/
flush_cache_controller.py
File metadata and controls
229 lines (174 loc) · 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import utilities
import multiprocessing
import os
import socket
from timeit import default_timer as timer
class Cache_Controller(multiprocessing.Process):
    """Streaming cache controller.

    Reads batches of integer-named files from ``streaming_dir``, maps the
    keyword/document pairs they contain onto keyword clusters held in the
    shared cache object, and notifies a Padding Controller over TCP whenever a
    cluster satisfies its padding constraint (capacity-based in non-persistent
    mode, keyword-coverage-based in persistent mode).
    """

    def __init__(self, givenShare,
                 fixed_clusters_keywords,
                 prob_clusters,
                 range_cache_size,
                 streaming_dir, streaming_rate=10,
                 padding_mode="nh",
                 host="localhost", port=8089,
                 f_window=20):
        """
        :param givenShare: shared cache-cluster object (project type) exposing
            updateClusterEntries / getClusterSizeAll / getClusterKeyNum /
            getClusterSizeByIndex / setClusterCap.
        :param fixed_clusters_keywords: list of keyword lists, one per cluster.
        :param prob_clusters: per-cluster access probabilities (same length).
        :param range_cache_size: iterable whose min/max give the cache-size
            scale the probabilities are mapped onto.
        :param streaming_dir: directory containing integer-named input files.
        :param streaming_rate: number of files consumed per batch.
        :param padding_mode: first char "n" = non-persistent (capacity based),
            anything else (e.g. "p") = persistent (keyword based).
        :param host: Padding Controller host.
        :param port: Padding Controller port.
        :param f_window: flush window in seconds for capped clusters.
        """
        super(Cache_Controller, self).__init__()
        self.number_clusters = len(fixed_clusters_keywords)
        self.clusters_keywords = fixed_clusters_keywords
        self.cluster_probabilities = prob_clusters
        # initialise per-cluster capacity thresholds
        self.cacheThresholdInit(range_cache_size)
        self.streaming_dir = streaming_dir
        self.streaming_rate = streaming_rate
        self.padding_mode = padding_mode
        self.host = host
        self.port = port
        self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.cached_data_clusters = givenShare
        self.cluster_padding_time = dict()  # cluster index -> last padding time
        self.f_reports = list([])  # (time, cluster_index) flush log
        self.f_window = f_window
        # BUGFIX: the two attributes below were read in run() and
        # _scanClusterByKeywords() but never initialised, crashing the process
        # (the crash was hidden by the original bare ``except``).
        # Dict rather than list: persistent mode may record keys >= 100
        # (see _scanClusterByKeywords, which marks clusters as index + 100).
        self.eligible_pCluster = {index: 0 for index in range(self.number_clusters)}
        self.limited_indices = []  # only populated in non-persistent mode

    def initCacheByClustersProp(self, clusters_keywords_props, range_cache_size):
        """Re-initialise clusters from (keyword, probability) pair lists.

        :param clusters_keywords_props: list of clusters, each a list of
            (keyword, probability) tuples; a cluster's probability is the sum
            of its keyword probabilities.
        :param range_cache_size: see __init__.
        :return: None (prints a warning and leaves state untouched on empty input).
        """
        if len(clusters_keywords_props) == 0:
            print("Invalid cache clusters")
            return None
        self.number_clusters = len(clusters_keywords_props)
        self.clusters_keywords = [[] for _ in range(self.number_clusters)]
        self.cluster_probabilities = [0.0] * self.number_clusters
        for cluster_index in range(self.number_clusters):
            cluster = clusters_keywords_props[cluster_index]
            total_prob = 0.0
            for keyword_prob in cluster:
                self.clusters_keywords[cluster_index].append(keyword_prob[0])
                total_prob += keyword_prob[1]
            self.cluster_probabilities[cluster_index] = total_prob
        # re-derive capacity thresholds for the new clusters
        self.cacheThresholdInit(range_cache_size)

    def cacheThresholdInit(self, range_cache_size):
        """Map cluster probabilities linearly onto the cache-size scale.

        Probabilities are min-max normalised to [0, 1], then scaled onto
        [min(range_cache_size), max(range_cache_size)] and truncated to int.
        """
        m1 = min(self.cluster_probabilities)
        range1 = max(self.cluster_probabilities) - m1
        if range1 == 0:
            # BUGFIX: equal probabilities caused a ZeroDivisionError; map every
            # cluster to the smallest cache size instead.
            normalised = [0.0] * self.number_clusters
        else:
            normalised = [(prob - m1) / range1 for prob in self.cluster_probabilities]
        m2 = min(range_cache_size)
        range2 = max(range_cache_size) - m2
        self.clusters_threshold = [int((item * range2) + m2) for item in normalised]

    def getCacheSizes(self):
        """Return the per-cluster capacity thresholds."""
        return self.clusters_threshold

    def exportClusters(self, datadir, cluster_keyword_file, cluster_prob_file):
        """Persist cluster keywords and probabilities via the utilities helper."""
        utilities.dump_data_to_file(self.clusters_keywords, datadir, cluster_keyword_file)
        utilities.dump_data_to_file(self.cluster_probabilities, datadir, cluster_prob_file)

    def search_local_cache(self, keyword):
        """Return the cached value for ``keyword`` from the first cluster that
        contains it, or None if no cluster holds the keyword."""
        for cluster in self.cached_data_clusters:
            if keyword in cluster:
                return cluster[keyword]
        return None

    # Cache controller reads data from the given dir, streaming_rate files at a
    # time, feeds them into the shared cache, and notifies the Padding
    # Controller over the TCP socket.
    def run(self):
        """Main process loop: stream files, fill clusters, signal padding."""
        try:
            print(">Activated: Cache Controller " + str((self.host, self.port)))
            self.clientSocket.connect((self.host, self.port))
            arr = os.listdir(self.streaming_dir)
            # file names are assumed to be integers ("1", "2", ...) — TODO confirm
            arr = [int(x) for x in arr]
            self._initTimeCluster()
            print(">Initiate time cluster")
            if self.padding_mode[0] == "n":
                # Non-persistent mode: cap the two largest clusters at 80%.
                # BUGFIX: rank *indices* by threshold; the original called
                # list.index() on the sorted values, which returns the same
                # cluster twice when two thresholds are equal.
                ranked = sorted(range(self.number_clusters),
                                key=lambda idx: self.clusters_threshold[idx],
                                reverse=True)
                self.limited_indices = ranked[:2]
                self.limited_capacities = [self.clusters_threshold[idx] * 0.8
                                           for idx in self.limited_indices]
                self.cached_data_clusters.setClusterCap(self.limited_indices, self.limited_capacities)
                print("Capped cluster indices " + str(self.limited_indices))
            # stream the source folder in batches of streaming_rate files
            for i in range(1, len(arr), self.streaming_rate):
                upper = min(i + self.streaming_rate, len(arr))
                file_sequence = [str(j) for j in range(i, upper)]
                # collect (keyword, fileId, cluster_index) triples for the batch
                cluster_map_pairs = []
                for fileId in file_sequence:
                    temp_pairs = utilities.retrieve_keywords_from_file(self.streaming_dir, fileId)
                    cluster_map_pairs += self._keyword_cluster_map(temp_pairs)
                self.cached_data_clusters.updateClusterEntries(cluster_map_pairs)
                # pick the clusters satisfying the padding constraint
                if self.padding_mode[0] == "n":
                    cluster_indexes = self._scanClusterByCapacity()
                else:
                    cluster_indexes = self._scanClusterByKeywords()
                for cluster_index in cluster_indexes:
                    self._broadcastClusterIndex(cluster_index)
                    # record when this cluster was padded / sent to the server
                    self.cluster_padding_time[cluster_index] = timer()
                    if self.padding_mode[0] == "p":
                        self.eligible_pCluster[cluster_index] = 1
                # flush capped clusters whose last padding is older than f_window
                # (limited_indices is [] outside non-persistent mode — BUGFIX:
                # it was previously undefined there and crashed the loop)
                for cluster_index in self.limited_indices:
                    if (timer() - self.cluster_padding_time[cluster_index]) >= self.f_window:
                        print("Flush for non persistent cluster index " + str(cluster_index))
                        self.f_reports.append((timer(), cluster_index))
                        self._broadcastClusterIndex(cluster_index)
                        self.cluster_padding_time[cluster_index] = timer()
        except Exception as exc:
            # BUGFIX: the original bare ``except: pass`` silently swallowed
            # every failure; keep best-effort semantics but report the error.
            print("Exception Cache Controller: " + repr(exc))
        finally:
            # BUGFIX: the original tested the undefined attribute ``self.f_mode``.
            if self.f_reports:
                print("Flushing times (time,cluster_index) : " + str(self.f_reports))
            self.clientSocket.close()

    def _broadcastClusterIndex(self, cluster_index):
        """Send a cluster index to the Padding Controller and await its ack.

        Index 0 is remapped to 99 because ``(0).to_bytes((0).bit_length(), ...)``
        would produce an empty byte string — presumably 99 is reserved for
        cluster 0 by the protocol (TODO confirm against the Padding Controller).
        """
        if cluster_index == 0:
            cluster_index = 99
        bindex = cluster_index.to_bytes(cluster_index.bit_length(), byteorder='little')
        self.clientSocket.send(bindex)
        # block until the Padding Controller acknowledges with one byte
        self.clientSocket.recv(1)

    def _keyword_cluster_map(self, temp_pairs):
        """Expand (keyword, fileId) pairs into (keyword, fileId, cluster_index)
        triples, one per cluster whose keyword list contains the keyword."""
        temp_cluster_map = []
        for (keyword, fileId) in temp_pairs:
            for cluster_index in range(len(self.clusters_keywords)):
                if keyword in self.clusters_keywords[cluster_index]:
                    temp_cluster_map.append((keyword, fileId, cluster_index))
        return temp_cluster_map

    def _initTimeCluster(self):
        """Stamp every cluster's padding time with the current timer value."""
        for cluster_index in range(self.number_clusters):
            self.cluster_padding_time[cluster_index] = timer()

    def _scanClusterByCapacity(self):
        """Return indices of clusters whose current size reached their threshold."""
        eligible_clusters = []
        cur_cluster_sizes = self.cached_data_clusters.getClusterSizeAll()
        for cluster_index in range(self.number_clusters):
            if cur_cluster_sizes[cluster_index] >= self.clusters_threshold[cluster_index]:
                eligible_clusters.append(cluster_index)
        return eligible_clusters

    def _scanClusterByKeywords(self):
        """Return clusters ready for padding in persistent mode.

        A cluster is ready when it holds all of its keywords; a previously
        padded cluster that has refilled to its capacity threshold is returned
        as ``index + 100`` so the Padding Controller adds dummy keywords.
        """
        ready_clusters = []
        cur_cluster_keys = self.cached_data_clusters.getClusterKeyNum()
        for cluster_index in range(self.number_clusters):
            if cur_cluster_keys[cluster_index] == len(self.clusters_keywords[cluster_index]):
                ready_clusters.append(cluster_index)
            else:
                if self.eligible_pCluster[cluster_index] == 1:
                    # already padded once: check current capacity instead
                    cur_cluster_size = self.cached_data_clusters.getClusterSizeByIndex(cluster_index)
                    if cur_cluster_size >= self.clusters_threshold[cluster_index]:
                        # mark cluster + 100 so the Padding Controller adds dummy keywords
                        ready_clusters.append(cluster_index + 100)
                        print("Catch the cluster " + str(cluster_index))
        return ready_clusters