Skip to content

Commit

Permalink
coap-gateway: update device status online in shortime than exp token
Browse files Browse the repository at this point in the history
  • Loading branch information
jkralik committed Feb 22, 2021
1 parent 6fbfb87 commit 52a188e
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 10 deletions.
1 change: 1 addition & 0 deletions coap-gateway/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func (client *Client) CleanUp() (oldDeviceID *authCtx) {
aCtx := client.loadAuthorizationContext()
log.Debugf("clenaup client %v for device %v", client.coapConn.RemoteAddr(), aCtx.DeviceId)

client.server.devicesStatusUpdater.Remove(client)
client.server.oicPingCache.Delete(client.remoteAddrString())
client.cleanObservedResources()
client.cancelResourceSubscriptions(false)
Expand Down
1 change: 1 addition & 0 deletions coap-gateway/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ type Config struct {
HeartBeat time.Duration `envconfig:"HEARTBEAT" default:"4s"`
MaxMessageSize int `envconfig:"MAX_MESSAGE_SIZE" default:"262144"`
LogMessages bool `envconfig:"LOG_MESSAGES" default:"false"`
DeviceStatusValidity time.Duration `envconfig:"DEVICE_STATUS_VALIDITY" default:"20m"`
}
114 changes: 114 additions & 0 deletions coap-gateway/service/devicesStatusUpdater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package service

import (
"context"
"fmt"
"sync"
"time"

deviceStatus "github.com/plgd-dev/cloud/coap-gateway/schema/device/status"
pbCQRS "github.com/plgd-dev/cloud/resource-aggregate/pb"
"github.com/plgd-dev/kit/log"
kitNetGrpc "github.com/plgd-dev/kit/net/grpc"
)

type deviceExpires struct {
expires time.Time
client *Client
}

type devicesStatusUpdater struct {
ctx context.Context
deviceStatusValidity time.Duration

mutex sync.Mutex
devices map[string]*deviceExpires
}

func NewDevicesStatusUpdater(ctx context.Context, deviceStatusValidity time.Duration) *devicesStatusUpdater {
u := devicesStatusUpdater{
ctx: ctx,
deviceStatusValidity: deviceStatusValidity,
devices: make(map[string]*deviceExpires),
}
go u.run()
return &u
}

func (u *devicesStatusUpdater) Add(c *Client) error {
expires, err := u.updateOnlineStatus(c, time.Now().Add(u.deviceStatusValidity))
if err != nil {
return err
}
d := deviceExpires{
client: c,
expires: expires,
}
u.mutex.Lock()
defer u.mutex.Unlock()
u.devices[c.remoteAddrString()] = &d
return nil
}

func (u *devicesStatusUpdater) Remove(c *Client) {
u.mutex.Lock()
defer u.mutex.Unlock()
delete(u.devices, c.remoteAddrString())
}

func (u *devicesStatusUpdater) updateOnlineStatus(client *Client, validUntil time.Time) (time.Time, error) {
authCtx := client.loadAuthorizationContext()
if isExpired(authCtx.Expire) {
return time.Time{}, fmt.Errorf("token is expired")
}
serviceToken, err := client.server.oauthMgr.GetToken(client.Context())
if err != nil {
return time.Time{}, fmt.Errorf("cannot get service token: %w", err)
}
ctx := kitNetGrpc.CtxWithUserID(kitNetGrpc.CtxWithToken(client.Context(), serviceToken.AccessToken), authCtx.GetUserID())
if authCtx.Expire.Before(validUntil) {
validUntil = authCtx.Expire
}

return validUntil, deviceStatus.SetOnline(ctx, client.server.raClient, authCtx.GetDeviceId(), validUntil, &pbCQRS.CommandMetadata{
Sequence: client.coapConn.Sequence(),
ConnectionId: client.remoteAddrString(),
}, authCtx.AuthorizationContext)
}

func (u *devicesStatusUpdater) getDevicesToUpdate(now time.Time) []*deviceExpires {
u.mutex.Lock()
defer u.mutex.Unlock()
res := make([]*deviceExpires, 0, len(u.devices))
for key, d := range u.devices {
select {
case <-d.client.Context().Done():
delete(u.devices, key)
default:
if now.Add(u.deviceStatusValidity / 2).After(d.expires) {
res = append(res, d)
}
}
}
return res
}

func (u *devicesStatusUpdater) run() {
t := time.NewTicker(u.deviceStatusValidity / 10)
for {
select {
case <-u.ctx.Done():
return
case now := <-t.C:
for _, d := range u.getDevicesToUpdate(now) {
expires, err := u.updateOnlineStatus(d.client, time.Now().Add(u.deviceStatusValidity))
if err != nil {
log.Errorf("cannot update device(%v) status to online: %v", getDeviceID(d.client), err)
} else {
d.expires = expires
}
}
log.Debugf("update devices statuses to online takes: %v", time.Now().Sub(now))
}
}
}
8 changes: 7 additions & 1 deletion coap-gateway/service/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ type Server struct {
ctx context.Context
cancel context.CancelFunc

sigs chan os.Signal
sigs chan os.Signal
devicesStatusUpdater *devicesStatusUpdater
}

type DialCertManager = interface {
Expand Down Expand Up @@ -101,6 +102,10 @@ func New(config Config, dialCertManager DialCertManager, listenCertManager Liste
}
})

