Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] MQTT Handlerの実装 #151

Merged
4 commits merged into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
buf:
buf generate --path=./proto/spec
rm -Rf ./backend/proto
buf generate --path=./proto/
cd ./backend/proto && go mod init github.com/ueckoken/plarail2023/backend/proto
cd ./backend/proto && go mod tidy
16 changes: 16 additions & 0 deletions backend/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
up:
docker compose down
docker compose up --build -d

pull:
git pull

test:
docker compose -f compose.debug.yaml up --build -d

test-status:
docker compose -f compose.debug.yaml ps

test-logs:
docker compose -f compose.debug.yaml logs -f

12 changes: 12 additions & 0 deletions backend/compose.debug.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: "3"

services:
mqtt-broker:
image: eclipse-mosquitto
expose:
- 1883
ports:
- 1883:1883
restart: unless-stopped
volumes:
- ./mosquitto/config:/mosquitto/config
4 changes: 4 additions & 0 deletions backend/mosquitto/config/mosquitto.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
set_tcp_nodelay true
listener 1883
allow_anonymous true
max_queued_messages 0
2 changes: 2 additions & 0 deletions backend/mqtt-handler/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
MQTT_BROKER_ADDR="tcp://localhost:1883"
CONNECT_SERVER_ADDR="localhost:8553"
2 changes: 2 additions & 0 deletions backend/mqtt-handler/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
build:
CGO_ENABLED=0 go build -o mqtt-handler main.go
5 changes: 5 additions & 0 deletions backend/mqtt-handler/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Device Manager

MQTT Brokerを経由してESP32に対して命令を出力、受け取るためのサービスです。

やることとしては、基本的にProtobufで定義されたメッセージをMQTT BrokerにPublishするだけのStatelessなサービスです。
11 changes: 11 additions & 0 deletions backend/mqtt-handler/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/ueckoken/plarail2023/backend/mqtt-handler

go 1.21.1

require github.com/eclipse/paho.mqtt.golang v1.4.3

require (
github.com/gorilla/websocket v1.5.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
)
8 changes: 8 additions & 0 deletions backend/mqtt-handler/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
54 changes: 54 additions & 0 deletions backend/mqtt-handler/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Mqtt event handler

package main

import (
"fmt"
"net/http"
"os"
"os/signal"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/ueckoken/plarail2023/backend/mqtt-handler/pkg/connect"
"github.com/ueckoken/plarail2023/backend/mqtt-handler/pkg/handler"
mqttclient "github.com/ueckoken/plarail2023/backend/mqtt-handler/pkg/mqtt"
"github.com/ueckoken/plarail2023/backend/proto/train/v1/trainv1connect"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)

func main() {
msgCh := make(chan mqtt.Message)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
msgCh <- msg
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)

// Subscribe topics
cc := mqttclient.MakeClient()
mqttclient.Subscribe(cc, "plarail2023/#", f)

// Make Connect handler
server := &connect.TrainServer{}
mux := http.NewServeMux()
path, trainServiceHandler := trainv1connect.NewTrainServiceHandler(server)
mux.Handle(path, trainServiceHandler)
http.ListenAndServe(
os.Getenv("CONNECT_LISTEN_ADDR"),
h2c.NewHandler(mux, &http2.Server{}),
)

// Wait for messages
for {
select {
case m := <-msgCh:
handler.HandleMqttMessage(m.Topic(), string(m.Payload()))
case <-signalCh:
fmt.Printf("interrupted")
cc.Disconnect(1000)
return
}
}

}
23 changes: 23 additions & 0 deletions backend/mqtt-handler/pkg/connect/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package connect

import (
"context"

"github.com/bufbuild/connect-go"
trainv1 "github.com/ueckoken/plarail2023/backend/proto/train/v1"
)

type TrainServer struct {
}

func (s *TrainServer) NotifyTrainArrival(
ctx context.Context,
req *connect.Request[trainv1.NotifyTrainArrivalRequest],
) (*connect.Response[trainv1.NotifyTrainArrivalResponse], error) {
// handler.HandleConnectMessage(req.Target, req.Message)

res := connect.NewResponse(&trainv1.NotifyTrainArrivalResponse{
Success: true,
})
return res, nil
}
10 changes: 10 additions & 0 deletions backend/mqtt-handler/pkg/handler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package handler

func HandleMqttMessage(topic string, msg string) {
// TODO: Implement
}

func HandleConnectMessage(target string, msg string) {
// TODO: Implement
// mqttclient.Send(cc, "plarail2023/notify/train/arrival", "test")
}
32 changes: 32 additions & 0 deletions backend/mqtt-handler/pkg/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package mqttclient

import (
"log"
"os"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

func MakeClient() mqtt.Client {
opts := mqtt.NewClientOptions()
opts.AddBroker(os.Getenv("MQTT_BROKER_ADDR"))
cc := mqtt.NewClient(opts)

if token := cc.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Mqtt error: %s", token.Error())
}

return cc
}

func Subscribe(cc mqtt.Client, topic string, f mqtt.MessageHandler) {
subscribeToken := cc.Subscribe(topic, 0, f)
if subscribeToken.Wait() && subscribeToken.Error() != nil {
log.Fatal(subscribeToken.Error())
}
}

func Send(cc mqtt.Client, topic string, payload string) {
token := cc.Publish(topic, 0, false, payload)
token.Wait()
}
8 changes: 8 additions & 0 deletions backend/proto/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/ueckoken/plarail2023/backend/proto

go 1.21.1

require (
github.com/bufbuild/connect-go v1.10.0
google.golang.org/protobuf v1.31.0
)
10 changes: 10 additions & 0 deletions backend/proto/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
github.com/bufbuild/connect-go v1.10.0 h1:QAJ3G9A1OYQW2Jbk3DeoJbkCxuKArrvZgDt47mjdTbg=
github.com/bufbuild/connect-go v1.10.0/go.mod h1:CAIePUgkDR5pAFaylSMtNK45ANQjp9JvpluG20rhpV8=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
Loading