Skip to content

Commit

Permalink
Improved build. .env and config.yaml exposed for this example's purpo…
Browse files Browse the repository at this point in the history
…ses only
  • Loading branch information
arturogonzalezm committed Jul 31, 2024
1 parent 1ccfabd commit c32274c
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 71 deletions.
4 changes: 4 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DB_HOST=db
DB_USER=postgres
DB_PASSWORD=postgres
DB_NAME=postgres
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ db-shell:
test:
go test ./...

#run:
# go run cmd/monitor/main.go
run:
go run cmd/monitor/main.go

# Run the Go application
run:
@echo "Running the Go application..."
DB_HOST=localhost DB_USER=postgres DB_PASSWORD=postgres DB_NAME=postgres go run cmd/monitor/main.go
#run:
# @echo "Running the Go application..."
# DB_HOST=localhost DB_USER=postgres DB_PASSWORD=postgres DB_NAME=postgres go run cmd/monitor/main.go
86 changes: 30 additions & 56 deletions cmd/monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,36 @@ import (
"syscall"
"time"

"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/processor"
"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/websocket"
_ "github.com/lib/pq"
"github.com/spf13/viper"

"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/monitor"
)

type Config struct {
DB struct {
Host string
User string
Password string
Name string
}
Symbols []string
}

