Skip to content

Commit

Permalink
add env file and main file
Browse files Browse the repository at this point in the history
Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
  • Loading branch information
nyagamunene committed Dec 11, 2024
1 parent 1968a73 commit 11bc6ce
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 122 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/caarlos0/env/v11 v11.2.2
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/google/uuid v1.6.0
github.com/joho/godotenv v1.5.1
github.com/tetratelabs/wazero v1.7.3
golang.org/x/sync v0.7.0
oras.land/oras-go/v2 v2.5.0
)

Expand All @@ -16,4 +16,5 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
Expand Down
14 changes: 6 additions & 8 deletions proxy/http/http.go → proxy/config/http.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package http
package config

import (
"context"
Expand All @@ -13,25 +13,23 @@ import (

const tag = "latest"

var envPrefix = "ORAS_"

type Config struct {
type HTTPProxyConfig struct {
RegistryURL string `env:"REGISTRY_URL" envDefault:"localhost:5000"`
Authenticate bool `env:"AUTHENTICATE" envDefault:"false"`
Username string `env:"USERNAME" envDefault:""`
Password string `env:"PASSWORD" envDefault:""`
}

func Init() (*Config, error) {
config := Config{}
if err := env.ParseWithOptions(&config, env.Options{Prefix: envPrefix}); err != nil {
func LoadHTTPConfig(opts env.Options) (*HTTPProxyConfig, error) {
config := HTTPProxyConfig{}
if err := env.ParseWithOptions(&config, opts); err != nil {
return nil, err
}

return &config, nil
}

func (c *Config) FetchFromReg(ctx context.Context, containerName string) ([]byte, error) {
func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerName string) ([]byte, error) {
fullPath := fmt.Sprintf("%s/%s", c.RegistryURL, containerName)

repo, err := remote.NewRepository(fullPath)
Expand Down
21 changes: 17 additions & 4 deletions proxy/config/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
package config

import (
"github.com/caarlos0/env/v11"
)

type MQTTProxyConfig struct {
BrokerURL string `json:"broker_url"`
Password string `json:"password"`
PropletID string `json:"proplet_id"`
ChannelID string `json:"channel_id"`
BrokerURL string `env:"BROKER_URL" envDefault:""`
Password string `env:"PASSWORD" envDefault:""`
PropletID string `env:"PROPLET_ID" envDefault:""`
ChannelID string `env:"CHANNEL_ID" envDefault:""`
}

func LoadMQTTConfig(opts env.Options) (*MQTTProxyConfig, error) {
c := MQTTProxyConfig{}
if err := env.ParseWithOptions(&c, opts); err != nil {
return nil, err
}

return &c, nil
}
80 changes: 0 additions & 80 deletions proxy/handler.go

This file was deleted.

4 changes: 2 additions & 2 deletions proxy/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type RegistryClient struct {
}

func NewMQTTClient(config *config.MQTTProxyConfig) (*RegistryClient, error) {
fmt.Printf("config is %+v\n", config)
opts := mqtt.NewClientOptions().
AddBroker(config.BrokerURL).
SetClientID(fmt.Sprintf("Proplet-%s", config.PropletID)).
Expand Down Expand Up @@ -59,9 +60,8 @@ func (c *RegistryClient) Connect(ctx context.Context) error {
}

func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- string) error {
// Subscribe to container requests
subTopic := fmt.Sprintf("channels/%s/message/registry/proplet", c.config.ChannelID)

fmt.Printf("subtopic is %+v\n", subTopic)
handler := func(client mqtt.Client, msg mqtt.Message) {
data := msg.Payload()

Expand Down
42 changes: 15 additions & 27 deletions proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,44 @@ import (
"log/slog"

"github.com/absmach/propeller/proxy/config"
orasHTTP "github.com/absmach/propeller/proxy/http"
"github.com/absmach/propeller/proxy/mqtt"
)

type ProxyService struct {
orasconfig orasHTTP.Config
orasconfig *config.HTTPProxyConfig
mqttClient *mqtt.RegistryClient
logger *slog.Logger
containerChan chan string
dataChan chan []byte
}

func NewService(ctx context.Context, cfg *config.MQTTProxyConfig, logger *slog.Logger) (*ProxyService, error) {
mqttClient, err := mqtt.NewMQTTClient(cfg)
func NewService(ctx context.Context, cfgM *config.MQTTProxyConfig, cfgH *config.HTTPProxyConfig, logger *slog.Logger) (*ProxyService, error) {
mqttClient, err := mqtt.NewMQTTClient(cfgM)
if err != nil {
return nil, fmt.Errorf("failed to initialize MQTT client: %w", err)
}

config, err := orasHTTP.Init()
if err != nil {
return nil, fmt.Errorf("failed to initialize oras http client: %w", err)
}

return &ProxyService{
orasconfig: *config,
orasconfig: cfgH,
mqttClient: mqttClient,
logger: logger,
containerChan: make(chan string, 1),
dataChan: make(chan []byte, 1),
}, nil
}

func (s *ProxyService) Start(ctx context.Context) error {
errs := make(chan error, 2)

if err := s.mqttClient.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to MQTT broker: %w", err)
}
defer s.mqttClient.Disconnect(ctx)

if err := s.mqttClient.Subscribe(ctx, s.containerChan); err != nil {
return fmt.Errorf("failed to subscribe to container requests: %w", err)
}

go s.streamHTTP(ctx, errs)
go s.streamMQTT(ctx, errs)
// MQTTClient returns the MQTT client
func (s *ProxyService) MQTTClient() *mqtt.RegistryClient {
return s.mqttClient
}

return <-errs
// ContainerChan returns the container channel
func (s *ProxyService) ContainerChan() chan string {
return s.containerChan
}

func (s *ProxyService) streamHTTP(ctx context.Context, errs chan error) {
// StreamHTTP handles the HTTP stream processing
func (s *ProxyService) StreamHTTP(ctx context.Context, errs chan error) {
for {
select {
case <-ctx.Done():
Expand All @@ -80,7 +67,8 @@ func (s *ProxyService) streamHTTP(ctx context.Context, errs chan error) {
}
}

func (s *ProxyService) streamMQTT(ctx context.Context, errs chan error) {
// StreamMQTT handles the MQTT stream processing
func (s *ProxyService) StreamMQTT(ctx context.Context, errs chan error) {
for {
select {
case <-ctx.Done():
Expand Down

0 comments on commit 11bc6ce

Please sign in to comment.