Skip to content

Commit

Permalink
refactor: use same MQTT interface
Browse files Browse the repository at this point in the history
Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>
  • Loading branch information
rodneyosodo committed Dec 16, 2024
1 parent cb37230 commit 05c78fe
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 360 deletions.
14 changes: 7 additions & 7 deletions cmd/manager/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ const svcName = "manager"

type Config struct {
LogLevel string
OTELURL url.URL
TraceRatio float64
Server server.Config
InstanceID string
MQTTAddress string
MQTTQoS uint8
MQTTTimeout time.Duration
ChannelID string
ThingID string
ThingKey string
MQTTAddress string
MQTTQOS uint8
MQTTTimeout time.Duration
Server server.Config
OTELURL url.URL
TraceRatio float64
}

func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) error {
Expand Down Expand Up @@ -70,7 +70,7 @@ func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) er
}
tracer := tp.Tracer(svcName)

mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQOS, svcName, cfg.ThingID, cfg.ThingKey, cfg.MQTTTimeout, logger)
mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, svcName, cfg.ThingID, cfg.ThingKey, cfg.ChannelID, cfg.MQTTTimeout, logger)
if err != nil {
return fmt.Errorf("failed to initialize mqtt pubsub: %s", err.Error())
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/proplet/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"syscall"
"time"

"github.com/absmach/propeller/pkg/mqtt"
"github.com/absmach/propeller/proplet"
)

Expand Down Expand Up @@ -41,13 +42,13 @@ func StartProplet(ctx context.Context, cancel context.CancelFunc, cfg proplet.Co
logger.Info("successfully connected to registry URL", slog.String("url", cfg.RegistryURL))
}

mqttClient, err := proplet.NewMQTTClient(ctx, cfg, logger)
mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, cfg.InstanceID, cfg.ThingID, cfg.ThingKey, cfg.ChannelID, cfg.MQTTTimeout, logger)
if err != nil {
return errors.Join(errors.New("failed to initialize mqtt client"), err)
}
wazero := proplet.NewWazeroRuntime(logger, mqttClient, cfg.ChannelID)
wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID)

