Skip to content

Commit

Permalink
Publish LWT on set-up
Browse files Browse the repository at this point in the history
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
  • Loading branch information
JeffMboya committed Dec 14, 2024
1 parent 7654f44 commit b4f941f
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/manager/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) er
}
tracer := tp.Tracer(svcName)

mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQOS, svcName, cfg.ThingID, cfg.ThingKey, cfg.MQTTTimeout, logger)
mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQOS, svcName, cfg.ThingID, cfg.ThingKey, cfg.MQTTTimeout, "", nil, logger)
if err != nil {
return fmt.Errorf("failed to initialize mqtt pubsub: %s", err.Error())
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/mqtt/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ type PubSub interface {
Close() error
}

func NewPubSub(url string, qos byte, id, username, password string, timeout time.Duration, logger *slog.Logger) (PubSub, error) {
func NewPubSub(url string, qos byte, id, username, password string, timeout time.Duration, lwtTopic string, lwtPayload map[string]string, logger *slog.Logger) (PubSub, error) {
if id == "" {
return nil, errEmptyID
}

client, err := newClient(url, id, username, password, timeout)
client, err := newClient(url, id, username, password, timeout, lwtTopic, lwtPayload)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -113,12 +113,21 @@ func (ps *pubsub) Close() error {
return nil
}

func newClient(address, id, username, password string, timeout time.Duration) (mqtt.Client, error) {
func newClient(address, id, username, password string, timeout time.Duration, lwtTopic string, lwtPayload map[string]string) (mqtt.Client, error) {
opts := mqtt.NewClientOptions().
SetUsername(username).
SetPassword(password).
AddBroker(address).
SetClientID(id)

if lwtTopic != "" && lwtPayload != nil {
payload, err := json.Marshal(lwtPayload)
if err != nil {
return nil, fmt.Errorf("failed to serialize LWT payload: %w", err)
}
opts.SetWill(lwtTopic, string(payload), 0, false)
}

client := mqtt.NewClient(opts)

token := client.Connect()
Expand Down
21 changes: 9 additions & 12 deletions proplet/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,22 @@ type MQTTService struct {
}

func NewMQTTService(ctx context.Context, config Config, logger *slog.Logger) (*MQTTService, error) {
lwtTopic := fmt.Sprintf(LWTTopic, config.ChannelID)
lwtPayload := map[string]string{
"status": "offline",
"proplet_id": config.PropletID,
"chan_id": config.ChannelID,
}

pubsub, err := mqtt.NewPubSub(
config.BrokerURL,
qos,
"Proplet-"+config.PropletID,
config.PropletID,
config.Password,
mqttTimeout,
lwtTopic,
lwtPayload,
logger,
)
if err != nil {
Expand All @@ -58,18 +67,6 @@ func NewMQTTService(ctx context.Context, config Config, logger *slog.Logger) (*M
logger: logger,
}

lwtTopic := fmt.Sprintf(LWTTopic, config.ChannelID)
lwtPayload := map[string]string{
"status": "offline",
"proplet_id": config.PropletID,
"chan_id": config.ChannelID,
}
if err := pubsub.Publish(ctx, lwtTopic, lwtPayload); err != nil {
logger.Error("Failed to set LWT message", slog.Any("error", err))

return nil, err
}

if err := service.PublishDiscoveryMessage(ctx); err != nil {
logger.Error("Failed to publish discovery message", slog.Any("error", err))

Expand Down

0 comments on commit b4f941f

Please sign in to comment.