From 5ea1af7fb18ebcf55bdd54ced8ff21d839846dfb Mon Sep 17 00:00:00 2001 From: Oleg Borodin Date: Thu, 18 May 2023 13:34:54 +0200 Subject: [PATCH] disable default log timestamp --- exec_test.go | 573 ++++++++++++++++++++++++++------------------------- logger.go | 28 ++- server.go | 538 +++++++++++++++++++++++------------------------ 3 files changed, 582 insertions(+), 557 deletions(-) diff --git a/exec_test.go b/exec_test.go index 3cf32a7..00c49c5 100644 --- a/exec_test.go +++ b/exec_test.go @@ -5,26 +5,26 @@ package dsrpc import ( - "bytes" - "context" - "encoding/json" - "errors" - "io" - "math/rand" - "testing" - "time" - - "github.com/stretchr/testify/require" + "bytes" + "context" + "encoding/json" + "errors" + "io" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" ) const HelloMethod string = "hello" type HelloParams struct { - Message string `json:"message" msgpack:"message"` + Message string `json:"message" msgpack:"message"` } type HelloResult struct { - Message string `json:"message" msgpack:"message"` + Message string `json:"message" msgpack:"message"` } const SaveMethod string = "save" @@ -38,333 +38,342 @@ type LoadParams HelloParams type LoadResult HelloResult func TestLocalExec(t *testing.T) { - var err error - params := HelloParams{} - params.Message = "hello server!" - result := HelloResult{} + var err error + params := HelloParams{} + params.Message = "hello server!" + result := HelloResult{} - auth := CreateAuth([]byte("qwert"), []byte("12345")) + auth := CreateAuth([]byte("qwert"), []byte("12345")) - err = LocalExec(HelloMethod, ¶ms, &result, auth, helloHandler) - require.NoError(t, err) + err = LocalExec(HelloMethod, ¶ms, &result, auth, helloHandler) + require.NoError(t, err) - resultJson, _ := json.Marshal(result) - logDebug("method result:", string(resultJson)) + resultJson, _ := json.Marshal(result) + logDebug("method result:", string(resultJson)) } func TestLocalSave(t *testing.T) { - var err error + var err error - params := SaveParams{} - params.Message = "save data!" - result := SaveResult{} - auth := CreateAuth([]byte("qwert"), []byte("12345")) + params := SaveParams{} + params.Message = "save data!" + result := SaveResult{} + auth := CreateAuth([]byte("qwert"), []byte("12345")) - var binSize int64 = 16 - rand.Seed(time.Now().UnixNano()) - binBytes := make([]byte, binSize) - rand.Read(binBytes) + var binSize int64 = 16 + rand.Seed(time.Now().UnixNano()) + binBytes := make([]byte, binSize) + rand.Read(binBytes) - reader := bytes.NewReader(binBytes) + reader := bytes.NewReader(binBytes) - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() + timeout := time.Duration(5 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() - err = LocalPut(ctx, SaveMethod, reader, binSize, ¶ms, &result, auth, saveHandler) - require.NoError(t, err) + err = LocalPut(ctx, SaveMethod, reader, binSize, ¶ms, &result, auth, saveHandler) + require.NoError(t, err) - resultJson, _ := json.Marshal(result) - logDebug("method result:", string(resultJson)) + resultJson, _ := json.Marshal(result) + logDebug("method result:", string(resultJson)) } func TestLocalLoad(t *testing.T) { - var err error + var err error - params := LoadParams{} - params.Message = "load data!" - result := LoadResult{} - auth := CreateAuth([]byte("qwert"), []byte("12345")) + params := LoadParams{} + params.Message = "load data!" + result := LoadResult{} + auth := CreateAuth([]byte("qwert"), []byte("12345")) - binBytes := make([]byte, 0) - writer := bytes.NewBuffer(binBytes) + binBytes := make([]byte, 0) + writer := bytes.NewBuffer(binBytes) - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() + timeout := time.Duration(5 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() - err = LocalGet(ctx, LoadMethod, writer, ¶ms, &result, auth, loadHandler) - require.NoError(t, err) + err = LocalGet(ctx, LoadMethod, writer, ¶ms, &result, auth, loadHandler) + require.NoError(t, err) - resultJson, _ := json.Marshal(result) - logDebug("method result:", string(resultJson)) - logDebug("bin size:", len(writer.Bytes())) + resultJson, _ := json.Marshal(result) + logDebug("method result:", string(resultJson)) + logDebug("bin size:", len(writer.Bytes())) } func TestNetExec(t *testing.T) { - go testServ(false) - time.Sleep(10 * time.Millisecond) - err := clientHello() + go testServ(false) + time.Sleep(100 * time.Millisecond) + err := clientHello() - require.NoError(t, err) + require.NoError(t, err) } func TestNetSave(t *testing.T) { - go testServ(false) - time.Sleep(10 * time.Millisecond) - err := clientSave() - require.NoError(t, err) + go testServ(false) + time.Sleep(100 * time.Millisecond) + err := clientSave() + require.NoError(t, err) } func TestNetLoad(t *testing.T) { - go testServ(false) - time.Sleep(10 * time.Millisecond) - err := clientLoad() - require.NoError(t, err) + go testServ(false) + time.Sleep(100 * time.Millisecond) + err := clientLoad() + require.NoError(t, err) } func BenchmarkNetPut(b *testing.B) { - go testServ(true) - time.Sleep(10 * time.Millisecond) - clientSave() - - pBench := func(pb *testing.PB) { - for pb.Next() { - clientSave() - } - } - b.SetParallelism(2000) - b.RunParallel(pBench) + go testServ(true) + time.Sleep(1000 * time.Millisecond) + clientSave() + + pBench := func(pb *testing.PB) { + for pb.Next() { + clientSave() + } + } + b.SetParallelism(2000) + b.RunParallel(pBench) } func clientHello() error { - var err error - - params := HelloParams{} - params.Message = "hello server!" - result := HelloResult{} - auth := CreateAuth([]byte("qwert"), []byte("12345")) - - var binSize int64 = 16 - rand.Seed(time.Now().UnixNano()) - binBytes := make([]byte, binSize) - rand.Read(binBytes) - - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() - - err = Exec(ctx, "127.0.0.1:8081", HelloMethod, ¶ms, &result, auth) - if err != nil { - logError("method err:", err) - return err - } - resultJson, _ := json.Marshal(result) - logDebug("method result:", string(resultJson)) - return err + var err error + + params := HelloParams{} + params.Message = "hello server!" + result := HelloResult{} + auth := CreateAuth([]byte("qwert"), []byte("12345")) + + var binSize int64 = 16 + rand.Seed(time.Now().UnixNano()) + binBytes := make([]byte, binSize) + rand.Read(binBytes) + + timeout := time.Duration(5 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + err = Exec(ctx, "127.0.0.1:18081", HelloMethod, ¶ms, &result, auth) + if err != nil { + logError("method err:", err) + return err + } + resultJson, _ := json.Marshal(result) + logDebug("method result:", string(resultJson)) + return err } func clientSave() error { - var err error - - params := SaveParams{} - params.Message = "save data!" - result := SaveResult{} - auth := CreateAuth([]byte("qwert"), []byte("12345")) - - var binSize int64 = 16 - rand.Seed(time.Now().UnixNano()) - binBytes := make([]byte, binSize) - rand.Read(binBytes) - - reader := bytes.NewReader(binBytes) - - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() - - err = Put(ctx, "127.0.0.1:8081", SaveMethod, reader, binSize, ¶ms, &result, auth) - if err != nil { - logError("method err:", err) - return err - } - resultJson, _ := json.Marshal(result) - logDebug("method result:", string(resultJson)) - return err + var err error + + params := SaveParams{} + params.Message = "save data!" + result := SaveResult{} + auth := CreateAuth([]byte("qwert"), []byte("12345")) + + var binSize int64 = 16 + rand.Seed(time.Now().UnixNano()) + binBytes := make([]byte, binSize) + rand.Read(binBytes) + + reader := bytes.NewReader(binBytes) + + timeout := time.Duration(5 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + err = Put(ctx, "127.0.0.1:18081", SaveMethod, reader, binSize, ¶ms, &result, auth) + if err != nil { + logError("method err:", err) + return err + } + resultJson, _ := json.Marshal(result) + logDebug("method result:", string(resultJson)) + return err } func clientLoad() error { - var err error - - params := LoadParams{} - params.Message = "load data!" - result := LoadResult{} - auth := CreateAuth([]byte("qwert"), []byte("12345")) - - binBytes := make([]byte, 0) - writer := bytes.NewBuffer(binBytes) - - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() - - err = Get(ctx, "127.0.0.1:8081", LoadMethod, writer, ¶ms, &result, auth) - if err != nil { - logError("method err:", err) - return err - } - resultJson, _ := json.Marshal(result) - logDebug("method result:", string(resultJson)) - logDebug("bin size:", len(writer.Bytes())) - return err + var err error + + params := LoadParams{} + params.Message = "load data!" + result := LoadResult{} + auth := CreateAuth([]byte("qwert"), []byte("12345")) + + binBytes := make([]byte, 0) + writer := bytes.NewBuffer(binBytes) + + timeout := time.Duration(5 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + err = Get(ctx, "127.0.0.1:18081", LoadMethod, writer, ¶ms, &result, auth) + if err != nil { + logError("method err:", err) + return err + } + resultJson, _ := json.Marshal(result) + logDebug("method result:", string(resultJson)) + logDebug("bin size:", len(writer.Bytes())) + return err } var testServRun bool = false func testServ(quiet bool) error { - var err error - - if testServRun { - return err - } - testServRun = true - - if quiet { - SetAccessWriter(io.Discard) - SetMessageWriter(io.Discard) - } - serv := NewService() - serv.Handle(HelloMethod, helloHandler) - serv.Handle(SaveMethod, saveHandler) - serv.Handle(LoadMethod, loadHandler) - - serv.PreMiddleware(LogRequest) - serv.PreMiddleware(auth) - - serv.PostMiddleware(LogResponse) - serv.PostMiddleware(LogAccess) - - err = serv.Listen(":8081") - if err != nil { - return err - } - return err + var err error + + if testServRun { + return err + } + testServRun = true + + if quiet { + SetAccessWriter(io.Discard) + SetMessageWriter(io.Discard) + } + + serv := NewService() + serv.Handle(HelloMethod, helloHandler) + serv.Handle(SaveMethod, saveHandler) + serv.Handle(LoadMethod, loadHandler) + + serv.PreMiddleware(LogRequest) + serv.PreMiddleware(auth) + + serv.PostMiddleware(LogResponse) + serv.PostMiddleware(LogAccess) + + err = serv.Listen(":18081") + if err != nil { + return err + } + return err } func auth(content *Content) error { - var err error - reqIdent := content.AuthIdent() - reqSalt := content.AuthSalt() - reqHash := content.AuthHash() - - ident := reqIdent - pass := []byte("12345") - - auth := content.Auth() - logDebug("auth ", string(auth.Json())) - - ok := CheckHash(ident, pass, reqSalt, reqHash) - logDebug("auth ok:", ok) - if !ok { - err = errors.New("auth ident or pass missmatch") - content.SendError(err) - return err - } - return err + var err error + reqIdent := content.AuthIdent() + reqSalt := content.AuthSalt() + reqHash := content.AuthHash() + + ident := reqIdent + pass := []byte("12345") + + auth := content.Auth() + logDebug("auth ", string(auth.Json())) + + ok := CheckHash(ident, pass, reqSalt, reqHash) + logDebug("auth ok:", ok) + if !ok { + err = errors.New("auth ident or pass missmatch") + content.SendError(err) + return err + } + return err } func helloHandler(content *Content) error { - var err error - params := HelloParams{} - - err = content.BindParams(¶ms) - if err != nil { - return err - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() - - err = content.ReadBin(ctx, io.Discard) - if err != nil { - content.SendError(err) - return err - } - - result := HelloResult{} - result.Message = "hello, client!" - - err = content.SendResult(result, 0) - if err != nil { - return err - } - return err + var err error + params := HelloParams{} + + err = content.BindParams(¶ms) + if err != nil { + return err + } + + timeout := time.Duration(5 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + err = content.ReadBin(ctx, io.Discard) + if err != nil { + content.SendError(err) + return err + } + + result := HelloResult{} + result.Message = "hello, client!" + + err = content.SendResult(result, 0) + if err != nil { + return err + } + return err } func saveHandler(content *Content) error { - var err error - params := SaveParams{} - - err = content.BindParams(¶ms) - if err != nil { - return err - } - - bufferBytes := make([]byte, 0, 1024) - binWriter := bytes.NewBuffer(bufferBytes) - - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() - - err = content.ReadBin(ctx, binWriter) - if err != nil { - content.SendError(err) - return err - } - - result := SaveResult{} - result.Message = "saved successfully!" - - err = content.SendResult(result, 0) - if err != nil { - return err - } - return err + var err error + params := SaveParams{} + + err = content.BindParams(¶ms) + if err != nil { + return err + } + + bufferBytes := make([]byte, 0, 1024) + binWriter := bytes.NewBuffer(bufferBytes) + + timeout := time.Duration(5 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + err = content.ReadBin(ctx, binWriter) + if err != nil { + content.SendError(err) + return err + } + + result := SaveResult{} + result.Message = "saved successfully!" + + err = content.SendResult(result, 0) + if err != nil { + return err + } + return err } func loadHandler(content *Content) error { - var err error - params := SaveParams{} - - err = content.BindParams(¶ms) - if err != nil { - return err - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() - - err = content.ReadBin(ctx, io.Discard) - if err != nil { - content.SendError(err) - return err - } - - var binSize int64 = 1024 - rand.Seed(time.Now().UnixNano()) - binBytes := make([]byte, binSize) - rand.Read(binBytes) - - binReader := bytes.NewReader(binBytes) - - result := SaveResult{} - result.Message = "load successfully!" - - err = content.SendResult(result, binSize) - if err != nil { - return err - } - - binWriter := content.BinWriter() - _, err = CopyBytes(ctx, binReader, binWriter, binSize) - if err != nil { - return err - } - - return err + var err error + params := SaveParams{} + + err = content.BindParams(¶ms) + if err != nil { + return err + } + + timeout := time.Duration(5 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + err = content.ReadBin(ctx, io.Discard) + if err != nil { + content.SendError(err) + return err + } + + var binSize int64 = 1024 + rand.Seed(time.Now().UnixNano()) + binBytes := make([]byte, binSize) + rand.Read(binBytes) + + binReader := bytes.NewReader(binBytes) + + result := SaveResult{} + result.Message = "load successfully!" + + err = content.SendResult(result, binSize) + if err != nil { + return err + } + + binWriter := content.BinWriter() + _, err = CopyBytes(ctx, binReader, binWriter, binSize) + if err != nil { + return err + } + + return err } diff --git a/logger.go b/logger.go index a296a08..ac6ea15 100644 --- a/logger.go +++ b/logger.go @@ -13,26 +13,37 @@ import ( "time" ) -var messageWriter io.Writer = os.Stdout -var accessWriter io.Writer = os.Stdout +var ( + messageWriter io.Writer = os.Stdout + accessWriter io.Writer = os.Stdout + logTimestamp bool = false +) + +func getLogStamp() string { + var stamp string + if logTimestamp { + stamp = time.Now().Format(time.RFC3339) + } + return stamp +} func logDebug(messages ...any) { - stamp := time.Now().Format(time.RFC3339) + stamp := getLogStamp() fmt.Fprintln(messageWriter, stamp, "debug", messages) } func logInfo(messages ...any) { - stamp := time.Now().Format(time.RFC3339) + stamp := getLogStamp() fmt.Fprintln(messageWriter, stamp, "info", messages) } func logError(messages ...any) { - stamp := time.Now().Format(time.RFC3339) + stamp := getLogStamp() fmt.Fprintln(messageWriter, stamp, "error", messages) } func logAccess(messages ...any) { - stamp := time.Now().Format(time.RFC3339) + stamp := getLogStamp() fmt.Fprintln(accessWriter, stamp, "access", messages) } @@ -43,3 +54,8 @@ func SetAccessWriter(writer io.Writer) { func SetMessageWriter(writer io.Writer) { messageWriter = writer } + +func EnableLogTimestamp(enable bool) { + logTimestamp = enable +} + diff --git a/server.go b/server.go index 59dbe3c..81e70c5 100644 --- a/server.go +++ b/server.go @@ -5,344 +5,344 @@ package dsrpc import ( - "context" - "crypto/tls" - "errors" - "fmt" - "io" - "net" - "sync" - "time" - - encoder "encoding/json" + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "net" + "sync" + "time" + + encoder "encoding/json" ) type HandlerFunc = func(*Content) error type Service struct { - handlers map[string]HandlerFunc - ctx context.Context - cancel context.CancelFunc - wg *sync.WaitGroup - preMw []HandlerFunc - postMw []HandlerFunc - keepalive bool - kaTime time.Duration - kaMtx sync.Mutex - listener net.Listener - tcpListener *net.TCPListener + handlers map[string]HandlerFunc + ctx context.Context + cancel context.CancelFunc + wg *sync.WaitGroup + preMw []HandlerFunc + postMw []HandlerFunc + keepalive bool + kaTime time.Duration + kaMtx sync.Mutex + listener net.Listener + tcpListener *net.TCPListener } func NewService() *Service { - rdrpc := &Service{} - rdrpc.handlers = make(map[string]HandlerFunc) - ctx, cancel := context.WithCancel(context.Background()) - rdrpc.ctx = ctx - rdrpc.cancel = cancel - var wg sync.WaitGroup - rdrpc.wg = &wg - rdrpc.preMw = make([]HandlerFunc, 0) - rdrpc.postMw = make([]HandlerFunc, 0) - - return rdrpc + rdrpc := &Service{} + rdrpc.handlers = make(map[string]HandlerFunc) + ctx, cancel := context.WithCancel(context.Background()) + rdrpc.ctx = ctx + rdrpc.cancel = cancel + var wg sync.WaitGroup + rdrpc.wg = &wg + rdrpc.preMw = make([]HandlerFunc, 0) + rdrpc.postMw = make([]HandlerFunc, 0) + + return rdrpc } func (svc *Service) PreMiddleware(mw HandlerFunc) { - svc.preMw = append(svc.preMw, mw) + svc.preMw = append(svc.preMw, mw) } func (svc *Service) PostMiddleware(mw HandlerFunc) { - svc.postMw = append(svc.postMw, mw) + svc.postMw = append(svc.postMw, mw) } func (svc *Service) Handle(method string, handler HandlerFunc) { - svc.handlers[method] = handler + svc.handlers[method] = handler } func (svc *Service) SetKeepAlive(flag bool) { - svc.kaMtx.Lock() - defer svc.kaMtx.Unlock() - svc.keepalive = true + svc.kaMtx.Lock() + defer svc.kaMtx.Unlock() + svc.keepalive = true } func (svc *Service) SetKeepAlivePeriod(interval time.Duration) { - svc.kaMtx.Lock() - defer svc.kaMtx.Unlock() - svc.kaTime = interval + svc.kaMtx.Lock() + defer svc.kaMtx.Unlock() + svc.kaTime = interval } func (svc *Service) Listen(address string) error { - var err error - logInfo("server listen:", address) - - addr, err := net.ResolveTCPAddr("tcp", address) - if err != nil { - err = fmt.Errorf("unable to resolve adddress: %s", err) - return err - } - svc.tcpListener, err = net.ListenTCP("tcp", addr) - if err != nil { - err = fmt.Errorf("unable to start listener: %s", err) - return err - } - - for { - conn, err := svc.tcpListener.AcceptTCP() - if err != nil { - logError("conn accept err:", err) - } - select { - case <-svc.ctx.Done(): - return err - default: - } - svc.wg.Add(1) - go svc.handleTCPConn(conn, svc.wg) - } - return err + var err error + logInfo("server listen:", address) + + addr, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + err = fmt.Errorf("unable to resolve adddress: %s", err) + return err + } + svc.tcpListener, err = net.ListenTCP("tcp", addr) + if err != nil { + err = fmt.Errorf("unable to start listener: %s", err) + return err + } + + for { + conn, err := svc.tcpListener.AcceptTCP() + if err != nil { + logError("conn accept err:", err) + } + select { + case <-svc.ctx.Done(): + return err + default: + } + svc.wg.Add(1) + go svc.handleTCPConn(conn, svc.wg) + } + return err } func (svc *Service) ListenTLS(address string, tlsConfig *tls.Config) error { - var err error - logInfo("server listen:", address) - - svc.listener, err = tls.Listen("tcp", address, tlsConfig) - if err != nil { - err = fmt.Errorf("unable to start listener: %s", err) - return err - } - - for { - conn, err := svc.listener.Accept() - if err != nil { - logError("conn accept err:", err) - } - select { - case <-svc.ctx.Done(): - logInfo("accept loop done") - return err - default: - } - svc.wg.Add(1) - go svc.handleConn(conn, svc.wg) - } - return err + var err error + logInfo("server listen:", address) + + svc.listener, err = tls.Listen("tcp", address, tlsConfig) + if err != nil { + err = fmt.Errorf("unable to start listener: %s", err) + return err + } + + for { + conn, err := svc.listener.Accept() + if err != nil { + logError("conn accept err:", err) + } + select { + case <-svc.ctx.Done(): + logInfo("accept loop done") + return err + default: + } + svc.wg.Add(1) + go svc.handleConn(conn, svc.wg) + } + return err } func notFound(content *Content) error { - execErr := errors.New("method not found") - err := content.SendError(execErr) - return err + execErr := errors.New("method not found") + err := content.SendError(execErr) + return err } func (svc *Service) Stop() error { - var err error - // Disable new connection - logInfo("cancel rpc accept loop") - if svc.listener != nil { - svc.listener.Close() - } - if svc.tcpListener != nil { - svc.tcpListener.Close() - } - svc.cancel() - // Wait handlers - logInfo("wait rpc handlers") - svc.wg.Wait() - return err + var err error + // Disable new connection + logInfo("cancel rpc accept loop") + if svc.listener != nil { + svc.listener.Close() + } + if svc.tcpListener != nil { + svc.tcpListener.Close() + } + svc.cancel() + // Wait handlers + logInfo("wait rpc handlers") + svc.wg.Wait() + return err } func (svc *Service) handleTCPConn(conn *net.TCPConn, wg *sync.WaitGroup) { - var err error - if svc.keepalive { - err = conn.SetKeepAlive(true) - if err != nil { - err = fmt.Errorf("unable to set keepalive: %s", err) - return - } - if svc.kaTime > 0 { - err = conn.SetKeepAlivePeriod(svc.kaTime) - if err != nil { - err = fmt.Errorf("unable to set keepalive period: %s", err) - return - } - } - } - svc.handleConn(conn, wg) + var err error + if svc.keepalive { + err = conn.SetKeepAlive(true) + if err != nil { + err = fmt.Errorf("unable to set keepalive: %s", err) + return + } + if svc.kaTime > 0 { + err = conn.SetKeepAlivePeriod(svc.kaTime) + if err != nil { + err = fmt.Errorf("unable to set keepalive period: %s", err) + return + } + } + } + svc.handleConn(conn, wg) } func (svc *Service) handleConn(conn net.Conn, wg *sync.WaitGroup) { - var err error - - content := CreateContent(conn) - - remoteAddr := conn.RemoteAddr().String() - remoteHost, _, _ := net.SplitHostPort(remoteAddr) - content.remoteHost = remoteHost - - content.binReader = conn - content.binWriter = io.Discard - - exitFunc := func() { - conn.Close() - wg.Done() - if err != nil { - logError("conn handler err:", err) - } - } - defer exitFunc() - - recovFunc := func() { - panicMsg := recover() - if panicMsg != nil { - logError("handler panic message:", panicMsg) - } - } - defer recovFunc() - - err = content.ReadRequest() - if err != nil { - err = err - return - } - - err = content.BindMethod() - if err != nil { - err = err - return - } - for _, mw := range svc.preMw { - err = mw(content) - if err != nil { - err = err - return - } - } - err = svc.Route(content) - if err != nil { - err = err - return - } - for _, mw := range svc.postMw { - err = mw(content) - if err != nil { - err = err - return - } - } - return + var err error + + content := CreateContent(conn) + + remoteAddr := conn.RemoteAddr().String() + remoteHost, _, _ := net.SplitHostPort(remoteAddr) + content.remoteHost = remoteHost + + content.binReader = conn + content.binWriter = io.Discard + + exitFunc := func() { + conn.Close() + wg.Done() + if err != nil { + logError("conn handler err:", err) + } + } + defer exitFunc() + + recovFunc := func() { + panicMsg := recover() + if panicMsg != nil { + logError("handler panic message:", panicMsg) + } + } + defer recovFunc() + + err = content.ReadRequest() + if err != nil { + err = err + return + } + + err = content.BindMethod() + if err != nil { + err = err + return + } + for _, mw := range svc.preMw { + err = mw(content) + if err != nil { + err = err + return + } + } + err = svc.Route(content) + if err != nil { + err = err + return + } + for _, mw := range svc.postMw { + err = mw(content) + if err != nil { + err = err + return + } + } + return } func (svc *Service) Route(content *Content) error { - handler, ok := svc.handlers[content.reqBlock.Method] - if ok { - return handler(content) - } - return notFound(content) + handler, ok := svc.handlers[content.reqBlock.Method] + if ok { + return handler(content) + } + return notFound(content) } func (content *Content) ReadRequest() error { - var err error - - content.reqPacket.header, err = ReadBytes(content.sockReader, headerSize) - if err != nil { - return err - } - content.reqHeader, err = UnpackHeader(content.reqPacket.header) - if err != nil { - return err - } - - rpcSize := content.reqHeader.rpcSize - content.reqPacket.rcpPayload, err = ReadBytes(content.sockReader, rpcSize) - if err != nil { - return err - } - return err + var err error + + content.reqPacket.header, err = ReadBytes(content.sockReader, headerSize) + if err != nil { + return err + } + content.reqHeader, err = UnpackHeader(content.reqPacket.header) + if err != nil { + return err + } + + rpcSize := content.reqHeader.rpcSize + content.reqPacket.rcpPayload, err = ReadBytes(content.sockReader, rpcSize) + if err != nil { + return err + } + return err } func (content *Content) BinWriter() io.Writer { - return content.sockWriter + return content.sockWriter } func (content *Content) BinReader() io.Reader { - return content.sockReader + return content.sockReader } func (content *Content) BinSize() int64 { - return content.reqHeader.binSize + return content.reqHeader.binSize } func (content *Content) ReadBin(ctx context.Context, writer io.Writer) error { - var err error - _, err = CopyBytes(ctx, content.sockReader, writer, content.reqHeader.binSize) - return err + var err error + _, err = CopyBytes(ctx, content.sockReader, writer, content.reqHeader.binSize) + return err } func (content *Content) BindMethod() error { - var err error - err = encoder.Unmarshal(content.reqPacket.rcpPayload, content.reqBlock) - return err + var err error + err = encoder.Unmarshal(content.reqPacket.rcpPayload, content.reqBlock) + return err } func (content *Content) BindParams(params any) error { - var err error - content.reqBlock.Params = params - err = encoder.Unmarshal(content.reqPacket.rcpPayload, content.reqBlock) - if err != nil { - return err - } - return err + var err error + content.reqBlock.Params = params + err = encoder.Unmarshal(content.reqPacket.rcpPayload, content.reqBlock) + if err != nil { + return err + } + return err } func (content *Content) SendResult(result any, binSize int64) error { - var err error - content.resBlock.Result = result - - content.resPacket.rcpPayload, err = content.resBlock.Pack() - if err != nil { - return err - } - content.resHeader.rpcSize = int64(len(content.resPacket.rcpPayload)) - content.resHeader.binSize = binSize - - content.resPacket.header, err = content.resHeader.Pack() - if err != nil { - return err - } - _, err = content.sockWriter.Write(content.resPacket.header) - if err != nil { - return err - } - _, err = content.sockWriter.Write(content.resPacket.rcpPayload) - if err != nil { - return err - } - return err + var err error + content.resBlock.Result = result + + content.resPacket.rcpPayload, err = content.resBlock.Pack() + if err != nil { + return err + } + content.resHeader.rpcSize = int64(len(content.resPacket.rcpPayload)) + content.resHeader.binSize = binSize + + content.resPacket.header, err = content.resHeader.Pack() + if err != nil { + return err + } + _, err = content.sockWriter.Write(content.resPacket.header) + if err != nil { + return err + } + _, err = content.sockWriter.Write(content.resPacket.rcpPayload) + if err != nil { + return err + } + return err } func (content *Content) SendError(execErr error) error { - var err error - - content.resBlock.Error = execErr.Error() - content.resBlock.Result = NewEmptyResult() - - content.resPacket.rcpPayload, err = content.resBlock.Pack() - if err != nil { - return err - } - content.resHeader.rpcSize = int64(len(content.resPacket.rcpPayload)) - content.resPacket.header, err = content.resHeader.Pack() - if err != nil { - return err - } - _, err = content.sockWriter.Write(content.resPacket.header) - if err != nil { - return err - } - _, err = content.sockWriter.Write(content.resPacket.rcpPayload) - if err != nil { - return err - } - return err + var err error + + content.resBlock.Error = execErr.Error() + content.resBlock.Result = NewEmptyResult() + + content.resPacket.rcpPayload, err = content.resBlock.Pack() + if err != nil { + return err + } + content.resHeader.rpcSize = int64(len(content.resPacket.rcpPayload)) + content.resPacket.header, err = content.resHeader.Pack() + if err != nil { + return err + } + _, err = content.sockWriter.Write(content.resPacket.header) + if err != nil { + return err + } + _, err = content.sockWriter.Write(content.resPacket.rcpPayload) + if err != nil { + return err + } + return err }