service, err := proplet.NewService(cfg, mqttClient, logger, wazero)
service, err := proplet.NewService(ctx, cfg, mqttPubSub, logger, wazero)
if err != nil {
return errors.Join(errors.New("failed to initialize service"), err)
}
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/absmach/magistrala v0.15.1
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/fatih/color v1.18.0
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/chi/v5 v5.2.0
github.com/go-kit/kit v0.13.0
github.com/google/uuid v1.6.0
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f
Expand Down Expand Up @@ -49,8 +49,8 @@ require (
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
google.golang.org/grpc v1.69.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
google.golang.org/protobuf v1.36.0 // indirect
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0=
github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-kit/kit v0.13.0 h1:OoneCcHKHQ03LfBpoQCUfCluwd2Vt3ohz+kvbJneZAU=
github.com/go-kit/kit v0.13.0/go.mod h1:phqEHMMUbyrCFCTgH48JueqrM3md2HcAZ8N3XE4FKDg=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
Expand Down Expand Up @@ -120,14 +122,20 @@ golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 h1:CkkIfIt50+lT6NHAVoRYEyAvQGFM7xEwXUUywFvEb3Q=
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576/go.mod h1:1R3kvZ1dtP3+4p4d3G8uJ8rFk/fWlScl38vanWACI08=
google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 h1:ChAdCYNQFDk5fYvFZMywKLIijG7TC2m1C2CMEu11G3o=
google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484/go.mod h1:KRUmxRI4JmbpAm8gcZM4Jsffi859fo5LQjILwuqj9z8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 h1:8ZmaLZE4XWrtU3MyClkYqqtl6Oegr3235h7jxsDyqCY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0=
google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw=
google.golang.org/grpc v1.69.0 h1:quSiOM1GJPmPH5XtU+BCoVXcDVJJAzNcoyfC2cCjGkI=
google.golang.org/grpc v1.69.0/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
45 changes: 38 additions & 7 deletions pkg/mqtt/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ var (
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
errEmptyTopic = errors.New("empty topic")
errEmptyID = errors.New("empty ID")

aliveTopicTemplate = "channels/%s/messages/control/proplet/alive"
lwtPayloadTemplate = `{"status":"offline","proplet_id":"%s","mg_channel_id":"%s"}`
)

type pubsub struct {
Expand All @@ -35,12 +38,12 @@ type PubSub interface {
Unsubscribe(ctx context.Context, topic string) error
}

func NewPubSub(url string, qos byte, id, username, password string, timeout time.Duration, logger *slog.Logger) (PubSub, error) {
func NewPubSub(url string, qos byte, id, username, password, channelID string, timeout time.Duration, logger *slog.Logger) (PubSub, error) {
if id == "" {
return nil, errEmptyID
}

client, err := newClient(url, id, username, password, timeout)
client, err := newClient(url, id, username, password, channelID, timeout, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -112,21 +115,49 @@ func (ps *pubsub) Close() error {
return nil
}

func newClient(address, id, username, password string, timeout time.Duration) (mqtt.Client, error) {
func newClient(address, id, username, password, channelID string, timeout time.Duration, logger *slog.Logger) (mqtt.Client, error) {
opts := mqtt.NewClientOptions().
AddBroker(address).
SetClientID(id).
SetUsername(username).
SetPassword(password).
AddBroker(address).
SetClientID(id)
SetCleanSession(true)
if channelID != "" {
topic := fmt.Sprintf(aliveTopicTemplate, channelID)
lwtPayload := fmt.Sprintf(lwtPayloadTemplate, username, channelID)
opts.SetWill(topic, lwtPayload, 0, false)
}

opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
args := []any{}
if err != nil {
args = append(args, slog.Any("error", err))
}

logger.Info("MQTT connection lost", args...)
})

opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) {
args := []any{}
if options != nil {
args = append(args,
slog.String("client_id", options.ClientID),
slog.String("username", options.Username),
)
}

logger.Info("MQTT reconnecting", args...)
})

client := mqtt.NewClient(opts)

token := client.Connect()
if token.Error() != nil {
return nil, token.Error()
return nil, errors.Join(errors.New("failed to connect to MQTT broker"), token.Error())
}

if ok := token.WaitTimeout(timeout); !ok {
return nil, errConnect
return nil, errors.New("timeout reached while connecting to MQTT broker")
}

return client, nil
Expand Down
2 changes: 1 addition & 1 deletion propellerd/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var managerCmd = []cobra.Command{
ThingID: thingID,
ThingKey: thingKey,
MQTTAddress: mqttAddress,
MQTTQOS: uint8(mqttQOS),
MQTTQoS: uint8(mqttQOS),
MQTTTimeout: mqttTimeout,
}
ctx, cancel := context.WithCancel(cmd.Context())
Expand Down
2 changes: 1 addition & 1 deletion propellerd/proplet.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var propletCmd = []cobra.Command{
Run: func(cmd *cobra.Command, _ []string) {
cfg := proplet.Config{
LogLevel: logLevel,
ID: id,
InstanceID: id,
MQTTTimeout: mqttTimeout,
MQTTQoS: uint8(mqttQOS),
LivelinessInterval: livelinessInterval,
Expand Down
24 changes: 12 additions & 12 deletions proplet/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import (
)

type Config struct {
LogLevel string `json:"log_level"`
ID string `json:"id"`
MQTTAddress string `json:"mqtt_address"`
MQTTTimeout time.Duration `json:"mqtt_timeout"`
MQTTQoS byte `json:"mqtt_qos"`
LivelinessInterval time.Duration `json:"liveliness_interval"`
RegistryURL string `json:"registry_url,omitempty"`
RegistryToken string `json:"registry_token,omitempty"`
RegistryTimeout time.Duration `json:"registry_timeout,omitempty"`
ChannelID string `json:"channel_id"`
ThingID string `json:"thing_id"`
ThingKey string `json:"thing_key"`
LogLevel string
InstanceID string
MQTTAddress string
MQTTTimeout time.Duration
MQTTQoS byte
LivelinessInterval time.Duration
RegistryURL string
RegistryToken string
RegistryTimeout time.Duration
ChannelID string
ThingID string
ThingKey string
}

func (c Config) Validate() error {
Expand Down
Loading

0 comments on commit 05c78fe

Please sign in to comment.