Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 22 additions & 20 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ type scope struct {
}

func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser, sessionId string, sessionTimeout int) *scope {
h := c.getHost()
var h *topology.Node
if sessionId != "" {
h = c.getHostSticky(sessionId)
} else {
h = c.getHost()
}
var localAddr string
if addr, ok := req.Context().Value(http.LocalAddrContextKey).(net.Addr); ok {
Expand Down Expand Up @@ -874,27 +876,29 @@ func (c *cluster) getReplica() *replica {
return r
}

// getReplicaSticky returns replica by stickiness from the cluster.
//
// Always returns non-nil.
func (c *cluster) getReplicaSticky(sessionId string) *replica {
idx := atomic.AddUint32(&c.nextReplicaIdx, 1)
n := uint32(len(c.replicas))
if n == 1 {
return c.replicas[0]
}

idx %= n
// handling sticky session
idx := (hash(sessionId) >> 16) % n
r := c.replicas[idx]

for i := uint32(1); i < n; i++ {
// handling sticky session
sessionId := hash(sessionId)
tmpIdx := (sessionId) % n
// TODO: In principle, sticky session must always proxy to the fixed replica continuously,
// no matter whether the replica is active or not.
for i := range n {
tmpIdx := (idx + i) % n
tmpRSticky := c.replicas[tmpIdx]
log.Debugf("Sticky replica candidate is: %s", tmpRSticky.name)
if !tmpRSticky.isActive() {
log.Debugf("Sticky session replica has been picked up, but it is not available")
log.Errorf("Sticky replica candidate %q for session_id %q is inactive", tmpRSticky.name, sessionId)
continue
}
log.Debugf("Sticky session replica is: %s, session_id: %d, replica_idx: %d, max replicas in pool: %d", tmpRSticky.name, sessionId, tmpIdx, n)
log.Debugf("Sticky replica for session_id %q: %q", sessionId, tmpRSticky.name)
return tmpRSticky
}
// The returned replica may be inactive. This is OK,
Expand All @@ -907,27 +911,25 @@ func (c *cluster) getReplicaSticky(sessionId string) *replica {
//
// Always returns non-nil.
func (r *replica) getHostSticky(sessionId string) *topology.Node {
idx := atomic.AddUint32(&r.nextHostIdx, 1)
n := uint32(len(r.hosts))
if n == 1 {
return r.hosts[0]
}

idx %= n
// handling sticky session
idx := (hash(sessionId) & 0xFFFF) % n
h := r.hosts[idx]

// Scan all the hosts for the least loaded host.
for i := uint32(1); i < n; i++ {
// handling sticky session
sessionId := hash(sessionId)
tmpIdx := (sessionId) % n
// TODO: In principle, sticky session must always proxy to the fixed host continuously,
// no matter whether the host is active or not.
for i := range n {
tmpIdx := (idx + i) % n
tmpHSticky := r.hosts[tmpIdx]
log.Debugf("Sticky server candidate is: %s", tmpHSticky)
if !tmpHSticky.IsActive() {
log.Debugf("Sticky session server has been picked up, but it is not available")
log.Errorf("Sticky host candidate %q for session_id %q is inactive", tmpHSticky, sessionId)
continue
}
log.Debugf("Sticky session server is: %s, session_id: %d, server_idx: %d, max nodes in pool: %d", tmpHSticky, sessionId, tmpIdx, n)
log.Debugf("Sticky host for session_id %q: %q", sessionId, tmpHSticky)
return tmpHSticky
}

Expand Down
8 changes: 4 additions & 4 deletions scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,10 @@ func TestDecorateRequest(t *testing.T) {

func TestGetHostSticky(t *testing.T) {
exceptedSessionHostMap := map[string]string{
"0": "127.0.0.22",
"0": "127.0.0.66",
"1": "127.0.0.33",
"2": "127.0.0.44",
"3": "127.0.0.55",
"3": "127.0.0.11",
}
c := testGetCluster()
for i := 0; i < 10000; i++ {
Expand All @@ -415,10 +415,10 @@ func TestIncQueued(t *testing.T) {
cu := testGetClusterUser()
c := testGetCluster()
expectedSessionHostMap := map[string]string{
"0": "127.0.0.22",
"0": "127.0.0.66",
"1": "127.0.0.33",
"2": "127.0.0.44",
"3": "127.0.0.55",
"3": "127.0.0.11",
}
if err := testConcurrentQuery(c, u, cu, 10000, expectedSessionHostMap); err != nil {
t.Fatalf("incQueue test err: %s", err)
Expand Down