Skip to content

Commit

Permalink
Develop (#283)
Browse files Browse the repository at this point in the history
* technical debt
  • Loading branch information
e154 authored Aug 10, 2024
1 parent 097b246 commit 18cfc40
Show file tree
Hide file tree
Showing 19 changed files with 115 additions and 182 deletions.
4 changes: 2 additions & 2 deletions Makefile.local
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ build_server:
GOOS=linux GOARCH=arm GOARM=7 go build ${GO_BUILD_FLAGS} ${GO_BUILD_TAGS} -o ${ROOT}/${EXEC}-linux-arm-7

build_local:
@echo MARK: build local
${GO_BUILD_ENV} GOOS=darwin GOARCH=arm64 go build ${GO_BUILD_FLAGS} ${GO_BUILD_TAGS} -o ${ROOT}/${EXEC}-darwin-10.6-arm64
@echo MARK: build local
${GO_BUILD_ENV} GOOS=darwin GOARCH=arm64 go build ${GO_BUILD_FLAGS} ${GO_BUILD_TAGS} -o ${ROOT}/${EXEC}-darwin-10.6-arm64

build_cli:
@echo MARK: build cli
Expand Down
6 changes: 5 additions & 1 deletion plugins/ble/ble.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
package ble

import (
"go.uber.org/atomic"
"sync"

"go.uber.org/atomic"
"tinygo.org/x/bluetooth"
)

Expand Down Expand Up @@ -49,6 +50,9 @@ func NewBle(address string, timeout, connectionTimeout int64, debug bool) *Ble {
}

ble.adapter.SetConnectHandler(func(device bluetooth.Device, connected bool) {
if !ble.connected.CompareAndSwap(!connected, connected) {
return
}
log.Infof("bluetooth device: %s, connected: %t", device.Address.String(), connected)
ble.connected.Store(connected)
ble.device = &device
Expand Down
1 change: 0 additions & 1 deletion plugins/cgminer/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"embed"

"github.com/e154/smart-home/common/events"

"github.com/e154/smart-home/common/logger"
m "github.com/e154/smart-home/models"
"github.com/e154/smart-home/system/supervisor"
Expand Down
4 changes: 1 addition & 3 deletions plugins/email/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import (
"context"
"embed"

"github.com/e154/smart-home/system/supervisor"

"github.com/e154/smart-home/common/logger"

m "github.com/e154/smart-home/models"
"github.com/e154/smart-home/plugins/notify"
"github.com/e154/smart-home/system/supervisor"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion plugins/hdd/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,5 @@ func (e *Actor) selfUpdate() {
e.Attrs[AttrInodesUsedPercent].Value = r.InodesUsedPercent
e.AttrMu.Unlock()
}
e.SaveState(false, false)
e.SaveState(false, true)
}
4 changes: 1 addition & 3 deletions plugins/memory/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ import (
"fmt"
"time"

"github.com/e154/smart-home/system/supervisor"

"github.com/e154/smart-home/common"

m "github.com/e154/smart-home/models"
"github.com/e154/smart-home/system/supervisor"
)

var _ supervisor.Pluggable = (*plugin)(nil)
Expand Down
4 changes: 1 addition & 3 deletions plugins/memory_app/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import (
"time"

"github.com/e154/smart-home/common"

"github.com/e154/smart-home/system/supervisor"

m "github.com/e154/smart-home/models"
"github.com/e154/smart-home/system/supervisor"
)

var _ supervisor.Pluggable = (*plugin)(nil)
Expand Down
4 changes: 1 addition & 3 deletions plugins/messagebird/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import (
"context"
"embed"

"github.com/e154/smart-home/system/supervisor"

"github.com/e154/smart-home/common/logger"

m "github.com/e154/smart-home/models"
"github.com/e154/smart-home/plugins/notify"
"github.com/e154/smart-home/system/supervisor"
)

var (
Expand Down
7 changes: 3 additions & 4 deletions plugins/modbus_rtu/binds.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ import (
"strings"
"time"

"github.com/e154/smart-home/common/apperr"

"github.com/e154/bus"
"github.com/pkg/errors"
"go.uber.org/atomic"

"github.com/e154/bus"
"github.com/e154/smart-home/common/apperr"
"github.com/e154/smart-home/plugins/node"
"go.uber.org/atomic"
)

type modbusRtu func(f string, address, count uint16, command []uint16) (result ModBusResponse)
Expand Down
4 changes: 1 addition & 3 deletions plugins/modbus_rtu/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import (
"context"
"embed"

"github.com/e154/smart-home/system/supervisor"

"github.com/e154/smart-home/common/events"

"github.com/e154/smart-home/common/logger"
m "github.com/e154/smart-home/models"
"github.com/e154/smart-home/system/supervisor"
)

var (
Expand Down
13 changes: 13 additions & 0 deletions plugins/updater/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ func (p *plugin) Load(ctx context.Context, service supervisor.Service) (err erro
Id: common.EntityId(fmt.Sprintf("%s.%s", EntityUpdater, Name)),
PluginName: Name,
Attributes: NewAttr(),
Actions: []*m.EntityAction{
{
Name: "check",
},
},
States: []*m.EntityState{
{
Name: "error",
},
{
Name: "exist_update",
},
},
}
err = p.Service.Adaptors().Entity.Add(context.Background(), entity)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ const getPluginList = async () => {
page: 1,
limit: 99,
triggers: true,
enabled: true,
}
const res = await api.v1.pluginServiceGetPluginList(params)
.catch(() => {
Expand Down
1 change: 0 additions & 1 deletion system/automation/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func NewTrigger(

var triggerPlugin triggers.ITrigger
if triggerPlugin, err = rawPlugin.GetTrigger(pluginName); err != nil {
log.Error(err.Error())
return
}

Expand Down
1 change: 0 additions & 1 deletion system/automation/trigger_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ func (a *triggerManager) addTrigger(model *m.Trigger) (err error) {

var trigger *Trigger
if trigger, err = NewTrigger(a.eventBus, a.scriptService, model, a.rawPlugin); err != nil {
log.Error(err.Error())
return
}

Expand Down
84 changes: 32 additions & 52 deletions system/gate/client/wsp/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"runtime/debug"
Expand All @@ -31,6 +30,8 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"go.uber.org/atomic"

"github.com/e154/smart-home/api"
"github.com/e154/smart-home/common/apperr"
Expand All @@ -55,19 +56,21 @@ type Connection struct {
cli *stream.Client
debug bool
*sync.Mutex
ws *websocket.Conn
ws *websocket.Conn
isClosed *atomic.Bool
}

// NewConnection create a Connection object
func NewConnection(pool *Pool,
api *api.Api,
stream *stream.Stream) *Connection {
c := &Connection{
pool: pool,
status: CONNECTING,
api: api,
stream: stream,
Mutex: &sync.Mutex{},
pool: pool,
status: CONNECTING,
api: api,
stream: stream,
Mutex: &sync.Mutex{},
isClosed: atomic.NewBool(true),
}
return c
}
Expand All @@ -87,6 +90,14 @@ func (c *Connection) Connect(ctx context.Context) (err error) {
return err
}

c.ws.SetCloseHandler(func(code int, text string) error {
c.isClosed.Store(true)
message := websocket.FormatCloseMessage(code, "")
return c.ws.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
})

c.isClosed.Store(false)

if c.debug {
log.Infof("Connected to %s", c.pool.target)
defer log.Info("Connection closed ...")
Expand Down Expand Up @@ -176,46 +187,20 @@ func (c *Connection) serve(ctx context.Context) {
}

// Deserialize request
httpRequest := new(common.HTTPRequest)
err = json.Unmarshal(jsonRequest, httpRequest)
if err != nil {
c.error(fmt.Sprintf("Unable to deserialize json http request : %s\n", err))
break
}

req, err := common.UnserializeHTTPRequest(httpRequest)
req, err := common.DeserializeHTTPRequest(jsonRequest)
if err != nil {
c.error(fmt.Sprintf("Unable to deserialize http request : %v\n", err))
break
}

//log.Infof("[%s] %s", req.Method, req.URL.String())

// Pipe request body
_, bodyReader, err := c.ws.NextReader()
if err != nil {
log.Errorf("Unable to get response body reader : %v", err)
break
}
req.Body = io.NopCloser(bodyReader)

// Execute request
//resp, err := c.pool.client.client.Do(req)
//if err != nil {
// err = c.error(fmt.Sprintf("Unable to execute request : %v\n", err))
// if err != nil {
// break
// }
// continue
//}

//todo fix
req.RequestURI = req.URL.String()
resp := httptest.NewRecorder()
c.api.Echo().ServeHTTP(resp, req)

// Serialize response
jsonResponse, err := json.Marshal(common.SerializeHTTPResponse(resp.Result()))
jsonResponse, err := json.Marshal(common.SerializeHTTPResponse(resp))
if err != nil {
err = c.error(fmt.Sprintf("Unable to serialize response : %v\n", err))
if err != nil {
Expand All @@ -225,24 +210,12 @@ func (c *Connection) serve(ctx context.Context) {
}

// Write response
err = c.WriteMessage(websocket.TextMessage, jsonResponse)
err = c.WriteMessage(websocket.BinaryMessage, jsonResponse)
if err != nil {
log.Errorf("Unable to write response : %v", err)
break
}

// Pipe response body
bodyWriter, err := c.ws.NextWriter(websocket.BinaryMessage)
if err != nil {
log.Errorf("Unable to get response body writer : %v", err)
break
}
_, err = io.Copy(bodyWriter, resp.Body)
if err != nil {
log.Errorf("Unable to get pipe response body : %v", err)
break
}
bodyWriter.Close()
}
}

Expand Down Expand Up @@ -280,6 +253,11 @@ func (c *Connection) error(msg string) (err error) {

// Close close the ws/tcp connection and remove it from the pool
func (c *Connection) Close() {
if !c.isClosed.CompareAndSwap(false, true) {
return
}

c.isClosed.Store(true)

if c.ws != nil {
if err := c.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
Expand All @@ -290,8 +268,11 @@ func (c *Connection) Close() {
}
}

func (c *Connection) WriteMessage(messageType int, data []byte) (err error) {
//todo: fix, it not work
func (c *Connection) WriteMessage(messageType int, data []byte) error {
if c.isClosed.Load() {
return errors.New("connection is closed")
}

c.Lock()
defer func() {
c.Unlock()
Expand All @@ -301,6 +282,5 @@ func (c *Connection) WriteMessage(messageType int, data []byte) (err error) {
}
}()

err = c.ws.WriteMessage(messageType, data)
return
return c.ws.WriteMessage(messageType, data)
}
Loading

0 comments on commit 18cfc40

Please sign in to comment.