Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions examples/zinx_custom_frame_decoder/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Custom frame decoder example
*
* This example demonstrates how to use custom frame decoder for handling
* packet splitting without length field (e.g., using '\r' as delimiter)
*
* Usage:
* Run the server, then use telnet to test:
* telnet 127.0.0.1 7777
* Type some data ending with \r (press Enter in telnet sends \r\n)
*/
package main

import (
"fmt"

"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
"github.com/aceld/zinx/znet"
)

// CustomFrameDecoder implements IFrameDecoder interface
// Splits packets by '\r' delimiter
type CustomFrameDecoder struct{}

// Decode splits the data by '\r' delimiter
func (d *CustomFrameDecoder) Decode(buff []byte) [][]byte {
var result [][]byte
start := 0

for i := 0; i < len(buff); i++ {
if buff[i] == '\r' {
// Found delimiter, create a new frame
if i > start {
frame := make([]byte, i-start)
copy(frame, buff[start:i])
result = append(result, frame)
}
start = i + 1
}
}

// Handle remaining data (without delimiter at the end)
if start < len(buff) {
frame := make([]byte, len(buff)-start)
copy(frame, buff[start:])
result = append(result, frame)
}

return result
}

func main() {
// Create a custom frame decoder
customDecoder := &CustomFrameDecoder{}

// Create server with custom frame decoder
server := znet.NewServer("CustomFrameDecoderServer")
server.SetFrameDecoder(customDecoder)

// Add route
server.AddRouter(1, &Router{})

zlog.Ins().InfoF("Starting server with custom frame decoder on :7777...")
server.Serve()
}

// Router handle the request
type Router struct {
znet.BaseRouter
}

func (r *Router) Handle(request ziface.IRequest) {
zlog.Ins().Infof("Received data: %s", string(request.GetData()))
fmt.Printf("Received data: %s\n", string(request.GetData()))
}
4 changes: 3 additions & 1 deletion examples/zinx_decoder/bili/router/bili0x10router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ type Data0x10Router struct {
}

