forked from urnetwork/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmessage_framer.go
More file actions
174 lines (143 loc) · 4.97 KB
/
message_framer.go
File metadata and controls
174 lines (143 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package connect
import (
"encoding/binary"
"fmt"
"io"
"math"
// "time"
// "github.com/urnetwork/connect"
// "github.com/golang/glog"
)
// A message framer that minimizes memory copies to reduce cpu+memory usage.
// On a typical connection, each write into the connection buffer triggers a packet send
// and incurs some fixed overhead.
// To avoid small packets and excessive write calls, this framer sends
// messages at or above a threshold in exactly two writes, which effectively
// halves the packet size on the wire.
// The benefit of this approach is that framing needs zero additional memory allocation
// and only a small, constant number of memory copies before handing the message to the connection.
// Compared with allocating and copying into a new framed message buffer,
// this approach is ~2x more cpu+memory efficient for sending framed messages on a tcp/udp connection.
// The framer read/write op is called billions of times in a typical user hour.
// FramerSettings configures the size limits used by a Framer.
type FramerSettings struct {
// MaxMessageLen is the largest message length, in bytes, accepted by
// Read, ReadPacket, and Write. NewFramer also sizes the framer's
// internal buffers from it.
MaxMessageLen int
// SplitMinimumLen is the message length at which Write switches from a
// single write to two writes. Write uses max(16, SplitMinimumLen) as the
// effective threshold. NewFramer panics if SplitMinimumLen+4 is larger
// than MaxMessageLen.
SplitMinimumLen int
}
// DefaultFramerSettings returns a freshly allocated FramerSettings with a
// 2048 byte maximum message length and a 256 byte split threshold.
func DefaultFramerSettings() *FramerSettings {
	settings := new(FramerSettings)
	settings.MaxMessageLen = 2048
	settings.SplitMinimumLen = 256
	return settings
}
// Framer reads and writes length-prefixed messages.
// Each frame starts with a 4 byte header holding the message length and a
// split index, both as little-endian uint16 values.
// Read/ReadPacket and Write must be called from a single goroutine each
type Framer struct {
// scratch buffer that Read and ReadPacket read incoming bytes into
readBuffer []byte
// scratch buffer where Write assembles the header and the leading message bytes
writeBuffer []byte
// size limits; the pointer passed to NewFramer is stored as is, not copied
settings *FramerSettings
}
// NewFramerWithDefaults creates a Framer configured with the values from
// DefaultFramerSettings.
func NewFramerWithDefaults() *Framer {
	settings := DefaultFramerSettings()
	return NewFramer(settings)
}
// NewFramer creates a Framer that uses the given settings.
// The settings pointer is stored as is, so later changes to it are visible
// to the framer.
//
// It panics if settings.SplitMinimumLen+4 is larger than
// settings.MaxMessageLen.
func NewFramer(settings *FramerSettings) *Framer {
	// validate before allocating the buffers
	if settings.MaxMessageLen < settings.SplitMinimumLen+4 {
		panic(fmt.Errorf("SplitMinimumLen must be at most %d", settings.MaxMessageLen-4))
	}
	return &Framer{
		readBuffer: make([]byte, settings.MaxMessageLen),
		// Write places the 4 byte header in front of the message bytes it
		// copies here, and unsplit messages can be up to
		// max(16, SplitMinimumLen)-1 bytes. Reserving room for the header on
		// top of the largest message keeps every accepted message in bounds.
		writeBuffer: make([]byte, settings.MaxMessageLen+4),
		settings:    settings,
	}
}
// Read reads one framed message from the stream r and returns its payload.
// It reads the 4 byte header, takes the message length from the first two
// bytes (little-endian uint16), and then reads exactly that many bytes into
// a buffer obtained from MessagePoolGet.
//
// Errors:
//   - io.EOF when the stream ends before any header byte is read
//   - io.ErrUnexpectedEOF when the stream ends inside the header or the body
//   - an error when the announced length is larger than MaxMessageLen
//   - any other error returned by r
func (self *Framer) Read(r io.Reader) ([]byte, error) {
	h := self.readBuffer[0:4]
	if _, err := io.ReadFull(r, h); err != nil {
		return nil, err
	}
	messageLen := int(binary.LittleEndian.Uint16(h[0:2]))
	// h[2:4] holds the split index; a stream reader does not need it
	if self.settings.MaxMessageLen < messageLen {
		return nil, fmt.Errorf("Max message len exceeded (%d<%d)", self.settings.MaxMessageLen, messageLen)
	}
	message := MessagePoolGet(messageLen)
	if _, err := io.ReadFull(r, message); err != nil {
		// io.ReadFull reports a plain io.EOF when zero body bytes arrive.
		// The header was already consumed, so this is a truncated frame and
		// not a clean end of stream.
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		// NOTE(review): the buffer from MessagePoolGet is dropped here; if the
		// pool has a matching return function, confirm whether it should be called.
		return nil, err
	}
	return message, nil
}
// ReadPacket reads one framed message from r and returns its payload.
// use this version if the reader dequeues an entire packet per read
//
// The first read pulls the 4 byte header plus whatever payload bytes arrive
// with it into the framer's read buffer. The payload is then assembled in a
// buffer obtained from MessagePoolGet. When the header carries a split index
// of 16 or more, the message was sent as two writes: the first carrying
// splitIndex payload bytes and the second carrying the rest. Each part is
// completed with its own read so that part boundaries line up with packet
// boundaries. A split index below 16 means the message was not split.
// Bytes that arrive in the first read beyond the announced message length
// are discarded.
//
// Errors:
//   - io.EOF when r ends before any byte is read
//   - io.ErrUnexpectedEOF when r ends inside the header or the payload
//   - an error when the announced length is larger than MaxMessageLen, or
//     when the split index is larger than the announced length
//   - any other error returned by r
func (self *Framer) ReadPacket(r io.Reader) ([]byte, error) {
	h := self.readBuffer
	// a single Read may legally return fewer than 4 bytes;
	// keep reading until the whole header is available before parsing it
	n, err := io.ReadAtLeast(r, h, 4)
	if err != nil {
		return nil, err
	}
	messageLen := int(binary.LittleEndian.Uint16(h[0:2]))
	splitIndex := int(binary.LittleEndian.Uint16(h[2:4]))
	if self.settings.MaxMessageLen < messageLen {
		return nil, fmt.Errorf("Max message len exceeded (%d<%d)", self.settings.MaxMessageLen, messageLen)
	}
	// the header comes from the wire; reject a split index that would
	// index past the end of the message
	if messageLen < splitIndex {
		return nil, fmt.Errorf("Split index exceeds message len (%d<%d)", messageLen, splitIndex)
	}
	message := MessagePoolGet(messageLen)

	// move the payload bytes that arrived together with the header
	readLen := min(n-4, messageLen)
	copy(message[0:readLen], h[4:4+readLen])

	// note we could use 4 bit for additional signaling if needed
	if 16 <= splitIndex && readLen < splitIndex {
		// the first part is incomplete; finish it before starting the second
		if _, err := io.ReadFull(r, message[readLen:splitIndex]); err != nil {
			if err == io.EOF {
				// the header was consumed, so this is a truncated message
				err = io.ErrUnexpectedEOF
			}
			return nil, err
		}
		readLen = splitIndex
	}
	if readLen < messageLen {
		if _, err := io.ReadFull(r, message[readLen:messageLen]); err != nil {
			if err == io.EOF {
				// the header was consumed, so this is a truncated message
				err = io.ErrUnexpectedEOF
			}
			return nil, err
		}
	}
	return message, nil
}
// Write frames message and sends it to w.
// we assume a packet writer will fragment the message internally as needed
//
// The frame starts with a 4 byte header: the message length followed by the
// split index, both little-endian uint16. A message shorter than
// max(16, SplitMinimumLen) is copied behind the header and sent in one write
// with a split index of 0. Any other message is sent in two writes: the
// header plus the first half (copied into the write buffer), then the second
// half straight from message without copying.
//
// It returns an error when the message is longer than MaxMessageLen or does
// not fit in a uint16, and otherwise the first error returned by w.
func (self *Framer) Write(w io.Writer, message []byte) error {
	size := len(message)
	switch {
	case self.settings.MaxMessageLen < size:
		return fmt.Errorf("Max message len exceeded (%d<%d)", self.settings.MaxMessageLen, size)
	case math.MaxUint16 < size:
		return fmt.Errorf("Max possible message len exceeded (%d<%d)", math.MaxUint16, size)
	}

	// headLen is the number of message bytes that travel with the header
	split := max(16, self.settings.SplitMinimumLen) <= size
	headLen := size
	splitIndex := 0
	if split {
		// use half size packets and avoid a large memory copy
		splitIndex = size / 2
		headLen = splitIndex
	}

	frame := self.writeBuffer
	binary.LittleEndian.PutUint16(frame[0:2], uint16(size))
	binary.LittleEndian.PutUint16(frame[2:4], uint16(splitIndex))
	copy(frame[4:4+headLen], message[0:headLen])
	if _, err := w.Write(frame[0 : 4+headLen]); err != nil {
		return err
	}
	if !split {
		return nil
	}

	// the second half goes out directly from the caller's slice
	_, err := w.Write(message[splitIndex:size])
	return err
}