forked from mierl/ZHT
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathZHTClient.cpp
More file actions
157 lines (127 loc) · 4.49 KB
/
ZHTClient.cpp
File metadata and controls
157 lines (127 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#include <string>
#include <cstring>
#include <iostream>
#include <sstream>
#include <fstream>
#include "zht_util.h"
#include "meta.pb.h"
// Client-side handle for a set of ZHT member hosts.
// Every configuration field starts at -1 (set by the constructor);
// REPLICATION_TYPE, NUM_REPLICAS and memberList are filled in by initialize().
class ZHTClient {
public:
int REPLICATION_TYPE; // Replication mode (server-side or client-side); -1 means error/unset.
int NUM_REPLICAS; // Set by initialize() to the configured value plus one; -1 means error/unset.
int protocolType; // 1: TCP; 2: UDP; 3 and up: reserved; -1 means error/unset.
vector<struct HostEntity> memberList; // Member hosts, loaded by initialize() through getMembership().
ZHTClient();
// Loads the member list and the config file; returns 0 on success, a negative code on failure.
int initialize(string configFilePath, string memberListFilePath);
// Parses str as a serialized Package and returns the member host its virtualpath hashes to.
struct HostEntity str2Host(string str);
int insert(string key, string value); // The functions below only deal in strings; the destination host is computed from the data.
int sendPackage(string pkg);
int get(string key, string &returnValue);
int remove(string key);
};
// Builds a client whose settings all carry the -1 "invalid" marker.
// A constructor has no way to report failure, so the real setup lives in
// initialize(), which returns a status code the caller can check.
ZHTClient::ZHTClient() :
        REPLICATION_TYPE(-1), NUM_REPLICAS(-1), protocolType(-1) {
}
// Loads the membership list and the client configuration.
//
// configFilePath:     text file with one KEY=VALUE pair per line. Recognised
//                     keys are REPLICATION_TYPE and NUM_REPLICAS. Blank lines
//                     are ignored.
// memberListFilePath: handed to getMembership() to populate memberList.
//
// Returns 0 on success, -1 if the config file cannot be opened, and -2 if a
// line has no '=' separated value or names an unknown key. Settings parsed
// before a bad line stay applied.
int ZHTClient::initialize(string configFilePath, string memberListFilePath) {
    // The member list is loaded first, even if the config file later fails.
    this->memberList = getMembership(memberListFilePath);

    FILE *fp = fopen(configFilePath.c_str(), "r");
    if (fp == NULL) {
        cout << "Error opening the config file." << endl;
        return -1;
    }

    int status = 0;
    char line[100];
    while (fgets(line, sizeof(line), fp) != NULL) {
        // Skip lines that hold only whitespace (e.g. a trailing empty line);
        // strtok would otherwise yield no value token for them.
        if (line[strspn(line, " \t\r\n")] == '\0') {
            continue;
        }

        char *key = strtok(line, "=");
        char *svalue = strtok(NULL, "=");
        if (key == NULL || svalue == NULL) {
            // No KEY=VALUE shape: never hand a null pointer to strcmp/atoi.
            cout << "Config file is not correct." << endl;
            status = -2;
            break;
        }

        int ivalue = atoi(svalue);
        if (strcmp(key, "REPLICATION_TYPE") == 0) {
            this->REPLICATION_TYPE = ivalue;
        } //other config options follow this way(if).
        else if (strcmp(key, "NUM_REPLICAS") == 0) {
            this->NUM_REPLICAS = ivalue + 1; //note: +1 is must
        } else {
            cout << "Config file is not correct." << endl;
            status = -2;
            break;
        }
    }

    // Single exit point so the file handle is released on every path.
    fclose(fp);
    return status;
}
// Chooses the member host for a request.
// str is parsed as a serialized Package; its virtualpath is hashed with
// myhash() against the current member count, and the entry at that position
// of memberList is returned (vector::at does the bounds check).
struct HostEntity ZHTClient::str2Host(string str) {
    Package request;
    request.ParseFromString(str);

    const char *path = request.virtualpath().c_str();
    int slot = myhash(path, this->memberList.size());
    return this->memberList.at(slot);
}
// Stores value under key.
// Builds a Package describing the insert, serializes it and hands it to
// sendPackage(), whose result is returned unchanged.
int ZHTClient::insert(string key, string value) {
    Package package;
    package.set_virtualpath(key); // the key travels in the virtualpath field
    package.set_isdir(true);
    package.set_replicano(5); // original note: never let it be negative
    package.set_operation(3); // 3 for insert, 1 for look up, 2 for remove
    package.set_realfullpath(value); // the value travels in realfullpath

    return sendPackage(package.SerializeAsString());
}
// Sends an already serialized Package to the host chosen by str2Host(),
// closes the connection again, and returns whatever simpleSend() reported.
int ZHTClient::sendPackage(string pkg) {
    int connection = -1; // handed to simpleSend, then to d3_closeConnection
    struct HostEntity target = this->str2Host(pkg);

    int status = simpleSend(pkg, target, connection);
    d3_closeConnection(connection);

    return status;
}
// Looks up key and stores the reply in returnValue.
//
// Returns the result of simpleSend() for the request. If the request was
// sent completely but receiving the reply fails, the negative receive result
// is returned instead and returnValue is left untouched. returnValue is also
// left untouched when the send was incomplete.
int ZHTClient::get(string key, string &returnValue) {
    Package package;
    package.set_virtualpath(key); // the key travels in the virtualpath field
    // NOTE(review): the original comment here said "true for not breaking the
    // string", yet lookups send false — confirm which the server expects.
    package.set_isdir(false);
    package.set_replicano(5); // original note: never let it be negative
    package.set_operation(1); // 3 for insert, 1 for look up, 2 for remove
    string msg = package.SerializeAsString();

    int sock = -1;
    struct HostEntity dest = this->str2Host(msg);
    int ret = simpleSend(msg, dest, sock);

    // Only wait for a reply when the whole request went out.
    if (ret >= 0 && static_cast<size_t>(ret) == msg.length()) {
        // One spare byte keeps the buffer null-terminated even when a full
        // MAX_MSG_SIZE bytes arrive, so assign(buff) cannot read past it.
        char buff[MAX_MSG_SIZE + 1];
        memset(buff, 0, sizeof(buff));

        int rcv_size = -1; // stays -1 if no known protocol is selected
        if (TRANS_PROTOCOL == USE_TCP) {
            rcv_size = d3_recv_data(sock, buff, MAX_MSG_SIZE, 0);
        } else if (TRANS_PROTOCOL == USE_UDP) {
            // Kept from the original: reseeds the global RNG before the UDP
            // receive. NOTE(review): purpose not visible here — confirm.
            srand(getpid() + clock());
            sockaddr_in tmp_sockaddr;
            memset(&tmp_sockaddr, 0, sizeof(sockaddr_in));
            //receive lookup result
            rcv_size = d3_svr_recv(sock, buff, MAX_MSG_SIZE, 0, &tmp_sockaddr);
        }

        if (rcv_size < 0) {
            cout << "Lookup received error." << endl;
            d3_closeConnection(sock); // do not leak the socket on failure
            return rcv_size;
        }
        // Copies up to the first null byte, as before.
        returnValue.assign(buff);
    }

    d3_closeConnection(sock);
    return ret;
}
// Removes the entry stored under key.
// Builds a Package describing the removal, serializes it and sends it through
// sendPackage(), whose result is returned unchanged.
int ZHTClient::remove(string key) {
    Package package;
    // Use the caller's key: the previous code sent a random 50-character
    // string here, so the requested entry was never the one targeted.
    package.set_virtualpath(key);
    package.set_isdir(false);
    package.set_replicano(5); // original note: never let it be negative
    package.set_operation(2); // 3 for insert, 1 for look up, 2 for remove

    // Same pick-host / send / close sequence as insert().
    return sendPackage(package.SerializeAsString());
}