func (this *Data0x10Router) Handle(request ziface.IRequest) {
zlog.Ins().DebugF("Data0x10Router Handle %s \n", hex.EncodeToString(request.GetMessage().GetData()))
if zlog.Ins().IsDebugEnabled() {
zlog.Ins().DebugF("Data0x10Router Handle %s \n", hex.EncodeToString(request.GetMessage().GetData()))
}
_response := request.GetResponse()
if _response != nil {
switch _response.(type) {
Expand Down
4 changes: 3 additions & 1 deletion examples/zinx_decoder/router/htlvcrcbusinessrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ func (this *HtlvCrcBusinessRouter) Handle(request ziface.IRequest) {

//MsgID
msgID := request.GetMessage().GetMsgID()
zlog.Ins().DebugF("Call HtlvCrcBusinessRouter Handle %d %s\n", msgID, hex.EncodeToString(request.GetMessage().GetData()))
if zlog.Ins().IsDebugEnabled() {
zlog.Ins().DebugF("Call HtlvCrcBusinessRouter Handle %d %s\n", msgID, hex.EncodeToString(request.GetMessage().GetData()))
}

resp := request.GetResponse()
if resp == nil {
Expand Down
4 changes: 3 additions & 1 deletion zdecoder/htlvcrcdecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func (hcd *HtlvCrcDecoder) decode(data []byte) *HtlvCrcDecoder {

// CRC
if !CheckCRC(data[:datasize-2], htlvData.Crc) {
zlog.Ins().DebugF("crc check error %s %s\n", hex.EncodeToString(data), hex.EncodeToString(htlvData.Crc))
if zlog.Ins().IsDebugEnabled() {
zlog.Ins().DebugF("crc check error %s %s\n", hex.EncodeToString(data), hex.EncodeToString(htlvData.Crc))
}
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions ziface/iclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ type IClient interface {
// SetDecoder Set the decoder for this Client 设置解码器
SetDecoder(IDecoder)

// SetFrameDecoder Set the custom frame decoder for handling custom packet splitting
// (设置自定义帧解码器,用于处理自定义粘包)
SetFrameDecoder(IFrameDecoder)
// GetFrameDecoder Get the custom frame decoder
// (获取自定义帧解码器)
GetFrameDecoder() IFrameDecoder

// AddInterceptor Add an interceptor for this Client 添加拦截器
AddInterceptor(IInterceptor)

Expand Down
1 change: 1 addition & 0 deletions ziface/ilogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type ILogger interface {
InfoF(format string, v ...interface{})
ErrorF(format string, v ...interface{})
DebugF(format string, v ...interface{})
IsDebugEnabled() bool

//with context
InfoFX(ctx context.Context, format string, v ...interface{})
Expand Down
7 changes: 7 additions & 0 deletions ziface/iserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ type IServer interface {
SetDecoder(IDecoder)
AddInterceptor(IInterceptor)

// Set the custom frame decoder for handling custom packet splitting
// (设置自定义帧解码器,用于处理自定义粘包)
SetFrameDecoder(IFrameDecoder)
// Get the custom frame decoder
// (获取自定义帧解码器)
GetFrameDecoder() IFrameDecoder

// Add WebSocket authentication method
// (添加websocket认证方法)
SetWebsocketAuth(func(r *http.Request) error)
Expand Down
4 changes: 4 additions & 0 deletions zlog/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func (log *zinxDefaultLog) DebugF(format string, v ...interface{}) {
StdZinxLog.Debugf(format, v...)
}

func (log *zinxDefaultLog) IsDebugEnabled() bool {
return StdZinxLog.IsDebugEnabled()
}

func (log *zinxDefaultLog) InfoFX(ctx context.Context, format string, v ...interface{}) {
fmt.Println(ctx)
StdZinxLog.Infof(format, v...)
Expand Down
4 changes: 4 additions & 0 deletions zlog/logger_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ func (log *ZinxLoggerCore) Debugf(format string, v ...interface{}) {
_ = log.OutPut(LogDebug, fmt.Sprintf(format, v...))
}

func (log *ZinxLoggerCore) IsDebugEnabled() bool {
return !log.verifyLogIsolation(LogDebug)
}

func (log *ZinxLoggerCore) Debug(v ...interface{}) {
if log.verifyLogIsolation(LogDebug) {
return
Expand Down
4 changes: 4 additions & 0 deletions zlog/stdzlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func Debugf(format string, v ...interface{}) {
StdZinxLog.Debugf(format, v...)
}

func IsDebugEnabled() bool {
return StdZinxLog.IsDebugEnabled()
}

func Debug(v ...interface{}) {
StdZinxLog.Debug(v...)
}
Expand Down
16 changes: 16 additions & 0 deletions znet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type Client struct {
dialer *websocket.Dialer
// Error channel
errChan chan error
// Custom frame decoder for handling custom packet splitting
// (自定义帧解码器,用于处理自定义粘包)
frameDecoder ziface.IFrameDecoder
}

func NewClient(ip string, port int, opts ...ClientOption) ziface.IClient {
Expand Down Expand Up @@ -347,13 +350,26 @@ func (c *Client) AddInterceptor(interceptor ziface.IInterceptor) {
func (c *Client) SetDecoder(decoder ziface.IDecoder) {
c.decoder = decoder
}

func (c *Client) GetLengthField() *ziface.LengthField {
if c.decoder != nil {
return c.decoder.GetLengthField()
}
return nil
}

// SetFrameDecoder sets the custom frame decoder for handling custom packet splitting
// (设置自定义帧解码器,用于处理自定义粘包)
func (c *Client) SetFrameDecoder(frameDecoder ziface.IFrameDecoder) {
c.frameDecoder = frameDecoder
}

// GetFrameDecoder gets the custom frame decoder
// (获取自定义帧解码器)
func (c *Client) GetFrameDecoder() ziface.IFrameDecoder {
return c.frameDecoder
}

func (c *Client) GetErrChan() <-chan error {
return c.errChan
}
Expand Down
14 changes: 11 additions & 3 deletions znet/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ func newServerConn(server ziface.IServer, conn net.Conn, connID uint64) ziface.I
}

lengthField := server.GetLengthField()
if lengthField != nil {
// First check if there's a custom frame decoder
if server.GetFrameDecoder() != nil {
c.frameDecoder = server.GetFrameDecoder()
} else if lengthField != nil {
c.frameDecoder = zinterceptor.NewFrameDecoder(*lengthField)
}

Expand Down Expand Up @@ -179,7 +182,10 @@ func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection {
}

lengthField := client.GetLengthField()
if lengthField != nil {
// First check if there's a custom frame decoder
if client.GetFrameDecoder() != nil {
c.frameDecoder = client.GetFrameDecoder()
} else if lengthField != nil {
c.frameDecoder = zinterceptor.NewFrameDecoder(*lengthField)
}

Expand Down Expand Up @@ -255,7 +261,9 @@ func (c *Connection) StartReader() {
zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err)
return
}
zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n]))
if zlog.Ins().IsDebugEnabled() {
zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n]))
}

// If normal data is read from the peer, update the heartbeat detection Active state
// (正常读取到对端数据,更新心跳检测Active状态)
Expand Down
14 changes: 11 additions & 3 deletions znet/kcp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ func newKcpServerConn(server ziface.IServer, conn *kcp.UDPSession, connID uint64
}

lengthField := server.GetLengthField()
if lengthField != nil {
// First check if there's a custom frame decoder
if server.GetFrameDecoder() != nil {
c.frameDecoder = server.GetFrameDecoder()
} else if lengthField != nil {
c.frameDecoder = zinterceptor.NewFrameDecoder(*lengthField)
}

Expand Down Expand Up @@ -171,7 +174,10 @@ func newKcpClientConn(client ziface.IClient, conn *kcp.UDPSession) ziface.IConne
}

lengthField := client.GetLengthField()
if lengthField != nil {
// First check if there's a custom frame decoder
if client.GetFrameDecoder() != nil {
c.frameDecoder = client.GetFrameDecoder()
} else if lengthField != nil {
c.frameDecoder = zinterceptor.NewFrameDecoder(*lengthField)
}

Expand Down Expand Up @@ -236,7 +242,9 @@ func (c *KcpConnection) StartReader() {
zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err)
return
}
zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n]))
if zlog.Ins().IsDebugEnabled() {
zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n]))
}

// If normal data is read from the peer, update the heartbeat detection Active state
// (正常读取到对端数据,更新心跳检测Active状态)
Expand Down
4 changes: 3 additions & 1 deletion znet/msghandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,9 @@ func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
// zlog.Ins().DebugF("Add ConnID=%d request msgID=%d to workerID=%d", request.GetConnection().GetConnID(), request.GetMsgID(), workerID)
// Send the request message to the task queue
mh.TaskQueue[workerID] <- request
zlog.Ins().DebugF("SendMsgToTaskQueue-->%s", hex.EncodeToString(request.GetData()))
if zlog.Ins().IsDebugEnabled() {
zlog.Ins().DebugF("SendMsgToTaskQueue-->%s", hex.EncodeToString(request.GetData()))
}
}

// doFuncHandler handles functional requests (执行函数式请求)
Expand Down
16 changes: 16 additions & 0 deletions znet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ type Server struct {

// connection id
cID uint64

// Custom frame decoder for handling custom packet splitting
// (自定义帧解码器,用于处理自定义粘包)
frameDecoder ziface.IFrameDecoder
}

type KcpConfig struct {
Expand Down Expand Up @@ -604,6 +608,18 @@ func (s *Server) GetLengthField() *ziface.LengthField {
return nil
}

// SetFrameDecoder sets the custom frame decoder for handling custom packet splitting
// (设置自定义帧解码器,用于处理自定义粘包)
func (s *Server) SetFrameDecoder(frameDecoder ziface.IFrameDecoder) {
s.frameDecoder = frameDecoder
}

// GetFrameDecoder gets the custom frame decoder
// (获取自定义帧解码器)
func (s *Server) GetFrameDecoder() ziface.IFrameDecoder {
return s.frameDecoder
}

func (s *Server) AddInterceptor(interceptor ziface.IInterceptor) {
s.msgHandler.AddInterceptor(interceptor)
}
Expand Down
18 changes: 14 additions & 4 deletions znet/ws_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ func newWebsocketConn(server ziface.IServer, conn *websocket.Conn, connID uint64
}

lengthField := server.GetLengthField()
if lengthField != nil {
// First check if there's a custom frame decoder
if server.GetFrameDecoder() != nil {
c.frameDecoder = server.GetFrameDecoder()
} else if lengthField != nil {
c.frameDecoder = zinterceptor.NewFrameDecoder(*lengthField)
}

Expand Down Expand Up @@ -167,7 +170,10 @@ func newWsClientConn(client ziface.IClient, conn *websocket.Conn) ziface.IConnec
}

lengthField := client.GetLengthField()
if lengthField != nil {
// First check if there's a custom frame decoder
if client.GetFrameDecoder() != nil {
c.frameDecoder = client.GetFrameDecoder()
} else if lengthField != nil {
c.frameDecoder = zinterceptor.NewFrameDecoder(*lengthField)
}

Expand Down Expand Up @@ -235,7 +241,9 @@ func (c *WsConnection) StartReader() {
zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err.Error())
return
}
zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n]))
if zlog.Ins().IsDebugEnabled() {
zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n]))
}

// Update the Active status of heartbeat detection normally after reading data from the peer.
// (正常读取到对端数据,更新心跳检测Active状态)
Expand All @@ -253,7 +261,9 @@ func (c *WsConnection) StartReader() {
continue
}
for _, bytes := range bufArrays {
zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(bytes))
if zlog.Ins().IsDebugEnabled() {
zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(bytes))
}
msg := zpack.NewMessage(uint32(len(bytes)), bytes)
// Get the Request data requested by the current client.
// (得到当前客户端请求的Request数据)
Expand Down
Loading