diff --git a/go.mod b/go.mod index 2a04d87..db389d1 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/caarlos0/env/v11 v11.2.2 github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/google/uuid v1.6.0 + github.com/joho/godotenv v1.5.1 github.com/tetratelabs/wazero v1.7.3 - golang.org/x/sync v0.7.0 oras.land/oras-go/v2 v2.5.0 ) @@ -16,4 +16,5 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.7.0 // indirect ) diff --git a/go.sum b/go.sum index b25556b..c4238cf 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= diff --git a/proxy/http/http.go b/proxy/config/http.go similarity index 82% rename from proxy/http/http.go rename to proxy/config/http.go index 7e30c8c..aaf494c 100644 --- a/proxy/http/http.go +++ b/proxy/config/http.go @@ -1,4 +1,4 @@ -package http +package config import ( "context" @@ -13,25 +13,23 @@ import ( const tag = "latest" -var envPrefix = "ORAS_" - -type Config struct { +type HTTPProxyConfig struct { RegistryURL string `env:"REGISTRY_URL" envDefault:"localhost:5000"` Authenticate bool `env:"AUTHENTICATE" envDefault:"false"` Username string `env:"USERNAME" envDefault:""` Password string `env:"PASSWORD" envDefault:""` } -func Init() (*Config, error) { - config := Config{} - if err := env.ParseWithOptions(&config, env.Options{Prefix: envPrefix}); err != nil { +func LoadHTTPConfig(opts env.Options) (*HTTPProxyConfig, error) { + config := HTTPProxyConfig{} + if err := env.ParseWithOptions(&config, opts); err != nil { return nil, err } return &config, nil } -func (c *Config) FetchFromReg(ctx context.Context, containerName string) ([]byte, error) { +func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerName string) ([]byte, error) { fullPath := fmt.Sprintf("%s/%s", c.RegistryURL, containerName) repo, err := remote.NewRepository(fullPath) diff --git a/proxy/config/mqtt.go b/proxy/config/mqtt.go index 8fe213c..66cf910 100644 --- a/proxy/config/mqtt.go +++ b/proxy/config/mqtt.go @@ -1,8 +1,21 @@ package config +import ( + "github.com/caarlos0/env/v11" +) + type MQTTProxyConfig struct { - BrokerURL string `json:"broker_url"` - Password string `json:"password"` - PropletID string `json:"proplet_id"` - ChannelID string `json:"channel_id"` + BrokerURL string `env:"BROKER_URL" envDefault:""` + Password string `env:"PASSWORD" envDefault:""` + PropletID string `env:"PROPLET_ID" envDefault:""` + ChannelID string `env:"CHANNEL_ID" envDefault:""` +} + +func LoadMQTTConfig(opts env.Options) (*MQTTProxyConfig, error) { + c := MQTTProxyConfig{} + if err := env.ParseWithOptions(&c, opts); err != nil { + return nil, err + } + + return &c, nil } diff --git a/proxy/handler.go b/proxy/handler.go deleted file mode 100644 index 546ecee..0000000 --- a/proxy/handler.go +++ /dev/null @@ -1,80 +0,0 @@ -package proxy - -import ( - "context" - "errors" - "log/slog" -) - -var _ Handler = (*handler)(nil) - -var errSessionMissing = errors.New("session is missing") - -type Session struct { - ID string - Username string - Password []byte -} - -type sessionKey struct{} - -type Handler interface { - Connect(ctx context.Context) error - - Disconnect(ctx context.Context) error - - Publish(ctx context.Context, topic *string, payload *[]byte) error -} - -type handler struct { - logger *slog.Logger -} - -func New(logger *slog.Logger) *handler { - return &handler{ - logger: logger, - } -} - -func (h *handler) Connect(ctx context.Context) error { - return h.logAction(ctx, "Connect", nil, nil) -} - -func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) error { - return h.logAction(ctx, "Publish", &[]string{*topic}, payload) -} - -func (h *handler) Disconnect(ctx context.Context) error { - return h.logAction(ctx, "Disconnect", nil, nil) -} - -func (h *handler) logAction(ctx context.Context, action string, topics *[]string, payload *[]byte) error { - s, ok := FromContext(ctx) - args := []interface{}{ - slog.Group("session", slog.String("id", s.ID), slog.String("username", s.Username)), - } - - if topics != nil { - args = append(args, slog.Any("topics", *topics)) - } - if payload != nil { - args = append(args, slog.Any("payload", *payload)) - } - - if !ok { - args = append(args, slog.Any("error", errSessionMissing)) - h.logger.Error(action+"() failed to complete", args...) - return errSessionMissing - } - - h.logger.Info(action+"() completed successfully", args...) - - return nil -} - -func FromContext(ctx context.Context) (*Session, bool) { - if s, ok := ctx.Value(sessionKey{}).(*Session); ok && s != nil { - return s, true - } - return nil, false -} diff --git a/proxy/mqtt/mqtt.go b/proxy/mqtt/mqtt.go index fe48665..e26c7f4 100644 --- a/proxy/mqtt/mqtt.go +++ b/proxy/mqtt/mqtt.go @@ -17,6 +17,7 @@ type RegistryClient struct { } func NewMQTTClient(config *config.MQTTProxyConfig) (*RegistryClient, error) { + fmt.Printf("config is %+v\n", config) opts := mqtt.NewClientOptions(). AddBroker(config.BrokerURL). SetClientID(fmt.Sprintf("Proplet-%s", config.PropletID)). @@ -59,9 +60,8 @@ func (c *RegistryClient) Connect(ctx context.Context) error { } func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- string) error { - // Subscribe to container requests subTopic := fmt.Sprintf("channels/%s/message/registry/proplet", c.config.ChannelID) - + fmt.Printf("subtopic is %+v\n", subTopic) handler := func(client mqtt.Client, msg mqtt.Message) { data := msg.Payload() diff --git a/proxy/service.go b/proxy/service.go index 4931fe1..a192593 100644 --- a/proxy/service.go +++ b/proxy/service.go @@ -6,31 +6,25 @@ import ( "log/slog" "github.com/absmach/propeller/proxy/config" - orasHTTP "github.com/absmach/propeller/proxy/http" "github.com/absmach/propeller/proxy/mqtt" ) type ProxyService struct { - orasconfig orasHTTP.Config + orasconfig *config.HTTPProxyConfig mqttClient *mqtt.RegistryClient logger *slog.Logger containerChan chan string dataChan chan []byte } -func NewService(ctx context.Context, cfg *config.MQTTProxyConfig, logger *slog.Logger) (*ProxyService, error) { - mqttClient, err := mqtt.NewMQTTClient(cfg) +func NewService(ctx context.Context, cfgM *config.MQTTProxyConfig, cfgH *config.HTTPProxyConfig, logger *slog.Logger) (*ProxyService, error) { + mqttClient, err := mqtt.NewMQTTClient(cfgM) if err != nil { return nil, fmt.Errorf("failed to initialize MQTT client: %w", err) } - config, err := orasHTTP.Init() - if err != nil { - return nil, fmt.Errorf("failed to initialize oras http client: %w", err) - } - return &ProxyService{ - orasconfig: *config, + orasconfig: cfgH, mqttClient: mqttClient, logger: logger, containerChan: make(chan string, 1), @@ -38,25 +32,18 @@ func NewService(ctx context.Context, cfg *config.MQTTProxyConfig, logger *slog.L }, nil } -func (s *ProxyService) Start(ctx context.Context) error { - errs := make(chan error, 2) - - if err := s.mqttClient.Connect(ctx); err != nil { - return fmt.Errorf("failed to connect to MQTT broker: %w", err) - } - defer s.mqttClient.Disconnect(ctx) - - if err := s.mqttClient.Subscribe(ctx, s.containerChan); err != nil { - return fmt.Errorf("failed to subscribe to container requests: %w", err) - } - - go s.streamHTTP(ctx, errs) - go s.streamMQTT(ctx, errs) +// MQTTClient returns the MQTT client +func (s *ProxyService) MQTTClient() *mqtt.RegistryClient { + return s.mqttClient +} - return <-errs +// ContainerChan returns the container channel +func (s *ProxyService) ContainerChan() chan string { + return s.containerChan } -func (s *ProxyService) streamHTTP(ctx context.Context, errs chan error) { +// StreamHTTP handles the HTTP stream processing +func (s *ProxyService) StreamHTTP(ctx context.Context, errs chan error) { for { select { case <-ctx.Done(): @@ -80,7 +67,8 @@ func (s *ProxyService) streamHTTP(ctx context.Context, errs chan error) { } } -func (s *ProxyService) streamMQTT(ctx context.Context, errs chan error) { +// StreamMQTT handles the MQTT stream processing +func (s *ProxyService) StreamMQTT(ctx context.Context, errs chan error) { for { select { case <-ctx.Done():