if config.DeviceStatusValidity <= 0 {
log.Fatalf("invalid value of config.DeviceStatusValidity(%v)", config.DeviceStatusValidity)
}

dialTLSConfig := dialCertManager.GetClientTLSConfig()
oauthMgr, err := manager.NewManagerFromConfiguration(config.OAuth, dialTLSConfig)
if err != nil {
Expand Down Expand Up @@ -212,6 +217,7 @@ func New(config Config, dialCertManager DialCertManager, listenCertManager Liste
oicPingCache: oicPingCache,
listener: listener,
authInterceptor: NewAuthInterceptor(),
devicesStatusUpdater: NewDevicesStatusUpdater(ctx, config.DeviceStatusValidity),

sigs: make(chan os.Signal, 1),

Expand Down
12 changes: 3 additions & 9 deletions coap-gateway/service/signIn.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,15 @@ func signInPostHandler(s mux.ResponseWriter, req *mux.Message, client *Client, s
return
}

err = deviceStatus.SetOnline(req.Context, client.server.raClient, signIn.DeviceID, expired, &pbCQRS.CommandMetadata{
Sequence: client.coapConn.Sequence(),
ConnectionId: client.remoteAddrString(),
}, authCtx.AuthorizationContext)
oldAuthCtx := client.replaceAuthorizationContext(&authCtx)
err = client.server.devicesStatusUpdater.Add(client)
if err != nil {
// Events from resources of device will be comes but device is offline. To recover cloud state, client need to reconnect to cloud.
client.logAndWriteErrorResponse(fmt.Errorf("cannot handle sign in: cannot update cloud device status: %w", err), coapCodes.InternalServerError, req.Token)
client.Close()
return
}

oldAuthCtx := client.replaceAuthorizationContext(&authCtx)
newDevice := false

switch {
Expand Down Expand Up @@ -244,10 +241,7 @@ func signOutPostHandler(s mux.ResponseWriter, req *mux.Message, client *Client,
client.Close()
return
}

client.cancelResourceSubscriptions(true)
client.cancelDeviceSubscriptions(true)
oldAuthCtx := client.replaceAuthorizationContext(nil)
oldAuthCtx := client.CleanUp()
if oldAuthCtx.DeviceId != "" {
client.server.expirationClientCache.Set(oldAuthCtx.DeviceId, nil, time.Second)
serviceToken, err := client.server.oauthMgr.GetToken(req.Context)
Expand Down

0 comments on commit 52a188e

Please sign in to comment.