commit d5e5fd37007c62ad1b307c42a26464054521e7df Author: Oleg Borodin Date: Wed Jun 29 14:31:00 2022 +0200 initial import of sources diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f228e54 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*.geany +*~ +*.png +*.jpg +*.gif +*.bin diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..c9196b7 --- /dev/null +++ b/README.md @@ -0,0 +1,153 @@ +# dsrpc, Data RPC + +DSRPC is easy and simple RPC framework over TCP socket. + +### Purpose + +A very easy and open RPC framework with data streaming. + + +### You can + +- Use post and pre-execution middleware +- Hash-based authentication in middleware +- Test call remote function without service organization + +Socket encryption is not used at this time since framefork +is oriented to transfer large amounts of data + +Style of the framework is similar to that of GIN framework. + +## Example + +### Server + +``` +package main + +import ( + "log" + "github.com/kindsoldier/dsrpc" + "netsrv/api" +) + +func main() { + err := server() + if err != nil { + log.Println(err) + } +} + +func server() error { + var err error + + serv := dsrpc.NewService() + + cont := NewController() + serv.Handler(api.HelloMethod, cont.HelloHandler) + + serv.PreMiddleware(dsrpc.LogRequest) + serv.PostMiddleware(dsrpc.LogResponse) + serv.PostMiddleware(dsrpc.LogAccess) + + err = serv.Listen(":8081") + if err != nil { + return err + } + return err +} + + +type Controller struct { +} + +func NewController() *Controller { + return &Controller{} +} + +func (cont *Controller) HelloHandler(context *dsrpc.Context) error { + var err error + params := api.NewHelloParams() + err = context.BindParams(params) + if err != nil { + return err + } + + log.Println("hello message:", params.Message) + + result := api.NewHelloResult() + result.Message = "hello!" + + err = context.SendResult(result, 0) + if err != nil { + return err + } + + return err +} + +``` + +### Client + +``` +package main + +import ( + "fmt" + "github.com/kindsoldier/dsrpc" + "netsrv/api" +) + +func main() { + + err := exec() + if err != nil { + fmt.Println("exec err:", err) + } +} + +func exec() error { + var err error + + params := api.NewHelloParams() + params.Message = "hello, server!" + + result := api.NewHelloResult() + + err = dsrpc.Exec("127.0.0.1:8081", api.HelloMethod, params, result, nil) + if err != nil { + return err + } + + fmt.Println("result:", result.Message) + return err +} + + +``` + +### Common api + +``` +package api + +const HelloMethod string = "hello" + +type HelloParams struct { + Message string `msgpack:"message" json:"message"` +} + +func NewHelloParams() *HelloParams { + return &HelloParams{} +} + +type HelloResult struct { + Message string `msgpack:"message" json:"message"` +} + +func NewHelloResult() *HelloResult { + return &HelloResult{} +} + +``` diff --git a/client.go b/client.go new file mode 100644 index 0000000..74d0806 --- /dev/null +++ b/client.go @@ -0,0 +1,282 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "encoding/json" + "errors" + "io" + "net" + "sync" +) + + +func Put(address string, method string, reader io.Reader, size int64, param, result any, auth *Auth) error { + var err error + + conn, err := net.Dial("tcp", address) + if err != nil { + return err + } + defer conn.Close() + + return ConnPut(conn, method, reader, size, param, result, auth) +} + + +func ConnPut(conn net.Conn, method string, reader io.Reader, size int64, param, result any, auth *Auth) error { + var err error + context := CreateContext(conn) + context.reqRPC.Method = method + context.reqRPC.Params = param + context.reqRPC.Auth = auth + context.resRPC.Result = result + + context.binReader = reader + context.binWriter = conn + + context.reqHeader.binSize = size + + if context.reqRPC.Params == nil { + context.reqRPC.Params = NewEmpty() + } + + err = context.CreateRequest() + if err != nil { + return err + } + err = context.WriteRequest() + if err != nil { + return err + } + + var wg sync.WaitGroup + errChan := make(chan error, 1) + + wg.Add(1) + go context.ReadResponseAsync(&wg, errChan) + + wg.Add(1) + go context.UploadBinAsync(&wg) + + wg.Wait() + err = <- errChan + if err != nil { + return err + } + err = context.BindResponse() + if err != nil { + return err + } + return err +} + +func Get(address string, method string, writer io.Writer, param, result any, auth *Auth) error { + var err error + + conn, err := net.Dial("tcp", address) + if err != nil { + return err + } + defer conn.Close() + + return ConnGet(conn, method, writer, param, result, auth) +} + +func ConnGet(conn net.Conn, method string, writer io.Writer, param, result any, auth *Auth) error { + var err error + + context := CreateContext(conn) + context.reqRPC.Method = method + context.reqRPC.Params = param + context.reqRPC.Auth = auth + context.resRPC.Result = result + + context.binReader = conn + context.binWriter = writer + + if context.reqRPC.Params == nil { + context.reqRPC.Params = NewEmpty() + } + err = context.CreateRequest() + if err != nil { + return err + } + err = context.WriteRequest() + if err != nil { + return err + } + err = context.ReadResponse() + if err != nil { + return err + } + err = context.DownloadBin() + if err != nil { + return err + } + err = context.BindResponse() + if err != nil { + return err + } + return err +} + +func Exec(address, method string, param any, result any, auth *Auth) error { + var err error + + conn, err := net.Dial("tcp", address) + if err != nil { + return err + } + defer conn.Close() + + err = ConnExec(conn, method, param, result, auth) + if err != nil { + return err + } + return err +} + + +func ConnExec(conn net.Conn, method string, param any, result any, auth *Auth) error { + var err error + + context := CreateContext(conn) + context.reqRPC.Method = method + context.reqRPC.Params = param + context.reqRPC.Auth = auth + context.resRPC.Result = result + + if context.reqRPC.Params == nil { + context.reqRPC.Params = NewEmpty() + } + + err = context.CreateRequest() + if err != nil { + return err + } + err = context.WriteRequest() + if err != nil { + return err + } + err = context.ReadResponse() + if err != nil { + return err + } + err = context.BindResponse() + if err != nil { + return err + } + return err +} + + +func (context *Context) CreateRequest() error { + var err error + + context.reqPacket.rcpPayload, err = context.reqRPC.Pack() + if err != nil { + return err + } + rpcSize := int64(len(context.reqPacket.rcpPayload)) + context.reqHeader.rpcSize = rpcSize + + context.reqPacket.header, err = context.reqHeader.Pack() + if err != nil { + return err + } + return err +} + +func (context *Context) WriteRequest() error { + var err error + _, err = context.sockWriter.Write(context.reqPacket.header) + if err != nil { + return err + } + _, err = context.sockWriter.Write(context.reqPacket.rcpPayload) + if err != nil { + return err + } + return err +} + +func (context *Context) UploadBin() error { + var err error + _, err = CopyBytes(context.binReader, context.binWriter, context.reqHeader.binSize) + return err +} + +func (context *Context) ReadResponse() error { + var err error + + context.resPacket.header, err = ReadBytes(context.sockReader, headerSize) + if err != nil { + return err + } + context.resHeader, err = UnpackHeader(context.resPacket.header) + if err != nil { + return err + } + rpcSize := context.resHeader.rpcSize + context.resPacket.rcpPayload, err = ReadBytes(context.sockReader, rpcSize) + if err != nil { + return err + } + return err +} + +func (context *Context) UploadBinAsync(wg *sync.WaitGroup) { + exitFunc := func() { + wg.Done() + } + defer exitFunc() + _, _ = CopyBytes(context.binReader, context.binWriter, context.reqHeader.binSize) + return +} + +func (context *Context) ReadResponseAsync(wg *sync.WaitGroup, errChan chan error) { + var err error + exitFunc := func() { + errChan <- err + wg.Done() + } + defer exitFunc() + context.resPacket.header, err = ReadBytes(context.sockReader, headerSize) + if err != nil { + return + } + context.resHeader, err = UnpackHeader(context.resPacket.header) + if err != nil { + return + } + rpcSize := context.resHeader.rpcSize + context.resPacket.rcpPayload, err = ReadBytes(context.sockReader, rpcSize) + if err != nil { + return + } + return +} + +func (context *Context) DownloadBin() error { + var err error + _, err = CopyBytes(context.binReader, context.binWriter, context.resHeader.binSize) + return err +} + +func (context *Context) BindResponse() error { + var err error + + err = json.Unmarshal(context.resPacket.rcpPayload, context.resRPC) + if err != nil { + return err + } + if len(context.resRPC.Error) > 0 { + return errors.New(context.resRPC.Error) + } + return err +} diff --git a/compat.go b/compat.go new file mode 100644 index 0000000..506e4b6 --- /dev/null +++ b/compat.go @@ -0,0 +1,3 @@ +package dsrpc + +type any = interface{} diff --git a/context.go b/context.go new file mode 100644 index 0000000..7aeebeb --- /dev/null +++ b/context.go @@ -0,0 +1,90 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "io" + "net" + "time" +) + +type Context struct { + start time.Time + remoteHost string + sockReader io.Reader + sockWriter io.Writer + + reqHeader *Header + reqRPC *Request + reqPacket *Packet + + resPacket *Packet + resHeader *Header + resRPC *Response + + binReader io.Reader + binWriter io.Writer +} + + +func NewContext() *Context { + context := &Context{} + context.start = time.Now() + return context +} + +func CreateContext(conn net.Conn) *Context { + context := &Context{} + context.start = time.Now() + context.sockReader = conn + context.sockWriter = conn + + context.reqPacket = NewPacket() + context.resPacket = NewPacket() + + context.reqHeader = NewHeader() + context.reqRPC = NewRequest() + + context.resHeader = NewHeader() + context.resRPC = NewResponse() + context.resRPC = NewResponse() + + return context +} + +func (context *Context) Request() *Request { + return context.reqRPC +} + + +func (context *Context) SetAuthIdent(ident []byte) { + context.reqRPC.Auth.Ident = ident +} + +func (context *Context) SetAuthSalt(salt []byte) { + context.reqRPC.Auth.Salt = salt +} + +func (context *Context) SetAuthHash(hash []byte) { + context.reqRPC.Auth.Hash = hash +} + +func (context *Context) AuthIdent() []byte { + return context.reqRPC.Auth.Ident +} + +func (context *Context) AuthSalt() []byte { + return context.reqRPC.Auth.Salt +} + +func (context *Context) AuthHash() []byte { + return context.reqRPC.Auth.Hash +} + +func (context *Context) Auth() *Auth { + return context.reqRPC.Auth +} diff --git a/empty.go b/empty.go new file mode 100644 index 0000000..853f1b7 --- /dev/null +++ b/empty.go @@ -0,0 +1,13 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +type Empty struct {} + +func NewEmpty() *Empty { + return &Empty{} +} diff --git a/example/.gitignore b/example/.gitignore new file mode 100644 index 0000000..792c355 --- /dev/null +++ b/example/.gitignore @@ -0,0 +1,6 @@ +*~ +*.bak +*.bin +*.png +*.gif +*.jpg diff --git a/example/api/hello.go b/example/api/hello.go new file mode 100644 index 0000000..5ac11c7 --- /dev/null +++ b/example/api/hello.go @@ -0,0 +1,25 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package api + +const HelloMethod string = "hello" + +type HelloParams struct { + Message string `msgpack:"message" json:"message"` +} + +func NewHelloParams() *HelloParams { + return &HelloParams{} +} + +type HelloResult struct { + Message string `msgpack:"message" json:"message"` +} + +func NewHelloResult() *HelloResult { + return &HelloResult{} +} diff --git a/example/cli/.gitignore b/example/cli/.gitignore new file mode 100644 index 0000000..a6a82e0 --- /dev/null +++ b/example/cli/.gitignore @@ -0,0 +1,7 @@ +*~ +*.bak +*.bin +*.png +*.gif +*.jpg +*cli diff --git a/example/cli/netcli.go b/example/cli/netcli.go new file mode 100644 index 0000000..f55c14f --- /dev/null +++ b/example/cli/netcli.go @@ -0,0 +1,38 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package main + +import ( + "fmt" + "github.com/kindsoldier/dsrpc" + "netsrv/api" +) + +func main() { + + err := exec() + if err != nil { + fmt.Println("exec err:", err) + } +} + +func exec() error { + var err error + + params := api.NewHelloParams() + params.Message = "hello, server!" + + result := api.NewHelloResult() + + err = dsrpc.Exec("127.0.0.1:8081", api.HelloMethod, params, result, nil) + if err != nil { + return err + } + + fmt.Println("result:", result.Message) + return err +} diff --git a/example/go.mod b/example/go.mod new file mode 100644 index 0000000..a07d856 --- /dev/null +++ b/example/go.mod @@ -0,0 +1,7 @@ +module netsrv + +go 1.17 + +require github.com/kindsoldier/dsrpc v0.0.1 + +replace github.com/kindsoldier/dsrpc => ../ diff --git a/example/go.sum b/example/go.sum new file mode 100644 index 0000000..ed3bf3c --- /dev/null +++ b/example/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/example/srv/.gitignore b/example/srv/.gitignore new file mode 100644 index 0000000..1699114 --- /dev/null +++ b/example/srv/.gitignore @@ -0,0 +1,7 @@ +*~ +*.bak +*.bin +*.png +*.gif +*.jpg +*srv diff --git a/example/srv/netsrv.go b/example/srv/netsrv.go new file mode 100644 index 0000000..fe1d270 --- /dev/null +++ b/example/srv/netsrv.go @@ -0,0 +1,68 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package main + +import ( + "log" + "github.com/kindsoldier/dsrpc" + "netsrv/api" +) + +func main() { + err := server() + if err != nil { + log.Println(err) + } +} + +func server() error { + var err error + + serv := dsrpc.NewService() + + cont := NewController() + serv.Handler(api.HelloMethod, cont.HelloHandler) + + serv.PreMiddleware(dsrpc.LogRequest) + serv.PostMiddleware(dsrpc.LogResponse) + serv.PostMiddleware(dsrpc.LogAccess) + + err = serv.Listen(":8081") + if err != nil { + return err + } + return err +} + + +type Controller struct { +} + +func NewController() *Controller { + return &Controller{} +} + +func (cont *Controller) HelloHandler(context *dsrpc.Context) error { + var err error + params := api.NewHelloParams() + err = context.BindParams(params) + if err != nil { + return err + } + + log.Println("hello message:", params.Message) + + result := api.NewHelloResult() + result.Message = "hello!" + + err = context.SendResult(result, 0) + if err != nil { + return err + } + + return err +} diff --git a/exec_test.go b/exec_test.go new file mode 100644 index 0000000..3736f47 --- /dev/null +++ b/exec_test.go @@ -0,0 +1,376 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestLocalExec(t *testing.T) { + var err error + params := NewHelloParams() + params.Message = "hello server!" + result := NewHelloResult() + + auth := CreateAuth([]byte("qwert"), []byte("12345")) + + err = LocalExec(HelloMethod, params, result, auth, helloHandler) + require.NoError(t, err) + resultJSON, _ := json.Marshal(result) + logDebug("method result:", string(resultJSON)) +} + + +func TestLocalSave(t *testing.T) { + var err error + + params := NewSaveParams() + params.Message = "save data!" + result := NewHelloResult() + auth := CreateAuth([]byte("qwert"), []byte("12345")) + + var binSize int64 = 16 + rand.Seed(time.Now().UnixNano()) + binBytes := make([]byte, binSize) + rand.Read(binBytes) + + reader := bytes.NewReader(binBytes) + + err = LocalPut(SaveMethod, reader, binSize, params, result, auth, saveHandler) + require.NoError(t, err) + + resultJSON, _ := json.Marshal(result) + logDebug("method result:", string(resultJSON)) +} + + +func TestLocalLoad(t *testing.T) { + var err error + + params := NewLoadParams() + params.Message = "load data!" + result := NewHelloResult() + auth := CreateAuth([]byte("qwert"), []byte("12345")) + + binBytes := make([]byte, 0) + writer := bytes.NewBuffer(binBytes) + + err = LocalGet(LoadMethod, writer, params, result, auth, loadHandler) + require.NoError(t, err) + + resultJSON, _ := json.Marshal(result) + logDebug("method result:", string(resultJSON)) + logDebug("bin size:", len(writer.Bytes())) +} + + +func TestNetExec(t *testing.T) { + go testServ(false) + time.Sleep(10 * time.Millisecond) + err := clientHello() + + require.NoError(t, err) +} + +func TestNetSave(t *testing.T) { + go testServ(false) + time.Sleep(10 * time.Millisecond) + err := clientSave() + require.NoError(t, err) +} + +func TestNetLoad(t *testing.T) { + go testServ(false) + time.Sleep(10 * time.Millisecond) + err := clientLoad() + require.NoError(t, err) +} + +func BenchmarkNetPut(b *testing.B) { + go testServ(true) + time.Sleep(10 * time.Millisecond) + clientSave() + + pBench := func(pb *testing.PB) { + for pb.Next() { + clientSave() + } + } + b.SetParallelism(10) + b.RunParallel(pBench) +} + +func clientHello() error { + var err error + + params := NewHelloParams() + params.Message = "hello server!" + result := NewHelloResult() + auth := CreateAuth([]byte("qwert"), []byte("12345")) + + var binSize int64 = 16 + rand.Seed(time.Now().UnixNano()) + binBytes := make([]byte, binSize) + rand.Read(binBytes) + + err = Exec("127.0.0.1:8081", HelloMethod, params, result, auth) + if err != nil { + logError("method err:", err) + return err + } + resultJSON, _ := json.Marshal(result) + logDebug("method result:", string(resultJSON)) + return err +} + + +func clientSave() error { + var err error + + params := NewSaveParams() + params.Message = "save data!" + result := NewHelloResult() + auth := CreateAuth([]byte("qwert"), []byte("12345")) + + var binSize int64 = 16 + rand.Seed(time.Now().UnixNano()) + binBytes := make([]byte, binSize) + rand.Read(binBytes) + + reader := bytes.NewReader(binBytes) + + err = Put("127.0.0.1:8081", SaveMethod, reader, binSize, params, result, auth) + if err != nil { + logError("method err:", err) + return err + } + resultJSON, _ := json.Marshal(result) + logDebug("method result:", string(resultJSON)) + return err +} + + +func clientLoad() error { + var err error + + params := NewLoadParams() + params.Message = "load data!" + result := NewHelloResult() + auth := CreateAuth([]byte("qwert"), []byte("12345")) + + + binBytes := make([]byte, 0) + writer := bytes.NewBuffer(binBytes) + + err = Get("127.0.0.1:8081", LoadMethod, writer, params, result, auth) + if err != nil { + logError("method err:", err) + return err + } + resultJSON, _ := json.Marshal(result) + logDebug("method result:", string(resultJSON)) + logDebug("bin size:", len(writer.Bytes())) + return err +} + + +var testServRun bool = false + +func testServ(quiet bool) error { + var err error + + if testServRun { + return err + } + testServRun = true + + if quiet { + SetAccessWriter(io.Discard) + SetMessageWriter(io.Discard) + } + serv := NewService() + serv.Handler(HelloMethod, helloHandler) + serv.Handler(SaveMethod, saveHandler) + serv.Handler(LoadMethod, loadHandler) + + serv.PreMiddleware(LogRequest) + serv.PreMiddleware(auth) + + serv.PostMiddleware(LogResponse) + serv.PostMiddleware(LogAccess) + + err = serv.Listen(":8081") + if err != nil { + return err + } + return err +} + +func auth(context *Context) error { + var err error + reqIdent := context.AuthIdent() + reqSalt := context.AuthSalt() + reqHash := context.AuthHash() + + ident := reqIdent + pass := []byte("12345") + + auth := context.Auth() + logDebug("auth ", string(auth.JSON())) + + ok := CheckHash(ident, pass, reqSalt, reqHash) + logDebug("auth ok:", ok) + if !ok { + err = errors.New("auth ident or pass missmatch") + context.SendError(err) + return err + } + return err +} + +func helloHandler(context *Context) error { + var err error + params := NewHelloParams() + + err = context.BindParams(params) + if err != nil { + return err + } + + err = context.ReadBin(io.Discard) + if err != nil { + context.SendError(err) + return err + } + + result := NewHelloResult() + result.Message = "hello, client!" + + err = context.SendResult(result, 0) + if err != nil { + return err + } + return err +} + +func saveHandler(context *Context) error { + var err error + params := NewSaveParams() + + err = context.BindParams(params) + if err != nil { + return err + } + + bufferBytes := make([]byte, 0, 1024) + binWriter := bytes.NewBuffer(bufferBytes) + + err = context.ReadBin(binWriter) + if err != nil { + context.SendError(err) + return err + } + + result := NewSaveResult() + result.Message = "saved successfully!" + + err = context.SendResult(result, 0) + if err != nil { + return err + } + return err +} + +func loadHandler(context *Context) error { + var err error + params := NewSaveParams() + + err = context.BindParams(params) + if err != nil { + return err + } + + err = context.ReadBin(io.Discard) + if err != nil { + context.SendError(err) + return err + } + + var binSize int64 = 1024 + rand.Seed(time.Now().UnixNano()) + binBytes := make([]byte, binSize) + rand.Read(binBytes) + + binReader := bytes.NewReader(binBytes) + + result := NewSaveResult() + result.Message = "load successfully!" + + err = context.SendResult(result, binSize) + if err != nil { + return err + } + binWriter := context.BinWriter() + _, err = CopyBytes(binReader, binWriter, binSize) + if err != nil { + return err + } + + return err +} + + +const HelloMethod string = "hello" + +type HelloParams struct { + Message string `json:"message" json:"message"` +} + +func NewHelloParams() *HelloParams { + return &HelloParams{} +} + +type HelloResult struct { + Message string `json:"message" json:"message"` +} + +func NewHelloResult() *HelloResult { + return &HelloResult{} +} + + +const SaveMethod string = "save" +type SaveParams HelloParams +type SaveResult HelloResult + +func NewSaveParams() *SaveParams { + return &SaveParams{} +} +func NewSaveResult() *SaveResult { + return &SaveResult{} +} + + + +const LoadMethod string = "load" +type LoadParams HelloParams +type LoadResult HelloResult + +func NewLoadParams() *LoadParams { + return &LoadParams{} +} +func NewLoadResult() *LoadResult { + return &LoadResult{} +} diff --git a/faddr.go b/faddr.go new file mode 100644 index 0000000..e67c248 --- /dev/null +++ b/faddr.go @@ -0,0 +1,27 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +type FAddr struct { + network string + address string +} + +func NewFAddr() *FAddr { + var addr FAddr + addr.network = "tcp" + addr.address = "127.0.0.1:5000" + return &addr +} + +func (addr *FAddr) Network() string { + return addr.network +} + +func (addr *FAddr) String() string { + return addr.address +} diff --git a/faddr_test.go b/faddr_test.go new file mode 100644 index 0000000..75414d8 --- /dev/null +++ b/faddr_test.go @@ -0,0 +1,57 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "net" + "testing" + "github.com/stretchr/testify/require" +) + +func TestFConn0(t *testing.T) { + var cConn, sConn net.Conn + sConn, cConn = NewFConn() + + cData := []byte("qwerty") + count := 10 + + for i := 0; i < count; i++ { + wc, err := cConn.Write(cData) + if err != nil { + t.Error(err) + } + require.Equal(t, wc, len(cData)) + + sData := make([]byte, len(cData)) + rc, err := sConn.Read(sData) + require.NoError(t, err) + require.Equal(t, rc, len(cData)) + require.Equal(t, cData, sData) + } +} + +func TestFConn1(t *testing.T) { + var cConn, sConn net.Conn + cConn, sConn = NewFConn() + + cData := []byte("qwerty") + count := 10 + + for i := 0; i < count; i++ { + wc, err := cConn.Write(cData) + if err != nil { + t.Error(err) + } + require.Equal(t, wc, len(cData)) + + sData := make([]byte, len(cData)) + rc, err := sConn.Read(sData) + require.NoError(t, err) + require.Equal(t, rc, len(cData)) + require.Equal(t, cData, sData) + } +} diff --git a/fconn.go b/fconn.go new file mode 100644 index 0000000..1c042d9 --- /dev/null +++ b/fconn.go @@ -0,0 +1,68 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "bytes" + "io" + "net" + "time" +) + +type FConn struct { + reader io.Reader + writer io.Writer +} + +func NewFConn() (*FConn, *FConn){ + c2sBuffer := bytes.NewBuffer(make([]byte, 0)) + s2cBuffer := bytes.NewBuffer(make([]byte, 0)) + + var client FConn + client.writer = c2sBuffer + client.reader = s2cBuffer + + var server FConn + server.writer = s2cBuffer + server.reader = c2sBuffer + + return &client, &server +} + +func (conn FConn) SetDeadline(t time.Time) error { + var err error + return err +} +func (conn FConn) SetReadDeadline(t time.Time) error { + var err error + return err +} +func (conn FConn) SetWriteDeadline(t time.Time) error { + var err error + return err +} + +func (conn FConn) LocalAddr() net.Addr { + return NewFAddr() +} + +func (conn FConn) RemoteAddr() net.Addr { + return NewFAddr() +} + +func (conn FConn) Write(data []byte) (int, error) { + return conn.writer.Write(data) +} + +func (conn FConn) Read(data []byte) (int, error) { + return conn.reader.Read(data) +} + +func (conn FConn) Close() error { + var err error + return err +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..8ab80d4 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/kindsoldier/dsrpc + +go 1.17 + +require github.com/stretchr/testify v1.7.1 + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2dca7c9 --- /dev/null +++ b/go.sum @@ -0,0 +1,11 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/header.go b/header.go new file mode 100644 index 0000000..1c98286 --- /dev/null +++ b/header.go @@ -0,0 +1,98 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "errors" + "encoding/binary" + "encoding/json" + "bytes" +) + +const headerSize int64 = 16 * 2 +const sizeOfInt64 int = 8 +const magicCodeA int64 = 0xEE00ABBA +const magicCodeB int64 = 0xEE44ABBA + +type Header struct { + magicCodeA int64 `json:"magicCodeA"` + rpcSize int64 `json:"rpcSize"` + binSize int64 `json:"binSize"` + magicCodeB int64 `json:"magicCodeB"` +} + + +func NewHeader() *Header { + return &Header{ + magicCodeA: magicCodeA, + magicCodeB: magicCodeB, + } +} + +func (this *Header) JSON() []byte { + jBytes, _ := json.Marshal(this) + return jBytes +} + + +func (this *Header) Pack() ([]byte, error) { + var err error + headerBytes := make([]byte, 0, headerSize) + headerBuffer := bytes.NewBuffer(headerBytes) + + magicCodeABytes := encoderI64(this.magicCodeA) + headerBuffer.Write(magicCodeABytes) + + rpcSizeBytes := encoderI64(this.rpcSize) + headerBuffer.Write(rpcSizeBytes) + + binSizeBytes := encoderI64(this.binSize) + headerBuffer.Write(binSizeBytes) + + magicCodeBBytes := encoderI64(this.magicCodeB) + headerBuffer.Write(magicCodeBBytes) + + return headerBuffer.Bytes(), err +} + +func UnpackHeader(headerBytes []byte) (*Header, error) { + var err error + header := NewHeader() + headerReader := bytes.NewReader(headerBytes) + + magicCodeABytes := make([]byte, sizeOfInt64) + headerReader.Read(magicCodeABytes) + header.magicCodeA = decoderI64(magicCodeABytes) + + rpcSizeBytes := make([]byte, sizeOfInt64) + headerReader.Read(rpcSizeBytes) + header.rpcSize = decoderI64(rpcSizeBytes) + + binSizeBytes := make([]byte, sizeOfInt64) + headerReader.Read(binSizeBytes) + header.binSize = decoderI64(binSizeBytes) + + magicCodeBBytes := make([]byte, sizeOfInt64) + headerReader.Read(magicCodeBBytes) + header.magicCodeB = decoderI64(magicCodeBBytes) + + if header.magicCodeA != magicCodeA || header.magicCodeB != magicCodeB { + return header, errors.New("wrong protocol magic code") + } + + return header, err +} + +func encoderI64(i int64) []byte { + buffer := make([]byte, sizeOfInt64) + binary.BigEndian.PutUint64(buffer, uint64(i)) + return buffer +} + +func decoderI64(b []byte) int64 { + return int64(binary.BigEndian.Uint64(b)) +} diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..ec59bfb --- /dev/null +++ b/logger.go @@ -0,0 +1,45 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "fmt" + "io" + "os" + "time" +) + +var messageWriter io.Writer = os.Stdout +var accessWriter io.Writer = os.Stdout + +func logDebug(messages ...any) { + stamp := time.Now().Format(time.RFC3339Nano) + fmt.Fprintln(messageWriter, stamp, "debug", messages) +} + +func logInfo(messages ...any) { + stamp := time.Now().Format(time.RFC3339Nano) + fmt.Fprintln(messageWriter, stamp, "info", messages) +} + +func logError(messages ...any) { + stamp := time.Now().Format(time.RFC3339Nano) + fmt.Fprintln(messageWriter, stamp, "error", messages) +} + +func logAccess(messages ...any) { + stamp := time.Now().Format(time.RFC3339Nano) + fmt.Fprintln(accessWriter, stamp, "access", messages) +} + +func SetAccessWriter(writer io.Writer) { + accessWriter = writer +} + +func SetMessageWriter(writer io.Writer) { + messageWriter = writer +} diff --git a/midware.go b/midware.go new file mode 100644 index 0000000..fda341a --- /dev/null +++ b/midware.go @@ -0,0 +1,31 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + + +package dsrpc + +import ( + "time" +) + +func LogRequest(context *Context) error { + var err error + logDebug("request:", string(context.reqRPC.JSON())) + return err +} + +func LogResponse(context *Context) error { + var err error + logDebug("response:", string(context.resRPC.JSON())) + return err +} + +func LogAccess(context *Context) error { + var err error + execTime := time.Now().Sub(context.start) + logAccess(context.remoteHost, context.reqRPC.Method, execTime) + return err +} diff --git a/packet.go b/packet.go new file mode 100644 index 0000000..d347037 --- /dev/null +++ b/packet.go @@ -0,0 +1,25 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "encoding/json" +) + +type Packet struct { + header []byte + rcpPayload []byte +} + +func NewPacket() *Packet { + return &Packet{} +} + +func (this *Packet) JSON() []byte { + jBytes, _ := json.Marshal(this) + return jBytes +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..8090608 --- /dev/null +++ b/request.go @@ -0,0 +1,33 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "encoding/json" +) + +type Request struct { + Method string `json:"method" msgpack:"method"` + Params any `json:"params,omitempty" msgpack:"params,omitempty"` + Auth *Auth `json:"auth,omitempty" msgpack:"auth,omitempty"` +} + +func NewRequest() *Request { + req := &Request{} + req.Auth = &Auth{} + return req +} + +func (this *Request) Pack() ([]byte, error) { + rBytes, err := json.Marshal(this) + return rBytes, err +} + +func (this *Request) JSON() []byte { + jBytes, _ := json.Marshal(this) + return jBytes +} diff --git a/response.go b/response.go new file mode 100644 index 0000000..084c892 --- /dev/null +++ b/response.go @@ -0,0 +1,31 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "encoding/json" +) + + +type Response struct { + Error string `json:"error,omitempty" msgpack:"error,omitempty"` + Result any `json:"result,omitemty" msgpack:"result,omitemty"` +} + +func NewResponse() *Response { + return &Response{} +} + +func (this *Response) JSON() []byte { + jBytes, _ := json.Marshal(this) + return jBytes +} + +func (this *Response) Pack() ([]byte, error) { + rBytes, err := json.Marshal(this) + return rBytes, err +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..4f4add6 --- /dev/null +++ b/server.go @@ -0,0 +1,263 @@ +/* + * + * Copyright 2022 Oleg Borodin + * + */ + +package dsrpc + +import ( + "context" + "encoding/json" + "errors" + "io" + "net" + "sync" +) + +type HandlerFunc = func(*Context) error + +type Service struct { + handlers map[string]HandlerFunc + ctx context.Context + cancel context.CancelFunc + wg *sync.WaitGroup + preMw []HandlerFunc + postMw []HandlerFunc +} + +func NewService() *Service { + rdrpc := &Service{} + rdrpc.handlers = make(map[string]HandlerFunc) + ctx, cancel := context.WithCancel(context.Background()) + rdrpc.ctx = ctx + rdrpc.cancel = cancel + var wg sync.WaitGroup + rdrpc.wg = &wg + rdrpc.preMw = make([]HandlerFunc, 0) + rdrpc.postMw = make([]HandlerFunc, 0) + + return rdrpc +} + +func (this *Service) PreMiddleware(mw HandlerFunc) { + this.preMw = append(this.preMw, mw) +} + +func (this *Service) PostMiddleware(mw HandlerFunc) { + this.postMw = append(this.postMw, mw) +} + + +func (this *Service) Handler(method string, handler HandlerFunc) { + this.handlers[method] = handler +} + +func (this *Service) Listen(address string) error { + var err error + logInfo("server listen:", address) + listener, err := net.Listen("tcp", address) + if err != nil { + return err + } + this.wg.Add(1) + for { + select { + case <- this.ctx.Done(): + this.wg.Done() + return err + default: + } + conn, err := listener.Accept() + if err != nil { + logError("conn accept err:", err) + } + + go this.handleConn(conn) + } +} + +func notFound(context *Context) error { + execErr := errors.New("method not found") + err := context.SendError(execErr) + return err +} + +func (this *Service) Stop() error { + var err error + this.cancel() + this.wg.Wait() + return err +} + +func (this *Service) handleConn(conn net.Conn) { + var err error + + context := CreateContext(conn) + + remoteAddr := conn.RemoteAddr().String() + remoteHost, _, _ := net.SplitHostPort(remoteAddr) + context.remoteHost = remoteHost + + context.binReader = conn + context.binWriter = io.Discard + + exitFunc := func() { + conn.Close() + if err != nil { + logError("conn handler err:", err) + } + } + defer exitFunc() + + recovFunc := func () { + panicMsg := recover() + if panicMsg != nil { + logError("handler panic message:", panicMsg) + } + } + defer recovFunc() + + err = context.ReadRequest() + if err != nil { + return + } + + err = context.BindMethod() + if err != nil { + return + } + for _, mw := range this.preMw { + err = mw(context) + if err != nil { + return + } + } + err = this.Route(context) + if err != nil { + return + } + for _, mw := range this.postMw { + err = mw(context) + if err != nil { + return + } + } + return +} + +func (this *Service) Route(context *Context) error { + handler, ok := this.handlers[context.reqRPC.Method] + if ok { + return handler(context) + } + return notFound(context) +} + +func (context *Context) ReadRequest() error { + var err error + + context.reqPacket.header, err = ReadBytes(context.sockReader, headerSize) + if err != nil { + return err + } + context.reqHeader, err = UnpackHeader(context.reqPacket.header) + if err != nil { + return err + } + + rpcSize := context.reqHeader.rpcSize + context.reqPacket.rcpPayload, err = ReadBytes(context.sockReader, rpcSize) + if err != nil { + return err + } + return err +} + +func (context *Context) BinWriter() io.Writer { + return context.sockWriter +} + +func (context *Context) BinReader() io.Reader { + return context.sockReader +} + +func (context *Context) BinSize() int64 { + return context.reqHeader.binSize +} + +func (context *Context) ReadBin(writer io.Writer) error { + var err error + _, err = CopyBytes(context.sockReader, writer, context.reqHeader.binSize) + return err +} + + +func (context *Context) BindMethod() error { + var err error + err = json.Unmarshal(context.reqPacket.rcpPayload, context.reqRPC) + return err +} + +func (context *Context) BindParams(params any) error { + var err error + context.reqRPC.Params = params + err = json.Unmarshal(context.reqPacket.rcpPayload, context.reqRPC) + if err != nil { + return err + } + return err +} + +func (context *Context) SendResult(result any, binSize int64) error { + var err error + context.resRPC.Result = result + + context.resPacket.rcpPayload, err = context.resRPC.Pack() + if err != nil { + return err + } + context.resHeader.rpcSize = int64(len(context.resPacket.rcpPayload)) + context.resHeader.binSize = binSize + + context.resPacket.header, err = context.resHeader.Pack() + if err != nil { + return err + } + _, err = context.sockWriter.Write(context.resPacket.header) + if err != nil { + return err + } + _, err = context.sockWriter.Write(context.resPacket.rcpPayload) + if err != nil { + return err + } + return err +} + + +func (context *Context) SendError(execErr error) error { + var err error + + context.resRPC.Error = execErr.Error() + context.resRPC.Result = NewEmpty() + + context.resPacket.rcpPayload, err = context.resRPC.Pack() + if err != nil { + return err + } + context.resHeader.rpcSize = int64(len(context.resPacket.rcpPayload)) + context.resPacket.header, err = context.resHeader.Pack() + if err != nil { + return err + } + _, err = context.sockWriter.Write(context.resPacket.header) + if err != nil { + return err + } + _, err = context.sockWriter.Write(context.resPacket.rcpPayload) + if err != nil { + return err + } + return err +} diff --git a/tools.go b/tools.go new file mode 100644 index 0000000..554983b --- /dev/null +++ b/tools.go @@ -0,0 +1,54 @@ +/* + * Copyright 2022 Oleg Borodin + */ + +package dsrpc + +import ( + "errors" + "io" + "fmt" +) + +func ReadBytes(reader io.Reader, size int64) ([]byte, error) { + buffer := make([]byte, size) + read, err := io.ReadFull(reader, buffer) + return buffer[0:read], err +} + +func CopyBytes(reader io.Reader, writer io.Writer, dataSize int64) (int64, error) { + var err error + var bSize int64 = 1024 * 4 + var total int64 = 0 + var remains int64 = dataSize + buffer := make([]byte, bSize) + + for { + if reader == nil { + return total, errors.New("reader is nil") + } + if writer == nil { + return total, errors.New("writer is nil") + } + if remains == 0 { + return total, err + } + if remains < bSize { + bSize = remains + } + received, err := reader.Read(buffer[0:bSize]) + if err != nil { + return total, fmt.Errorf("read error: %v", err) + } + recorded, err := writer.Write(buffer[0:received]) + if err != nil { + return total, fmt.Errorf("write error: %v", err) + } + if recorded != received { + return total, errors.New("size mismatch") + } + total += int64(recorded) + remains -= int64(recorded) + } + return total, err +} diff --git a/validate.go b/validate.go new file mode 100644 index 0000000..9584373 --- /dev/null +++ b/validate.go @@ -0,0 +1,164 @@ +/* + * Copyright 2022 Oleg Borodin + */ + +package dsrpc + + +import ( + "io" + "net" +) + +func LocalExec(method string, param any, result any, auth *Auth, handler HandlerFunc) error { + var err error + + cliConn, srvConn := NewFConn() + + context := CreateContext(cliConn) + context.reqRPC.Method = method + context.reqRPC.Params = param + context.reqRPC.Auth = auth + context.resRPC.Result = result + + if context.reqRPC.Params == nil { + context.reqRPC.Params = NewEmpty() + } + err = context.CreateRequest() + if err != nil { + return err + } + err = context.WriteRequest() + if err != nil { + return err + } + err = LocalService(srvConn, handler) + if err != nil { + return err + } + err = context.ReadResponse() + if err != nil { + return err + } + err = context.BindResponse() + if err != nil { + return err + } + + return err +} + +func LocalPut(method string, reader io.Reader, size int64, param, result any, auth *Auth, handler HandlerFunc) error { + + var err error + + cliConn, srvConn := NewFConn() + + context := CreateContext(cliConn) + context.reqRPC.Method = method + context.reqRPC.Params = param + context.reqRPC.Auth = auth + context.resRPC.Result = result + + context.binReader = reader + context.binWriter = cliConn + + context.reqHeader.binSize = size + + if context.reqRPC.Params == nil { + context.reqRPC.Params = NewEmpty() + } + err = context.CreateRequest() + if err != nil { + return err + } + err = context.WriteRequest() + if err != nil { + return err + } + err = context.UploadBin() + if err != nil { + return err + } + err = LocalService(srvConn, handler) + if err != nil { + return err + } + err = context.ReadResponse() + if err != nil { + return err + } + err = context.BindResponse() + if err != nil { + return err + } + return err +} + + +func LocalGet(method string, writer io.Writer, param, result any, auth *Auth, handler HandlerFunc) error { + var err error + + cliConn, srvConn := NewFConn() + + context := CreateContext(cliConn) + context.reqRPC.Method = method + context.reqRPC.Params = param + context.reqRPC.Auth = auth + context.resRPC.Result = result + + context.binReader = cliConn + context.binWriter = writer + + if context.reqRPC.Params == nil { + context.reqRPC.Params = NewEmpty() + } + err = context.CreateRequest() + if err != nil { + return err + } + err = context.WriteRequest() + if err != nil { + return err + } + + err = LocalService(srvConn, handler) + if err != nil { + return err + } + err = context.ReadResponse() + if err != nil { + return err + } + err = context.DownloadBin() + if err != nil { + return err + } + err = context.BindResponse() + if err != nil { + return err + } + return err +} + +func LocalService(conn net.Conn, handler HandlerFunc) error { + var err error + context := CreateContext(conn) + + remoteAddr := conn.RemoteAddr().String() + remoteHost, _, _ := net.SplitHostPort(remoteAddr) + context.remoteHost = remoteHost + + context.binReader = conn + context.binWriter = io.Discard + + err = context.ReadRequest() + if err != nil { + return err + } + err = context.BindMethod() + if err != nil { + return err + } + return handler(context) +} diff --git a/xauth.go b/xauth.go new file mode 100644 index 0000000..28f399e --- /dev/null +++ b/xauth.go @@ -0,0 +1,64 @@ +/* + * Copyright 2022 Oleg Borodin + */ + +package dsrpc + +import ( + "encoding/json" + "bytes" + "math/rand" + "time" + "crypto/sha256" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +type Auth struct { + Ident []byte `json:"ident,omitempty"` + Salt []byte `json:"salt,omitempty"` + Hash []byte `json:"hash,omitempty"` +} + +func NewAuth() *Auth { + return &Auth{} +} + +func (this *Auth) JSON() []byte { + jBytes, _ := json.Marshal(this) + return jBytes +} + +func CreateAuth(ident, pass []byte) *Auth { + salt := CreateSalt() + hash := CreateHash(ident, pass, salt) + auth := &Auth{} + auth.Ident = ident + auth.Salt = salt + auth.Hash = hash + return auth +} + +func CreateSalt() []byte { + const saltSize = 16 + randBytes := make([]byte, saltSize) + rand.Read(randBytes) + return randBytes +} + +func CreateHash(ident, pass, salt []byte) []byte { + vec := make([]byte, 0, len(ident) + len(salt) + len(pass)) + vec = append(vec, ident...) + vec = append(vec, salt...) + vec = append(vec, pass...) + hasher := sha256.New() + hash := hasher.Sum(vec) + return hash +} + +func CheckHash(ident, pass, reqSalt, reqHash []byte) bool { + localHash := CreateHash(ident, pass, reqSalt) + return bytes.Equal(reqHash, localHash) +}