Skip to content

Commit

Permalink
fix: update registry topic
Browse files Browse the repository at this point in the history
Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>
  • Loading branch information
rodneyosodo committed Dec 17, 2024
1 parent 05c78fe commit 3298ce6
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 17 deletions.
1 change: 0 additions & 1 deletion pkg/mqtt/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
)

var (
errConnect = errors.New("failed to connect to MQTT broker")
errPublishTimeout = errors.New("failed to publish due to timeout reached")
errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
Expand Down
11 changes: 0 additions & 11 deletions proplet/mqtt.go

This file was deleted.

16 changes: 16 additions & 0 deletions proplet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ const (
pollingInterval = 5 * time.Second
)

var (
RegistryAckTopicTemplate = "channels/%s/messages/control/manager/registry"
updateRegistryTopicTemplate = "channels/%s/messages/control/manager/update"
aliveTopicTemplate = "channels/%s/messages/control/proplet/alive"
discoveryTopicTemplate = "channels/%s/messages/control/proplet/create"
startTopicTemplate = "channels/%s/messages/control/manager/start"
stopTopicTemplate = "channels/%s/messages/control/manager/stop"
registryResponseTopic = "channels/%s/messages/registry/server"
fetchRequestTopicTemplate = "channels/%s/messages/registry/proplet"
)

type PropletService struct {
config Config
pubsub pkgmqtt.PubSub
Expand Down Expand Up @@ -105,6 +116,11 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error {
return fmt.Errorf("failed to subscribe to registry topics: %w", err)
}

topic = fmt.Sprintf(updateRegistryTopicTemplate, p.config.ChannelID)
if err := p.pubsub.Subscribe(ctx, topic, p.registryUpdate(ctx)); err != nil {
return fmt.Errorf("failed to subscribe to update registry topic: %w", err)
}

logger.Info("Proplet service is running.")
<-ctx.Done()

Expand Down
5 changes: 0 additions & 5 deletions proplet/wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type Runtime interface {
type wazeroRuntime struct {
mutex sync.Mutex
runtimes map[string]wazero.Runtime
results map[string][]uint64
pubsub mqtt.PubSub
channelID string
logger *slog.Logger
Expand All @@ -31,7 +30,6 @@ type wazeroRuntime struct {
func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID string) Runtime {
return &wazeroRuntime{
runtimes: make(map[string]wazero.Runtime),
results: make(map[string][]uint64),
pubsub: pubsub,
channelID: channelID,
logger: logger,
Expand Down Expand Up @@ -66,9 +64,6 @@ func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, id, fun

return
}
w.mutex.Lock()
w.results[id] = results
w.mutex.Unlock()

if err := w.StopApp(ctx, id); err != nil {
w.logger.Error("failed to stop app", slog.String("id", id), slog.String("error", err.Error()))
Expand Down

0 comments on commit 3298ce6

Please sign in to comment.