diff --git a/coap-gateway/service/client.go b/coap-gateway/service/client.go index 5538d747f..3df311a05 100644 --- a/coap-gateway/service/client.go +++ b/coap-gateway/service/client.go @@ -344,6 +344,7 @@ func (client *Client) CleanUp() (oldDeviceID *authCtx) { aCtx := client.loadAuthorizationContext() log.Debugf("clenaup client %v for device %v", client.coapConn.RemoteAddr(), aCtx.DeviceId) + client.server.devicesStatusUpdater.Remove(client) client.server.oicPingCache.Delete(client.remoteAddrString()) client.cleanObservedResources() client.cancelResourceSubscriptions(false) diff --git a/coap-gateway/service/config.go b/coap-gateway/service/config.go index 5615c7f08..dffbd3785 100644 --- a/coap-gateway/service/config.go +++ b/coap-gateway/service/config.go @@ -27,4 +27,5 @@ type Config struct { HeartBeat time.Duration `envconfig:"HEARTBEAT" default:"4s"` MaxMessageSize int `envconfig:"MAX_MESSAGE_SIZE" default:"262144"` LogMessages bool `envconfig:"LOG_MESSAGES" default:"false"` + DeviceStatusValidity time.Duration `envconfig:"DEVICE_STATUS_VALIDITY" default:"20m"` } diff --git a/coap-gateway/service/devicesStatusUpdater.go b/coap-gateway/service/devicesStatusUpdater.go new file mode 100644 index 000000000..6816b6139 --- /dev/null +++ b/coap-gateway/service/devicesStatusUpdater.go @@ -0,0 +1,114 @@ +package service + +import ( + "context" + "fmt" + "sync" + "time" + + deviceStatus "github.com/plgd-dev/cloud/coap-gateway/schema/device/status" + pbCQRS "github.com/plgd-dev/cloud/resource-aggregate/pb" + "github.com/plgd-dev/kit/log" + kitNetGrpc "github.com/plgd-dev/kit/net/grpc" +) + +type deviceExpires struct { + expires time.Time + client *Client +} + +type devicesStatusUpdater struct { + ctx context.Context + deviceStatusValidity time.Duration + + mutex sync.Mutex + devices map[string]*deviceExpires +} + +func NewDevicesStatusUpdater(ctx context.Context, deviceStatusValidity time.Duration) *devicesStatusUpdater { + u := devicesStatusUpdater{ + ctx: ctx, + deviceStatusValidity: deviceStatusValidity, + devices: make(map[string]*deviceExpires), + } + go u.run() + return &u +} + +func (u *devicesStatusUpdater) Add(c *Client) error { + expires, err := u.updateOnlineStatus(c, time.Now().Add(u.deviceStatusValidity)) + if err != nil { + return err + } + d := deviceExpires{ + client: c, + expires: expires, + } + u.mutex.Lock() + defer u.mutex.Unlock() + u.devices[c.remoteAddrString()] = &d + return nil +} + +func (u *devicesStatusUpdater) Remove(c *Client) { + u.mutex.Lock() + defer u.mutex.Unlock() + delete(u.devices, c.remoteAddrString()) +} + +func (u *devicesStatusUpdater) updateOnlineStatus(client *Client, validUntil time.Time) (time.Time, error) { + authCtx := client.loadAuthorizationContext() + if isExpired(authCtx.Expire) { + return time.Time{}, fmt.Errorf("token is expired") + } + serviceToken, err := client.server.oauthMgr.GetToken(client.Context()) + if err != nil { + return time.Time{}, fmt.Errorf("cannot get service token: %w", err) + } + ctx := kitNetGrpc.CtxWithUserID(kitNetGrpc.CtxWithToken(client.Context(), serviceToken.AccessToken), authCtx.GetUserID()) + if authCtx.Expire.Before(validUntil) { + validUntil = authCtx.Expire + } + + return validUntil, deviceStatus.SetOnline(ctx, client.server.raClient, authCtx.GetDeviceId(), validUntil, &pbCQRS.CommandMetadata{ + Sequence: client.coapConn.Sequence(), + ConnectionId: client.remoteAddrString(), + }, authCtx.AuthorizationContext) +} + +func (u *devicesStatusUpdater) getDevicesToUpdate(now time.Time) []*deviceExpires { + u.mutex.Lock() + defer u.mutex.Unlock() + res := make([]*deviceExpires, 0, len(u.devices)) + for key, d := range u.devices { + select { + case <-d.client.Context().Done(): + delete(u.devices, key) + default: + if now.Add(u.deviceStatusValidity / 2).After(d.expires) { + res = append(res, d) + } + } + } + return res +} + +func (u *devicesStatusUpdater) run() { + t := time.NewTicker(u.deviceStatusValidity / 10) + for { + select { + case <-u.ctx.Done(): + return + case now := <-t.C: + for _, d := range u.getDevicesToUpdate(now) { + expires, err := u.updateOnlineStatus(d.client, time.Now().Add(u.deviceStatusValidity)) + if err != nil { + log.Errorf("cannot update device(%v) status to online: %v", getDeviceID(d.client), err) + } else { + d.expires = expires + } + } + log.Debugf("update devices statuses to online takes: %v", time.Now().Sub(now)) + } + } +} diff --git a/coap-gateway/service/server.go b/coap-gateway/service/server.go index 2ab636303..781042ec5 100644 --- a/coap-gateway/service/server.go +++ b/coap-gateway/service/server.go @@ -72,7 +72,8 @@ type Server struct { ctx context.Context cancel context.CancelFunc - sigs chan os.Signal + sigs chan os.Signal + devicesStatusUpdater *devicesStatusUpdater } type DialCertManager = interface { @@ -101,6 +102,10 @@ func New(config Config, dialCertManager DialCertManager, listenCertManager Liste } }) + if config.DeviceStatusValidity <= 0 { + log.Fatalf("invalid value of config.DeviceStatusValidity(%v)", config.DeviceStatusValidity) + } + dialTLSConfig := dialCertManager.GetClientTLSConfig() oauthMgr, err := manager.NewManagerFromConfiguration(config.OAuth, dialTLSConfig) if err != nil { @@ -212,6 +217,7 @@ func New(config Config, dialCertManager DialCertManager, listenCertManager Liste oicPingCache: oicPingCache, listener: listener, authInterceptor: NewAuthInterceptor(), + devicesStatusUpdater: NewDevicesStatusUpdater(ctx, config.DeviceStatusValidity), sigs: make(chan os.Signal, 1), diff --git a/coap-gateway/service/signIn.go b/coap-gateway/service/signIn.go index 7e49725f6..ad8e35d06 100644 --- a/coap-gateway/service/signIn.go +++ b/coap-gateway/service/signIn.go @@ -121,10 +121,8 @@ func signInPostHandler(s mux.ResponseWriter, req *mux.Message, client *Client, s return } - err = deviceStatus.SetOnline(req.Context, client.server.raClient, signIn.DeviceID, expired, &pbCQRS.CommandMetadata{ - Sequence: client.coapConn.Sequence(), - ConnectionId: client.remoteAddrString(), - }, authCtx.AuthorizationContext) + oldAuthCtx := client.replaceAuthorizationContext(&authCtx) + err = client.server.devicesStatusUpdater.Add(client) if err != nil { // Events from resources of device will be comes but device is offline. To recover cloud state, client need to reconnect to cloud. client.logAndWriteErrorResponse(fmt.Errorf("cannot handle sign in: cannot update cloud device status: %w", err), coapCodes.InternalServerError, req.Token) @@ -132,7 +130,6 @@ func signInPostHandler(s mux.ResponseWriter, req *mux.Message, client *Client, s return } - oldAuthCtx := client.replaceAuthorizationContext(&authCtx) newDevice := false switch { @@ -244,10 +241,7 @@ func signOutPostHandler(s mux.ResponseWriter, req *mux.Message, client *Client, client.Close() return } - - client.cancelResourceSubscriptions(true) - client.cancelDeviceSubscriptions(true) - oldAuthCtx := client.replaceAuthorizationContext(nil) + oldAuthCtx := client.CleanUp() if oldAuthCtx.DeviceId != "" { client.server.expirationClientCache.Set(oldAuthCtx.DeviceId, nil, time.Second) serviceToken, err := client.server.oauthMgr.GetToken(req.Context)