Skip to content

Commit edc0772

Browse files
committed
feat: distinct timeouts for forward, backward, and control flows
1 parent f97628d commit edc0772

File tree

6 files changed

+104
-58
lines changed

6 files changed

+104
-58
lines changed

cmd/serve.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,24 @@ func CommandServe(cfg *config.Config) *cli.Command {
333333
Value: "ws://127.0.0.1:11111",
334334
},
335335

336+
&cli.DurationFlag{ // --flashblocks-backward-timeout
337+
Category: strings.ToUpper(categoryFlashblocks),
338+
Destination: &cfg.Flashblocks.BackwardTimeout,
339+
EnvVars: []string{envPrefix + strings.ToUpper(categoryFlashblocks) + "_BACKWARD_TIMEOUT"},
340+
Name: categoryFlashblocks + "-backward-timeout",
341+
Usage: "max `duration` for flashblocks frontend reads and backend writes (0s means no timeout)",
342+
Value: 0,
343+
},
344+
345+
&cli.DurationFlag{ // --flashblocks-control-timeout
346+
Category: strings.ToUpper(categoryFlashblocks),
347+
Destination: &cfg.Flashblocks.BackwardTimeout,
348+
EnvVars: []string{envPrefix + strings.ToUpper(categoryFlashblocks) + "_CONTROL_TIMEOUT"},
349+
Name: categoryFlashblocks + "-control-timeout",
350+
Usage: "max `duration` for control websocket messages reads and writes (0s means no timeout)",
351+
Value: time.Second,
352+
},
353+
336354
&cli.BoolFlag{ // --flashblocks-enabled
337355
Category: strings.ToUpper(categoryFlashblocks),
338356
Destination: &cfg.Flashblocks.Enabled,
@@ -342,6 +360,15 @@ func CommandServe(cfg *config.Config) *cli.Command {
342360
Value: false,
343361
},
344362

363+
&cli.DurationFlag{ // --flashblocks-forward-timeout
364+
Category: strings.ToUpper(categoryFlashblocks),
365+
Destination: &cfg.Flashblocks.ForwardTimeout,
366+
EnvVars: []string{envPrefix + strings.ToUpper(categoryFlashblocks) + "_FORWARD_TIMEOUT"},
367+
Name: categoryFlashblocks + "-forward-timeout",
368+
Usage: "max `duration` for flashblocks backend reads and frontend writes (0s means no timeout)",
369+
Value: 5 * time.Second,
370+
},
371+
345372
&cli.StringFlag{ // --flashblocks-healthcheck
346373
Category: strings.ToUpper(categoryFlashblocks),
347374
DefaultText: "disabled",
@@ -415,15 +442,6 @@ func CommandServe(cfg *config.Config) *cli.Command {
415442
Value: 16,
416443
},
417444

418-
&cli.DurationFlag{ // --flashblocks-timeout
419-
Category: strings.ToUpper(categoryFlashblocks),
420-
Destination: &cfg.Flashblocks.Timeout,
421-
EnvVars: []string{envPrefix + strings.ToUpper(categoryFlashblocks) + "_TIMEOUT"},
422-
Name: categoryFlashblocks + "-timeout",
423-
Usage: "max `duration` for flashblocks websocket reads or writes",
424-
Value: 5 * time.Second,
425-
},
426-
427445
&cli.StringFlag{ // --flashblocks-tls-crt
428446
Category: strings.ToUpper(categoryFlashblocks),
429447
Destination: &cfg.Flashblocks.TLSCertificate,

config/websocket_proxy.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ import (
1515

1616
type WebsocketProxy struct {
1717
BackendURL string `yaml:"backend_url"`
18+
BackwardTimeout time.Duration `yaml:"backward_timeout"`
19+
ControlTimeout time.Duration `yaml:"control_timeout"`
1820
Enabled bool `yaml:"enabled"`
21+
ForwardTimeout time.Duration `yaml:"forward_timeout"`
1922
Healthcheck *Healthcheck `yaml:"healthcheck"`
2023
ListenAddress string `yaml:"listen_address"`
2124
LogMessages bool `yaml:"log_messages"`
2225
LogMessagesMaxSize int `yaml:"log_messages_max_size"`
2326
ReadBufferSize int `yaml:"read_buffer_size_mb"`
24-
Timeout time.Duration `yaml:"backend_timeout"`
2527
TLSCertificate string `yaml:"tls_crt"`
2628
TLSKey string `yaml:"tls_key"`
2729
WriteBufferSize int `yaml:"write_buffer_size_mb"`

proxy/websocket.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/flashbots/bproxy/config"
1414
"github.com/flashbots/bproxy/logutils"
1515
"github.com/flashbots/bproxy/metrics"
16+
"github.com/flashbots/bproxy/utils"
1617

1718
"github.com/fasthttp/websocket"
1819
"github.com/valyala/fasthttp"
@@ -361,15 +362,15 @@ func (p *Websocket) closeWebsocket(conn *websocket.Conn, reason error) error {
361362
if reason == nil {
362363
return errors.Join(
363364
conn.WriteControl(
364-
websocket.CloseMessage, nil, time.Now().Add(p.cfg.proxy.Timeout),
365+
websocket.CloseMessage, nil, utils.Deadline(p.cfg.proxy.ControlTimeout),
365366
),
366367
conn.Close(),
367368
)
368369
}
369370

370371
return errors.Join(
371372
conn.WriteControl(
372-
websocket.CloseMessage, []byte(reason.Error()), time.Now().Add(p.cfg.proxy.Timeout),
373+
websocket.CloseMessage, []byte(reason.Error()), utils.Deadline(p.cfg.proxy.ControlTimeout),
373374
),
374375
conn.Close(),
375376
)

proxy/websocket_pump.go

Lines changed: 58 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ func (w *websocketPump) run() error {
5454
w.frontend.SetCloseHandler(w.pumpCloseMessages(w.frontend, w.backend, "f->b"))
5555
w.backend.SetCloseHandler(w.pumpCloseMessages(w.backend, w.frontend, "b->f"))
5656

57-
go w.pumpMessages(w.backend, w.frontend, "b->f", done, failure)
58-
go w.pumpMessages(w.frontend, w.backend, "f->b", done, failure)
57+
go w.pumpMessages(w.backend, w.frontend, w.cfg.proxy.ForwardTimeout, "b->f", done, failure)
58+
go w.pumpMessages(w.frontend, w.backend, w.cfg.proxy.BackwardTimeout, "f->b", done, failure)
5959

6060
w.active.Store(true)
6161

@@ -99,6 +99,7 @@ func (w *websocketPump) stop() error {
9999

100100
func (w *websocketPump) pumpMessages(
101101
from, to *websocket.Conn,
102+
timeout time.Duration,
102103
direction string,
103104
done chan struct{},
104105
failure chan error,
@@ -116,54 +117,66 @@ loop:
116117
return
117118

118119
default:
119-
if err := from.SetReadDeadline(time.Now().Add(w.cfg.proxy.Timeout)); err != nil {
120-
failure <- err
121-
continue loop
122-
}
123-
msgType, bytes, err := from.ReadMessage()
124-
if err != nil {
125-
failure <- err
126-
continue loop
120+
var (
121+
msgType int
122+
bytes []byte
123+
err error
124+
)
125+
126+
{ // read
127+
if err := from.SetReadDeadline(utils.Deadline(timeout)); err != nil {
128+
failure <- err
129+
continue loop
130+
}
131+
msgType, bytes, err = from.ReadMessage()
132+
if err != nil {
133+
failure <- err
134+
continue loop
135+
}
127136
}
128137

129138
ts := time.Now()
130139

131-
if err := to.SetWriteDeadline(time.Now().Add(w.cfg.proxy.Timeout)); err != nil {
132-
failure <- err
133-
continue loop
134-
}
135-
err = to.WriteMessage(msgType, bytes)
136-
if err != nil {
137-
failure <- err
138-
continue loop
140+
{ // write
141+
if err := to.SetWriteDeadline(utils.Deadline(timeout)); err != nil {
142+
failure <- err
143+
continue loop
144+
}
145+
err = to.WriteMessage(msgType, bytes)
146+
if err != nil {
147+
failure <- err
148+
continue loop
149+
}
139150
}
140151

141-
loggedFields := make([]zap.Field, 0, 6)
142-
loggedFields = append(loggedFields,
143-
zap.Time("ts_message_received", ts),
144-
zap.Int("message_type", msgType),
145-
zap.Int("message_size", len(bytes)),
146-
)
147-
148-
if w.cfg.proxy.LogMessages && len(bytes) <= w.cfg.proxy.LogMessagesMaxSize {
149-
var jsonMessage interface{}
150-
if err := json.Unmarshal(bytes, &jsonMessage); err == nil {
151-
loggedFields = append(loggedFields,
152-
zap.Any("json_message", jsonMessage),
153-
)
154-
} else {
155-
loggedFields = append(loggedFields,
156-
zap.NamedError("error_unmarshal", err),
157-
zap.String("websocket_message", utils.Str(bytes)),
158-
)
152+
{ // emit logs and metrics
153+
loggedFields := make([]zap.Field, 0, 6)
154+
loggedFields = append(loggedFields,
155+
zap.Time("ts_message_received", ts),
156+
zap.Int("message_type", msgType),
157+
zap.Int("message_size", len(bytes)),
158+
)
159+
160+
if w.cfg.proxy.LogMessages && len(bytes) <= w.cfg.proxy.LogMessagesMaxSize {
161+
var jsonMessage interface{}
162+
if err := json.Unmarshal(bytes, &jsonMessage); err == nil {
163+
loggedFields = append(loggedFields,
164+
zap.Any("json_message", jsonMessage),
165+
)
166+
} else {
167+
loggedFields = append(loggedFields,
168+
zap.NamedError("error_unmarshal", err),
169+
zap.String("websocket_message", utils.Str(bytes)),
170+
)
171+
}
159172
}
160-
}
161173

162-
metrics.ProxySuccessCount.Add(context.TODO(), 1, otelapi.WithAttributes(
163-
attribute.KeyValue{Key: "proxy", Value: attribute.StringValue(w.cfg.name)},
164-
attribute.KeyValue{Key: "direction", Value: attribute.StringValue(direction)},
165-
))
166-
l.Info("Proxied message", loggedFields...)
174+
metrics.ProxySuccessCount.Add(context.TODO(), 1, otelapi.WithAttributes(
175+
attribute.KeyValue{Key: "proxy", Value: attribute.StringValue(w.cfg.name)},
176+
attribute.KeyValue{Key: "direction", Value: attribute.StringValue(direction)},
177+
))
178+
l.Info("Proxied message", loggedFields...)
179+
}
167180
}
168181
}
169182
}
@@ -182,7 +195,7 @@ func (w *websocketPump) pumpPings(
182195
zap.String("message", hex.EncodeToString([]byte(message))),
183196
)
184197
return to.WriteControl(
185-
websocket.PingMessage, []byte(message), time.Now().Add(w.cfg.proxy.Timeout),
198+
websocket.PingMessage, []byte(message), utils.Deadline(w.cfg.proxy.ControlTimeout),
186199
)
187200
}
188201
}
@@ -201,7 +214,7 @@ func (w *websocketPump) pumpPongs(
201214
zap.String("message", hex.EncodeToString([]byte(message))),
202215
)
203216
return to.WriteControl(
204-
websocket.PongMessage, []byte(message), time.Now().Add(w.cfg.proxy.Timeout),
217+
websocket.PongMessage, []byte(message), utils.Deadline(w.cfg.proxy.ControlTimeout),
205218
)
206219
}
207220
}
@@ -221,7 +234,7 @@ func (w *websocketPump) pumpCloseMessages(
221234
zap.String("message", hex.EncodeToString([]byte(message))),
222235
)
223236
return to.WriteControl(
224-
code, []byte(message), time.Now().Add(w.cfg.proxy.Timeout),
237+
code, []byte(message), utils.Deadline(w.cfg.proxy.ControlTimeout),
225238
)
226239
}
227240
}

readme.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ OPTIONS:
5151
FLASHBLOCKS
5252
5353
--flashblocks-backend url url of flashblocks backend (default: "ws://127.0.0.1:11111") [$BPROXY_FLASHBLOCKS_BACKEND]
54+
--flashblocks-backward-timeout duration max duration for flashblocks frontend reads and backend writes (0s means no timeout) (default: 0s) [$BPROXY_FLASHBLOCKS_BACKWARD_TIMEOUT]
55+
--flashblocks-control-timeout duration max duration for control websocket messages reads and writes (0s means no timeout) (default: 1s) [$BPROXY_FLASHBLOCKS_CONTROL_TIMEOUT]
5456
--flashblocks-enabled enable flashblocks proxy (default: false) [$BPROXY_FLASHBLOCKS_ENABLED]
57+
--flashblocks-forward-timeout duration max duration for flashblocks backend reads and frontend writes (0s means no timeout) (default: 5s) [$BPROXY_FLASHBLOCKS_FORWARD_TIMEOUT]
5558
--flashblocks-healthcheck url url of flashblocks backend healthcheck endpoint (default: disabled) [$BPROXY_FLASHBLOCKS_HEALTHCHECK]
5659
--flashblocks-healthcheck-interval interval interval between consecutive flashblocks backend healthchecks (default: 1s) [$BPROXY_FLASHBLOCKS_HEALTHCHECK_INTERVAL]
5760
--flashblocks-healthcheck-threshold-healthy count count of consecutive successful healthchecks to consider flashblocks backend to be healthy (default: 2) [$BPROXY_FLASHBLOCKS_HEALTHCHECK_THRESHOLD_HEALTHY]
@@ -60,7 +63,6 @@ OPTIONS:
6063
--flashblocks-log-messages whether to log flashblocks messages (default: false) [$BPROXY_FLASHBLOCKS_LOG_MESSAGES]
6164
--flashblocks-log-messages-max-size size do not log flashblocks messages larger than size (default: 4096) [$BPROXY_FLASHBLOCKS_LOG_MESSAGES_MAX_SIZE]
6265
--flashblocks-read-buffer-size megabytes flashblocks read buffer size in megabytes (messages from client) (default: 16) [$BPROXY_FLASHBLOCKS_READ_BUFFER_SIZE]
63-
--flashblocks-timeout duration max duration for flashblocks websocket reads or writes (default: 5s) [$BPROXY_FLASHBLOCKS_TIMEOUT]
6466
--flashblocks-tls-crt path path to flashblocks tls certificate (default: uses plain-text http) [$BPROXY_FLASHBLOCKS_TLS_CRT]
6567
--flashblocks-tls-key path path to flashblocks tls key (default: uses plain-text http) [$BPROXY_FLASHBLOCKS_TLS_KEY]
6668
--flashblocks-write-buffer-size megabytes flashblocks write buffer size in megabytes (messages from backend) (default: 16) [$BPROXY_FLASHBLOCKS_WRITE_BUFFER_SIZE]

utils/deadline.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package utils
2+
3+
import "time"
4+
5+
func Deadline(timeout time.Duration) time.Time {
6+
if timeout == 0 {
7+
return time.Time{}
8+
}
9+
return time.Now().Add(timeout)
10+
}

0 commit comments

Comments
 (0)