func main() {
log.Println("Starting RealTimeCryptoMonitor...")

symbols := []string{"btcusdt", "ethusdt", "ltcusdt"} // Add more symbols as needed

// Get database connection details from environment variables
dbHost := os.Getenv("DB_HOST")
dbUser := os.Getenv("DB_USER")
dbPassword := os.Getenv("DB_PASSWORD")
dbName := os.Getenv("DB_NAME")
// Load configuration
var config Config
if err := loadConfig(&config); err != nil {
log.Fatalf("Error loading configuration: %v", err)
}

fmt.Printf("DB_HOST: %s, DB_USER: %s, DB_PASSWORD: %s, DB_NAME: %s\n", dbHost, dbUser, dbPassword, dbName)
// Print loaded configuration
fmt.Printf("DB_HOST: %s, DB_USER: %s, DB_PASSWORD: %s, DB_NAME: %s\n", config.DB.Host, config.DB.User, config.DB.Password, config.DB.Name)

// Database connection setup
connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", dbUser, dbPassword, dbHost, dbName)
connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", config.DB.User, config.DB.Password, config.DB.Host, config.DB.Name)
var db *sql.DB
var err error

Expand Down Expand Up @@ -59,11 +69,11 @@ func main() {
// WaitGroup to manage goroutines
var wg sync.WaitGroup

for _, symbol := range symbols {
for _, symbol := range config.Symbols {
wg.Add(1)
go func(symbol string) {
defer wg.Done()
monitorSymbol(symbol, db, stop)
monitor.MonitorSymbol(symbol, db, stop)
}(symbol)
}

Expand All @@ -79,50 +89,14 @@ func main() {
log.Println("All symbol monitoring stopped. Exiting program.")
}

func monitorSymbol(symbol string, db *sql.DB, stop chan struct{}) {
log.Printf("Starting monitoring for symbol: %s", symbol)
uri := fmt.Sprintf("wss://stream.binance.com:9443/ws/%s@ticker", symbol)
func loadConfig(config *Config) error {
viper.SetConfigName("config")
viper.SetConfigType("yaml")
viper.AddConfigPath("configs")

client := websocket.NewClient()
if err := client.Connect(uri); err != nil {
log.Fatalf("WebSocket connection error for symbol %s: %v", symbol, err)
if err := viper.ReadInConfig(); err != nil {
return err
}
defer func() {
if err := client.Close(); err != nil {
log.Printf("Error closing WebSocket client for symbol %s: %v", symbol, err)
}
}()

// Create PGWriter
pgWriter, err := processor.NewPGWriter(db)
if err != nil {
log.Fatalf("Error creating PostgreSQL writer for symbol %s: %v", symbol, err)
}
defer func() {
if err := pgWriter.Close(); err != nil {
log.Printf("Error closing PostgreSQL writer for symbol %s: %v", symbol, err)
}
}()

client.AddProcessor(pgWriter)

go client.Listen(stop)

log.Printf("WebSocket connection opened for %s", symbol)

// Periodic summary
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-stop:
log.Printf("Stopping monitoring for symbol: %s", symbol)
return
case <-ticker.C:
// Print a summary every 5 seconds
log.Printf("Symbol: %s - Last 5 seconds: Processed %d messages", symbol, pgWriter.GetProcessedCount())
log.Printf("Symbol: %s - Current buffer size: %d", symbol, pgWriter.GetBufferSize())
}
}
return viper.Unmarshal(&config)
}
9 changes: 9 additions & 0 deletions configs/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
db:
host: "localhost"
user: "postgres"
password: "postgres"
name: "postgres"
symbols:
- "btcusdt"
- "ethusdt"
- "ltcusdt"
18 changes: 10 additions & 8 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ services:
db:
image: postgres:latest
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_USER: ${DB_USER}
POSTGRES_PASSWORD: ${DB_PASSWORD}
POSTGRES_DB: ${DB_NAME}
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
- ./database/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U postgres" ]
test: ["CMD-SHELL", "pg_isready -U ${DB_USER}"]
interval: 10s
timeout: 5s
retries: 5
Expand All @@ -22,12 +22,14 @@ services:
db:
condition: service_healthy
environment:
DB_HOST: db
DB_USER: postgres
DB_PASSWORD: postgres
DB_NAME: postgres
DB_HOST: ${DB_HOST}
DB_USER: ${DB_USER}
DB_PASSWORD: ${DB_PASSWORD}
DB_NAME: ${DB_NAME}
ports:
- "8080:8080"
volumes:
- ./configs:/app/configs

volumes:
pgdata:
29 changes: 27 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,33 @@ require (
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/adshao/go-binance/v2 v2.6.0 // indirect
github.com/bitly/go-simplejson v0.5.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/sagikazarmark/locafero v0.6.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.19.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

Expand Down
60 changes: 60 additions & 0 deletions internal/monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package monitor

import (
"database/sql"
"fmt"
"log"
"time"

"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/processor"
"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/websocket"
)

// MonitorSymbol starts monitoring for a specific symbol
func MonitorSymbol(symbol string, db *sql.DB, stop chan struct{}) {
log.Printf("Starting monitoring for symbol: %s", symbol)
uri := fmt.Sprintf("wss://stream.binance.com:9443/ws/%s@ticker", symbol)

client := websocket.NewClient()
if err := client.Connect(uri); err != nil {
log.Fatalf("WebSocket connection error for symbol %s: %v", symbol, err)
}
defer func() {
if err := client.Close(); err != nil {
log.Printf("Error closing WebSocket client for symbol %s: %v", symbol, err)
}
}()

// Create PGWriter
pgWriter, err := processor.NewPGWriter(db)
if err != nil {
log.Fatalf("Error creating PostgreSQL writer for symbol %s: %v", symbol, err)
}
defer func() {
if err := pgWriter.Close(); err != nil {
log.Printf("Error closing PostgreSQL writer for symbol %s: %v", symbol, err)
}
}()

client.AddProcessor(pgWriter)

go client.Listen(stop)

log.Printf("WebSocket connection opened for %s", symbol)

// Periodic summary
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-stop:
log.Printf("Stopping monitoring for symbol: %s", symbol)
return
case <-ticker.C:
// Print a summary every 5 seconds
log.Printf("Symbol: %s - Last 5 seconds: Processed %d messages", symbol, pgWriter.GetProcessedCount())
log.Printf("Symbol: %s - Current buffer size: %d", symbol, pgWriter.GetBufferSize())
}
}
}
47 changes: 47 additions & 0 deletions internal/monitor/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package monitor

import (
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/processor"

Check failure on line 8 in internal/monitor/monitor_test.go

View workflow job for this annotation

GitHub Actions / Test and coverage

"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/processor" imported and not used
)

func TestMonitorSymbol(t *testing.T) {
ctrl := gomock.NewController(t)

Check failure on line 12 in internal/monitor/monitor_test.go

View workflow job for this annotation

GitHub Actions / Test and coverage

undefined: gomock
defer ctrl.Finish()

// Mock dependencies
db, mockDB, err := sqlmock.New()

Check failure on line 16 in internal/monitor/monitor_test.go

View workflow job for this annotation

GitHub Actions / Test and coverage

mockDB declared and not used
if err != nil {
t.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
}
defer db.Close()

mockWebSocketClient := websocket_mocks.NewMockWebSocketClient(ctrl)

Check failure on line 22 in internal/monitor/monitor_test.go

View workflow job for this annotation

GitHub Actions / Test and coverage

undefined: websocket_mocks
mockPGWriter := processor_mocks.NewMockDataProcessor(ctrl)

Check failure on line 23 in internal/monitor/monitor_test.go

View workflow job for this annotation

GitHub Actions / Test and coverage

undefined: processor_mocks

// Expectations
mockWebSocketClient.EXPECT().Connect(gomock.Any()).Return(nil)

Check failure on line 26 in internal/monitor/monitor_test.go

View workflow job for this annotation

GitHub Actions / Test and coverage

undefined: gomock
mockWebSocketClient.EXPECT().Close().Return(nil)
mockWebSocketClient.EXPECT().AddProcessor(mockPGWriter)
mockWebSocketClient.EXPECT().Listen(gomock.Any()).Do(func(stop chan struct{}) {

Check failure on line 29 in internal/monitor/monitor_test.go

View workflow job for this annotation

GitHub Actions / Test and coverage

undefined: gomock
time.Sleep(1 * time.Second)
close(stop)
})

mockPGWriter.EXPECT().Close().Return(nil)
mockPGWriter.EXPECT().GetProcessedCount().AnyTimes().Return(0)
mockPGWriter.EXPECT().GetBufferSize().AnyTimes().Return(0)

stop := make(chan struct{})
go MonitorSymbol("btcusdt", db, stop, mockWebSocketClient, mockPGWriter)

Check failure on line 39 in internal/monitor/monitor_test.go

View workflow job for this annotation

GitHub Actions / Test and coverage

too many arguments in call to MonitorSymbol

select {
case <-stop:
// Test passed
case <-time.After(2 * time.Second):
t.Fatalf("Test timed out")
}
}

0 comments on commit c32274c

Please sign in to comment.