From 53bff9f21639df9fbd68a7e6118b456e70161841 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?T=C3=ADghearn=C3=A1n=20Carroll?= Date: Wed, 23 Feb 2022 15:19:30 +0000 Subject: [PATCH] Fix: Re-establish socket connections on startup (#123) * reestablish socket connection on start up * remove 32 bit --- .gitignore | 2 ++ .goreleaser.yml | 1 - cmd/internal/internal.go | 35 +++++++++++++++++++ cmd/server/main.go | 3 ++ data/sockets/connect.go | 4 ++- data/sockets/pay_async.go | 2 +- go.mod | 2 +- go.sum | 8 ++--- service/health.go | 4 ++- .../theflyingcodr/sockets/client/client.go | 20 ++++++++--- .../sockets/client/connections.go | 5 ++- .../theflyingcodr/sockets/errors.go | 6 ++++ .../theflyingcodr/sockets/server/server.go | 29 +++++++++++++++ .../theflyingcodr/sockets/socket.go | 9 +++-- vendor/modules.txt | 2 +- 15 files changed, 113 insertions(+), 19 deletions(-) create mode 100644 vendor/github.com/theflyingcodr/sockets/errors.go diff --git a/.gitignore b/.gitignore index 8b82598a..fc27936a 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ cover.html *.swp run .vscode + +bin/ diff --git a/.goreleaser.yml b/.goreleaser.yml index 3aadf9d2..3288932b 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -20,7 +20,6 @@ archives: darwin: Darwin linux: Linux windows: Windows - 386: i386 amd64: x86_64 checksum: name_template: 'checksums.txt' diff --git a/cmd/internal/internal.go b/cmd/internal/internal.go index da8e203a..6c4647c6 100644 --- a/cmd/internal/internal.go +++ b/cmd/internal/internal.go @@ -3,10 +3,13 @@ package internal import ( "context" "fmt" + "net/url" + "time" "github.com/gorilla/websocket" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" + "github.com/libsv/payd" "github.com/libsv/payd/config" paydSQL "github.com/libsv/payd/data/sqlite" "github.com/libsv/payd/docs" @@ -16,6 +19,7 @@ import ( paydMiddleware "github.com/libsv/payd/transports/http/middleware" tsoc "github.com/libsv/payd/transports/sockets" socMiddleware "github.com/libsv/payd/transports/sockets/middleware" + "github.com/pkg/errors" "github.com/spf13/viper" echoSwagger "github.com/swaggo/echo-swagger" "github.com/theflyingcodr/sockets/client" @@ -125,6 +129,37 @@ func ResumeActiveChannels(deps *SocketDeps) error { return nil } +// ResumeSocketConnections resume socket connections with the P4 host. +func ResumeSocketConnections(deps *SocketDeps, cfg *config.P4) error { + u, err := url.Parse(cfg.ServerHost) + if err != nil { + return errors.Wrap(err, "failed to parse p4 host") + } + + // No need to re-establish socket conn when running over http + if u.Scheme != "ws" && u.Scheme != "wss" { + return nil + } + + ctx := context.Background() + invoices, err := deps.InvoiceService.Invoices(ctx) + if err != nil { + return errors.Wrap(err, "failed to retrieve invoices") + } + + for _, invoice := range invoices { + if time.Now().UTC().Unix() <= invoice.ExpiresAt.Time.UTC().Unix() && invoice.State == payd.StateInvoicePending { + if err := deps.ConnectService.Connect(ctx, payd.ConnectArgs{ + InvoiceID: invoice.ID, + }); err != nil { + return errors.Wrapf(err, "failed to connect invoice %s", invoice.ID) + } + } + } + + return nil +} + func wsHandler(svr *server.SocketServer) echo.HandlerFunc { upgrader := websocket.Upgrader{} return func(c echo.Context) error { diff --git a/cmd/server/main.go b/cmd/server/main.go index c22b5143..f3628fa2 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -101,6 +101,9 @@ func main() { if err := internal.ResumeActiveChannels(deps); err != nil { log.Fatal(err, "failed to resume active peer channels") } + if err := internal.ResumeSocketConnections(deps, cfg.P4); err != nil { + log.Error(err, "failed to reconnect invoices with p4") + } if cfg.Deployment.IsDev() { internal.PrintDev(e) diff --git a/data/sockets/connect.go b/data/sockets/connect.go index 8fd709be..8c56d165 100644 --- a/data/sockets/connect.go +++ b/data/sockets/connect.go @@ -22,7 +22,9 @@ func NewConnect(cfg *config.P4, cli *client.Client) *connect { // Connect will join payd with a socket server and kick off the payment process. func (c *connect) Connect(ctx context.Context, args payd.ConnectArgs) error { - if err := c.cli.JoinChannel(c.cfg.ServerHost, args.InvoiceID, nil); err != nil { + if err := c.cli.JoinChannel(c.cfg.ServerHost, args.InvoiceID, nil, map[string]string{ + "internal": "true", + }); err != nil { return errors.Wrapf(err, "failed to connect to channel") } return nil diff --git a/data/sockets/pay_async.go b/data/sockets/pay_async.go index 35c9a99e..f834e383 100644 --- a/data/sockets/pay_async.go +++ b/data/sockets/pay_async.go @@ -36,7 +36,7 @@ func (c *paymentChannel) Pay(ctx context.Context, req payd.PayRequest) error { // parse url to get host connection and invoiceID parts := reURL.FindStringSubmatch(req.PayToURL) invoiceID := parts[2] - if err := c.cli.JoinChannel(parts[1], invoiceID, nil); err != nil { + if err := c.cli.JoinChannel(parts[1], invoiceID, nil, nil); err != nil { return errors.Wrapf(err, "failed to connect to channel %s", invoiceID) } // kick off the process - we will receive the messages via the socket transport listeners. diff --git a/go.mod b/go.mod index cc743ce5..dfb2036d 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/libsv/go-bc v0.1.8 github.com/rs/zerolog v1.26.1 - github.com/theflyingcodr/sockets v0.0.11-beta + github.com/theflyingcodr/sockets v0.0.11-beta.0.20220222160101-76100ef886b5 ) require ( diff --git a/go.sum b/go.sum index 32703102..02b8915f 100644 --- a/go.sum +++ b/go.sum @@ -484,7 +484,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq19sBYvuMoyQ4= github.com/labstack/echo/v4 v4.1.14/go.mod h1:Q5KZ1vD3V5FEzjM79hjwVrC3ABr7F5IdM23bXQMRDGg= -github.com/labstack/echo/v4 v4.6.1/go.mod h1:RnjgMWNDB9g/HucVWhQYNQP9PvbYf6adqftqryo7s9k= github.com/labstack/echo/v4 v4.6.3 h1:VhPuIZYxsbPmo4m9KAkMU/el2442eB7EBFFhNTTT9ac= github.com/labstack/echo/v4 v4.6.3/go.mod h1:Hk5OiHj0kDqmFq7aHe7eDqI7CUhuCrfpupQtLGGLm7A= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= @@ -528,7 +527,6 @@ github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcncea github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= @@ -642,7 +640,6 @@ github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= -github.com/rs/zerolog v1.26.0/go.mod h1:yBiM87lvSqX8h0Ww4sdzNSkVYZ8dL2xjZJG1lAuGZEo= github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -710,8 +707,8 @@ github.com/theflyingcodr/lathos v0.0.6 h1:xIHMZTinurvodmFOgvSGD+OrDhSj42+Xz+FOXY github.com/theflyingcodr/lathos v0.0.6/go.mod h1:68tGFEbAqAzydWDb1KEJZPQY57l3hH32GXO11Hf1zGQ= github.com/theflyingcodr/migrate/v4 v4.15.1-0.20210927160112-79da889ca18e h1:gfOQ8DVRKwc97bXeR6I6ogosWhFi4mredpUtZcx/gvg= github.com/theflyingcodr/migrate/v4 v4.15.1-0.20210927160112-79da889ca18e/go.mod h1:g9qbiDvB47WyrRnNu2t2gMZFNHKnatsYRxsGZbCi4EM= -github.com/theflyingcodr/sockets v0.0.11-beta h1:73rvasQ8aQkuzX1usPjAu9YyE73Kn6ffMfCucRGu1Pw= -github.com/theflyingcodr/sockets v0.0.11-beta/go.mod h1:9WuWIyja/Q8PF3WmblAjFmfzkKabEBLto2Gx1XYnerc= +github.com/theflyingcodr/sockets v0.0.11-beta.0.20220222160101-76100ef886b5 h1:39/z+O7p2ND6GgvOHZAr7o1gjm6UGls8FCcki8hbXRE= +github.com/theflyingcodr/sockets v0.0.11-beta.0.20220222160101-76100ef886b5/go.mod h1:u4PMKd3yqHkt9Jn0VgQRZ33PG9ynL8/j53csVO1huyk= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tonicpow/go-minercraft v0.4.0 h1:o7ndFm0NDIWZ6ml5qXGzGIh7xhGDWQhQixjPCJec2PY= github.com/tonicpow/go-minercraft v0.4.0/go.mod h1:+mJZAtlRy89vbL/gLAH4kft46lxueHUGMhsBJF2E9Fg= @@ -997,7 +994,6 @@ golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/service/health.go b/service/health.go index 9c65a2ff..cc0fa181 100644 --- a/service/health.go +++ b/service/health.go @@ -31,7 +31,9 @@ func (h *healthSvc) Health(ctx context.Context) error { } switch u.Scheme { case "ws", "wss": - if err := h.c.JoinChannel(h.cfg.ServerHost, "health", nil); err != nil { + if err := h.c.JoinChannel(h.cfg.ServerHost, "health", nil, map[string]string{ + "internal": "true", + }); err != nil { return err } if err := h.c.Publish(sockets.Request{ diff --git a/vendor/github.com/theflyingcodr/sockets/client/client.go b/vendor/github.com/theflyingcodr/sockets/client/client.go index 53e31b79..3fd9e76e 100644 --- a/vendor/github.com/theflyingcodr/sockets/client/client.go +++ b/vendor/github.com/theflyingcodr/sockets/client/client.go @@ -1,13 +1,14 @@ package client import ( - "errors" "fmt" "net/http" + "net/url" "sync" "time" "github.com/gorilla/websocket" + "github.com/pkg/errors" "github.com/rs/zerolog/log" "github.com/theflyingcodr/sockets" @@ -244,9 +245,20 @@ func (c *Client) Close() { // it cannot connect. // // If you need to authenticate with the server or send meta, add header/s. -func (c *Client) JoinChannel(host, channelID string, headers http.Header) error { +func (c *Client) JoinChannel(host, channelID string, headers http.Header, params map[string]string) error { log.Info().Msgf("joining channel %s", channelID) - ws, resp, err := websocket.DefaultDialer.Dial(fmt.Sprintf("%s/%s", host, channelID), headers) + u, err := url.Parse(fmt.Sprintf("%s/%s", host, channelID)) + if err != nil { + return errors.Wrap(err, "failed to parse channel url") + } + if params != nil { + q := u.Query() + for k, v := range params { + q.Add(k, v) + } + u.RawQuery = q.Encode() + } + ws, resp, err := websocket.DefaultDialer.Dial(u.String(), headers) if err != nil { return err } @@ -254,7 +266,7 @@ func (c *Client) JoinChannel(host, channelID string, headers http.Header) error _ = resp.Body.Close() }() c.channelJoin <- &connection{ - url: fmt.Sprintf("%s/%s", host, channelID), + url: u.String(), channelID: channelID, ws: ws, closer: make(chan bool), diff --git a/vendor/github.com/theflyingcodr/sockets/client/connections.go b/vendor/github.com/theflyingcodr/sockets/client/connections.go index 8bc3f889..686343db 100644 --- a/vendor/github.com/theflyingcodr/sockets/client/connections.go +++ b/vendor/github.com/theflyingcodr/sockets/client/connections.go @@ -120,7 +120,7 @@ func (c *Client) reconnect(url string) (*websocket.Conn, bool) { for { i++ time.Sleep(c.opts.reconnectTimeout) - ws, _, err := websocket.DefaultDialer.Dial(url, nil) + ws, resp, err := websocket.DefaultDialer.Dial(url, nil) if err != nil { log.Err(err).Msgf("failed to reconnect to '%s' after '%d' attempts", url, i) if c.opts.reconnectAttempts != -1 && i > c.opts.reconnectAttempts { @@ -128,6 +128,9 @@ func (c *Client) reconnect(url string) (*websocket.Conn, bool) { } continue } + defer func() { + _ = resp.Body.Close() + }() return ws, true } } diff --git a/vendor/github.com/theflyingcodr/sockets/errors.go b/vendor/github.com/theflyingcodr/sockets/errors.go new file mode 100644 index 00000000..fb4ad1c0 --- /dev/null +++ b/vendor/github.com/theflyingcodr/sockets/errors.go @@ -0,0 +1,6 @@ +package sockets + +import "errors" + +// ErrChannelNotFound returned when channel is not found. +var ErrChannelNotFound = errors.New("channel not found") diff --git a/vendor/github.com/theflyingcodr/sockets/server/server.go b/vendor/github.com/theflyingcodr/sockets/server/server.go index eb0bbe50..49c145f0 100644 --- a/vendor/github.com/theflyingcodr/sockets/server/server.go +++ b/vendor/github.com/theflyingcodr/sockets/server/server.go @@ -112,6 +112,7 @@ type SocketServer struct { register chan register channelSender chan sender directSender chan sender + channelChecker chan checker close chan struct{} done chan struct{} channelCloser chan string @@ -142,6 +143,7 @@ func New(opts ...OptFunc) *SocketServer { register: make(chan register, 1), channelSender: make(chan sender, 256), directSender: make(chan sender, 256), + channelChecker: make(chan checker, 256), close: make(chan struct{}, 1), done: make(chan struct{}, 1), opts: defaults, @@ -258,6 +260,9 @@ func (s *SocketServer) channelManager() { } go func() { ch.send <- m.msg }() } + case c := <-s.channelChecker: + _, ok := s.channels[c.ID] + c.exists <- ok case <-ticker.C: for channelID, channel := range s.channels { if channel.expires.IsZero() { // doesn't expire @@ -480,12 +485,31 @@ func (s *SocketServer) BroadcastDirect(clientID string, msg *sockets.Message) { } } +// HasChannel will check to see if the server has a channel connection established. +func (s *SocketServer) HasChannel(channelID string) bool { + log.Debug().Msgf("checking if channel %s exists", channelID) + exists := make(chan bool) + defer close(exists) + + s.channelChecker <- checker{ + ID: channelID, + exists: exists, + } + + result := <-exists + log.Debug().Msgf("channel %s exists: %t", channelID, result) + return result +} + // BroadcastAwait will send a broadcast to a channel and wait for a response, // this will simply act on the first response to hit the server, if multiple peers respond, only the // first will be returned. // // The function will return if a msg is returned OR an error is returned OR the ctx times out. func (s *SocketServer) BroadcastAwait(ctx context.Context, channelID string, msg *sockets.Message) (*sockets.Message, error) { + if !s.HasChannel(channelID) { + return nil, sockets.ErrChannelNotFound + } defer s.waitingMsgs.delete(msg.CorrelationID) wm := newWaitMessage() s.waitingMsgs.add(msg.CorrelationID, wm) @@ -521,3 +545,8 @@ type sender struct { ID string msg interface{} } + +type checker struct { + ID string + exists chan bool +} diff --git a/vendor/github.com/theflyingcodr/sockets/socket.go b/vendor/github.com/theflyingcodr/sockets/socket.go index 5e9092b9..d9cb76d6 100644 --- a/vendor/github.com/theflyingcodr/sockets/socket.go +++ b/vendor/github.com/theflyingcodr/sockets/socket.go @@ -8,6 +8,7 @@ import ( "time" "github.com/google/uuid" + "github.com/rs/zerolog/log" ) // common headers. @@ -18,7 +19,7 @@ const ( // Client can be used to implement a client which will send and listen // to messages on channels. type Client interface { - JoinChannel(host, channelID string, headers http.Header) error + JoinChannel(host, channelID string, headers http.Header, params map[string]string) error LeaveChannel(channelID string, headers http.Header) RegisterListener(msgType string, fn HandlerFunc) } @@ -221,8 +222,12 @@ type ErrorMessage struct { // There is a default sockets.ErrorDetail struct available, or you can define your own. func (m *Message) ToError(err interface{}) *ErrorMessage { var bb []byte + var mErr error if !isNil(err) { - bb, _ = json.Marshal(err) + bb, mErr = json.Marshal(err) + if mErr != nil { + log.Error().Err(mErr) + } } e := &ErrorMessage{ CorrelationID: m.CorrelationID, diff --git a/vendor/modules.txt b/vendor/modules.txt index ec5f6b09..ee14ea59 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -245,7 +245,7 @@ github.com/theflyingcodr/govalidator ## explicit; go 1.17 github.com/theflyingcodr/lathos github.com/theflyingcodr/lathos/errs -# github.com/theflyingcodr/sockets v0.0.11-beta +# github.com/theflyingcodr/sockets v0.0.11-beta.0.20220222160101-76100ef886b5 ## explicit; go 1.17 github.com/theflyingcodr/sockets github.com/theflyingcodr/sockets/client