-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathConcurrentHash.h
More file actions
241 lines (183 loc) · 7.04 KB
/
ConcurrentHash.h
File metadata and controls
241 lines (183 loc) · 7.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
//
// Created by pengyibo on 2019-08-25.
//
#ifndef QUEUESTORE_CONCURRENTHASH_H
#define QUEUESTORE_CONCURRENTHASH_H
#include <unordered_map>
#include <utility>
#include <string>
#include <strhash.h>
#include "MutexLock.h"
/*
* 一共分成1024个桶,每个桶维护 key 到 Queue的关系的映射
* Queue本身,包括元信息和Buffer(Buffer应该是可以替换的,因此可以考虑单独实现一个PagePool来解决(分桶,每个桶可以用原子变量来实现))
* Queue本身是直接分配的,算是new出来的,这里可以考虑用static的方式先分配好
*
* */
namespace CONCURRENT{
static constexpr uint32_t SegNum = 4096;
uint32_t MurmurHash2 ( const void * key, int len,uint32_t seed);
class HashTable
{
class HashNode
{
public:
char *key = nullptr; // 指向实际的key,这里可以是string
void *value = nullptr; // 指向实际的数据
HashNode* next = nullptr; // 这个hash_node上面的链表
size_t hash; // 计算出来的hash值
uint16_t len;
HashNode(char* key_, void* value_, size_t hash_, HashNode* next_, int len_): key(key_), value(value_), next(next_), hash(hash_), len(len_){}
HashNode(): key(nullptr), value(nullptr), next(nullptr), hash(0){}
};
class MemBlock{
public:
HashNode node;
char buf[32];
};
public:
HashNode* allocNode(const char* key, void* value, int len, uint32_t hash, HashNode* next){
// 尝试了静态分配缓存进行优化,但是几乎没有效果(特别是访问顺序是按照分配的方式来的,如果在每一个table内部进行分配,实际上会导致cache不命中)
char* key_cp = new char[len + 1];
memcpy(key_cp, key, len);
return new HashNode(key_cp, value, hash, next, len);
}
void* find(const char* key, int len, uint32_t prefixHash){
//uint32_t hash = MurmurHash2(key, len, prefixHash);
uint32_t hash = prefixHash;
HashNode* node = table[(hash >> 12) & 255];
while (node != nullptr){
if(node->hash == hash && node->len == len && memcmp(node->key, key, len) == 0){
return node->value;
}else{
node = node->next;
}
}
return nullptr;
}
/*
* 这里首先通过get来判断,如果没有相同key才会调用
* 因此这里直接采取头部插入
* */
void put(const char* key, int len, void* value, uint32_t prefixHash){
count++;
//uint32_t hash = MurmurHash2(key, len, prefixHash);
uint32_t hash = prefixHash;
uint8_t idx = (hash >> 12) & 255;
HashNode* node = table[idx];
if(node == nullptr){
table[idx] = allocNode(key, value, len, hash, nullptr);
}else{
node->next = allocNode(key, value, len, hash, node->next);
}
}
size_t size() const {
return count;
}
size_t count = 0;
HashNode* table[256]; // 4096个segment, 因此大约256足够装下了
};
template <class Value>
class Segment{
public:
using Key = std::string ;
using KeyRef = const Key&;
//using Hash = std::unordered_map<Key , Value>;
using Hash = HashTable;
using Str = std::string;
SpinLock lock; // 这里用一个最简单的自旋锁来处理即可,因为冲突的机率比较小
Hash map;
Value get(KeyRef key, uint32_t hash){
auto value = map.find(key.c_str(), key.length(), hash);
return static_cast<Value>(value);
}
// 暂时不考虑并发写的情况,如果要考虑,那么
void put( KeyRef key, Value val, uint32_t hash){
//map[key] = val;
map.put(key.c_str(), key.length(), val, hash);
}
};
template <class Value>
class ConcurrentHash {
public:
ConcurrentHash():segments(SegNum){}
ConcurrentHash(const ConcurrentHash& other)=delete;
Value* get(const std::string& key){
uint32_t h = MurmurHash2(key.c_str(), key.length(), 0xA5E36FD4);
uint32_t idx = h % SegNum;
auto& seg = segments[idx];
MutexLockGuard guard(seg.lock);
return seg.get(key, h);
}
// 如果存在就不写入,否则就执行callable,得到返回结果
// 因为有可能callable执行开销比较大或者有副作用,所以要保证一定是double check之后再执行
// 或者外面进行synchronize来保证
// 获取拷贝callabe有一点点开销,这个时候也可以考虑直接传值进来
Value* putIfAbsent(const std::string& key, std::function<Value*(void)> callable){
uint32_t h = MurmurHash2(key.c_str(), key.length(), 0xA5E36FD4);
uint32_t idx = h % SegNum;
auto& seg = segments[idx];
MutexLockGuard guard(seg.lock);
{
Value* ret = nullptr;
if((ret = seg.get(key, h)) == nullptr){
ret = callable();
seg.put(key, ret, h);
return ret;
}
return ret;
}
}
void put(const std::string& key, Value* v){
uint32_t h = MurmurHash2(key.c_str(), key.length(), 0xA5E36FD4);
uint32_t idx = h % SegNum;
auto& seg = segments[idx];
Value* ret = v;
MutexLockGuard guard(seg.lock);
seg.put(key, ret, h);
}
// Data
std::vector<Segment<Value*>> segments;
const uint32_t feed = 0xA5E36FD4;
};
/*
* 直接拷贝了一个MurmurHash2函数过来
* */
inline uint32_t MurmurHash2 ( const void * key, int len, uint32_t seed)
{
// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
static constexpr uint32_t m = 0x5bd1e995;
static constexpr int r = 24;
// Initialize the hash to a 'random' value
uint32_t h = seed ^ len;
// Mix 4 bytes at a time into the hash
const unsigned char * data = (const unsigned char *)key;
while(len >= 4)
{
uint32_t k = *(uint32_t*)data;
k *= m;
k ^= k >> r;
k *= m;
h *= m;
h ^= k;
data += 4;
len -= 4;
}
// Handle the last few bytes of the input array
switch(len)
{
case 3: h ^= data[2] << 16;
case 2: h ^= data[1] << 8;
case 1: h ^= data[0];
h *= m;
};
// Do a few final mixes of the hash to ensure the last few
// bytes are well-incorporated.
h ^= h >> 13;
h *= m;
h ^= h >> 15;
return h;
}
};
#endif //QUEUESTORE_CONCURRENTHASH_H