diff --git a/Makefile.local b/Makefile.local index 574fa9519..c991bf750 100644 --- a/Makefile.local +++ b/Makefile.local @@ -107,8 +107,8 @@ build_server: GOOS=linux GOARCH=arm GOARM=7 go build ${GO_BUILD_FLAGS} ${GO_BUILD_TAGS} -o ${ROOT}/${EXEC}-linux-arm-7 build_local: - @echo MARK: build local - ${GO_BUILD_ENV} GOOS=darwin GOARCH=arm64 go build ${GO_BUILD_FLAGS} ${GO_BUILD_TAGS} -o ${ROOT}/${EXEC}-darwin-10.6-arm64 + @echo MARK: build local + ${GO_BUILD_ENV} GOOS=darwin GOARCH=arm64 go build ${GO_BUILD_FLAGS} ${GO_BUILD_TAGS} -o ${ROOT}/${EXEC}-darwin-10.6-arm64 build_cli: @echo MARK: build cli diff --git a/plugins/ble/ble.go b/plugins/ble/ble.go index 419e9c678..57668e3d8 100644 --- a/plugins/ble/ble.go +++ b/plugins/ble/ble.go @@ -19,8 +19,9 @@ package ble import ( - "go.uber.org/atomic" "sync" + + "go.uber.org/atomic" "tinygo.org/x/bluetooth" ) @@ -49,6 +50,9 @@ func NewBle(address string, timeout, connectionTimeout int64, debug bool) *Ble { } ble.adapter.SetConnectHandler(func(device bluetooth.Device, connected bool) { + if !ble.connected.CompareAndSwap(!connected, connected) { + return + } log.Infof("bluetooth device: %s, connected: %t", device.Address.String(), connected) ble.connected.Store(connected) ble.device = &device diff --git a/plugins/cgminer/plugin.go b/plugins/cgminer/plugin.go index a22f85691..0b537e0b2 100644 --- a/plugins/cgminer/plugin.go +++ b/plugins/cgminer/plugin.go @@ -23,7 +23,6 @@ import ( "embed" "github.com/e154/smart-home/common/events" - "github.com/e154/smart-home/common/logger" m "github.com/e154/smart-home/models" "github.com/e154/smart-home/system/supervisor" diff --git a/plugins/email/plugin.go b/plugins/email/plugin.go index 1050cfa5c..9a0cff16d 100644 --- a/plugins/email/plugin.go +++ b/plugins/email/plugin.go @@ -22,12 +22,10 @@ import ( "context" "embed" - "github.com/e154/smart-home/system/supervisor" - "github.com/e154/smart-home/common/logger" - m "github.com/e154/smart-home/models" "github.com/e154/smart-home/plugins/notify" + "github.com/e154/smart-home/system/supervisor" ) var ( diff --git a/plugins/hdd/actor.go b/plugins/hdd/actor.go index d4bfb2a6d..71e824a59 100644 --- a/plugins/hdd/actor.go +++ b/plugins/hdd/actor.go @@ -106,5 +106,5 @@ func (e *Actor) selfUpdate() { e.Attrs[AttrInodesUsedPercent].Value = r.InodesUsedPercent e.AttrMu.Unlock() } - e.SaveState(false, false) + e.SaveState(false, true) } diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index b3d8cbbfd..d7654418b 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -24,11 +24,9 @@ import ( "fmt" "time" - "github.com/e154/smart-home/system/supervisor" - "github.com/e154/smart-home/common" - m "github.com/e154/smart-home/models" + "github.com/e154/smart-home/system/supervisor" ) var _ supervisor.Pluggable = (*plugin)(nil) diff --git a/plugins/memory_app/plugin.go b/plugins/memory_app/plugin.go index 1afd26c2a..7ae771d17 100644 --- a/plugins/memory_app/plugin.go +++ b/plugins/memory_app/plugin.go @@ -25,10 +25,8 @@ import ( "time" "github.com/e154/smart-home/common" - - "github.com/e154/smart-home/system/supervisor" - m "github.com/e154/smart-home/models" + "github.com/e154/smart-home/system/supervisor" ) var _ supervisor.Pluggable = (*plugin)(nil) diff --git a/plugins/messagebird/plugin.go b/plugins/messagebird/plugin.go index d691279eb..fef488260 100644 --- a/plugins/messagebird/plugin.go +++ b/plugins/messagebird/plugin.go @@ -22,12 +22,10 @@ import ( "context" "embed" - "github.com/e154/smart-home/system/supervisor" - "github.com/e154/smart-home/common/logger" - m "github.com/e154/smart-home/models" "github.com/e154/smart-home/plugins/notify" + "github.com/e154/smart-home/system/supervisor" ) var ( diff --git a/plugins/modbus_rtu/binds.go b/plugins/modbus_rtu/binds.go index a257031e3..4250fbcff 100644 --- a/plugins/modbus_rtu/binds.go +++ b/plugins/modbus_rtu/binds.go @@ -24,13 +24,12 @@ import ( "strings" "time" - "github.com/e154/smart-home/common/apperr" - + "github.com/e154/bus" "github.com/pkg/errors" + "go.uber.org/atomic" - "github.com/e154/bus" + "github.com/e154/smart-home/common/apperr" "github.com/e154/smart-home/plugins/node" - "go.uber.org/atomic" ) type modbusRtu func(f string, address, count uint16, command []uint16) (result ModBusResponse) diff --git a/plugins/modbus_rtu/plugin.go b/plugins/modbus_rtu/plugin.go index 069398c7a..3170085f8 100644 --- a/plugins/modbus_rtu/plugin.go +++ b/plugins/modbus_rtu/plugin.go @@ -22,12 +22,10 @@ import ( "context" "embed" - "github.com/e154/smart-home/system/supervisor" - "github.com/e154/smart-home/common/events" - "github.com/e154/smart-home/common/logger" m "github.com/e154/smart-home/models" + "github.com/e154/smart-home/system/supervisor" ) var ( diff --git a/plugins/updater/plugin.go b/plugins/updater/plugin.go index f4ba98a29..7a9e5c522 100644 --- a/plugins/updater/plugin.go +++ b/plugins/updater/plugin.go @@ -75,6 +75,19 @@ func (p *plugin) Load(ctx context.Context, service supervisor.Service) (err erro Id: common.EntityId(fmt.Sprintf("%s.%s", EntityUpdater, Name)), PluginName: Name, Attributes: NewAttr(), + Actions: []*m.EntityAction{ + { + Name: "check", + }, + }, + States: []*m.EntityState{ + { + Name: "error", + }, + { + Name: "exist_update", + }, + }, } err = p.Service.Adaptors().Entity.Add(context.Background(), entity) } diff --git a/static_source/admin/src/views/Automation/components/TriggerForm.vue b/static_source/admin/src/views/Automation/components/TriggerForm.vue index a5aa30185..97abddbf5 100644 --- a/static_source/admin/src/views/Automation/components/TriggerForm.vue +++ b/static_source/admin/src/views/Automation/components/TriggerForm.vue @@ -113,6 +113,7 @@ const getPluginList = async () => { page: 1, limit: 99, triggers: true, + enabled: true, } const res = await api.v1.pluginServiceGetPluginList(params) .catch(() => { diff --git a/system/automation/trigger.go b/system/automation/trigger.go index 23c840562..c1079e3d9 100644 --- a/system/automation/trigger.go +++ b/system/automation/trigger.go @@ -61,7 +61,6 @@ func NewTrigger( var triggerPlugin triggers.ITrigger if triggerPlugin, err = rawPlugin.GetTrigger(pluginName); err != nil { - log.Error(err.Error()) return } diff --git a/system/automation/trigger_manager.go b/system/automation/trigger_manager.go index 7bec2311f..28a0b2422 100644 --- a/system/automation/trigger_manager.go +++ b/system/automation/trigger_manager.go @@ -173,7 +173,6 @@ func (a *triggerManager) addTrigger(model *m.Trigger) (err error) { var trigger *Trigger if trigger, err = NewTrigger(a.eventBus, a.scriptService, model, a.rawPlugin); err != nil { - log.Error(err.Error()) return } diff --git a/system/gate/client/wsp/connection.go b/system/gate/client/wsp/connection.go index 014d5fc92..980f30279 100644 --- a/system/gate/client/wsp/connection.go +++ b/system/gate/client/wsp/connection.go @@ -22,7 +22,6 @@ import ( "context" "encoding/json" "fmt" - "io" "net/http" "net/http/httptest" "runtime/debug" @@ -31,6 +30,8 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/pkg/errors" + "go.uber.org/atomic" "github.com/e154/smart-home/api" "github.com/e154/smart-home/common/apperr" @@ -55,7 +56,8 @@ type Connection struct { cli *stream.Client debug bool *sync.Mutex - ws *websocket.Conn + ws *websocket.Conn + isClosed *atomic.Bool } // NewConnection create a Connection object @@ -63,11 +65,12 @@ func NewConnection(pool *Pool, api *api.Api, stream *stream.Stream) *Connection { c := &Connection{ - pool: pool, - status: CONNECTING, - api: api, - stream: stream, - Mutex: &sync.Mutex{}, + pool: pool, + status: CONNECTING, + api: api, + stream: stream, + Mutex: &sync.Mutex{}, + isClosed: atomic.NewBool(true), } return c } @@ -87,6 +90,14 @@ func (c *Connection) Connect(ctx context.Context) (err error) { return err } + c.ws.SetCloseHandler(func(code int, text string) error { + c.isClosed.Store(true) + message := websocket.FormatCloseMessage(code, "") + return c.ws.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second)) + }) + + c.isClosed.Store(false) + if c.debug { log.Infof("Connected to %s", c.pool.target) defer log.Info("Connection closed ...") @@ -176,14 +187,7 @@ func (c *Connection) serve(ctx context.Context) { } // Deserialize request - httpRequest := new(common.HTTPRequest) - err = json.Unmarshal(jsonRequest, httpRequest) - if err != nil { - c.error(fmt.Sprintf("Unable to deserialize json http request : %s\n", err)) - break - } - - req, err := common.UnserializeHTTPRequest(httpRequest) + req, err := common.DeserializeHTTPRequest(jsonRequest) if err != nil { c.error(fmt.Sprintf("Unable to deserialize http request : %v\n", err)) break @@ -191,31 +195,12 @@ func (c *Connection) serve(ctx context.Context) { //log.Infof("[%s] %s", req.Method, req.URL.String()) - // Pipe request body - _, bodyReader, err := c.ws.NextReader() - if err != nil { - log.Errorf("Unable to get response body reader : %v", err) - break - } - req.Body = io.NopCloser(bodyReader) - - // Execute request - //resp, err := c.pool.client.client.Do(req) - //if err != nil { - // err = c.error(fmt.Sprintf("Unable to execute request : %v\n", err)) - // if err != nil { - // break - // } - // continue - //} - - //todo fix req.RequestURI = req.URL.String() resp := httptest.NewRecorder() c.api.Echo().ServeHTTP(resp, req) // Serialize response - jsonResponse, err := json.Marshal(common.SerializeHTTPResponse(resp.Result())) + jsonResponse, err := json.Marshal(common.SerializeHTTPResponse(resp)) if err != nil { err = c.error(fmt.Sprintf("Unable to serialize response : %v\n", err)) if err != nil { @@ -225,24 +210,12 @@ func (c *Connection) serve(ctx context.Context) { } // Write response - err = c.WriteMessage(websocket.TextMessage, jsonResponse) + err = c.WriteMessage(websocket.BinaryMessage, jsonResponse) if err != nil { log.Errorf("Unable to write response : %v", err) break } - // Pipe response body - bodyWriter, err := c.ws.NextWriter(websocket.BinaryMessage) - if err != nil { - log.Errorf("Unable to get response body writer : %v", err) - break - } - _, err = io.Copy(bodyWriter, resp.Body) - if err != nil { - log.Errorf("Unable to get pipe response body : %v", err) - break - } - bodyWriter.Close() } } @@ -280,6 +253,11 @@ func (c *Connection) error(msg string) (err error) { // Close close the ws/tcp connection and remove it from the pool func (c *Connection) Close() { + if !c.isClosed.CompareAndSwap(false, true) { + return + } + + c.isClosed.Store(true) if c.ws != nil { if err := c.WriteMessage(websocket.CloseMessage, []byte{}); err != nil { @@ -290,8 +268,11 @@ func (c *Connection) Close() { } } -func (c *Connection) WriteMessage(messageType int, data []byte) (err error) { - //todo: fix, it not work +func (c *Connection) WriteMessage(messageType int, data []byte) error { + if c.isClosed.Load() { + return errors.New("connection is closed") + } + c.Lock() defer func() { c.Unlock() @@ -301,6 +282,5 @@ func (c *Connection) WriteMessage(messageType int, data []byte) (err error) { } }() - err = c.ws.WriteMessage(messageType, data) - return + return c.ws.WriteMessage(messageType, data) } diff --git a/system/gate/common/request.go b/system/gate/common/request.go index 8bf856e4d..8d2aac0fe 100644 --- a/system/gate/common/request.go +++ b/system/gate/common/request.go @@ -19,10 +19,12 @@ package common import ( + "bytes" + "encoding/json" "fmt" + "io" "net/http" "net/url" - "regexp" ) // HTTPRequest is a serializable version of http.Request ( with only usefull fields ) @@ -32,101 +34,44 @@ type HTTPRequest struct { Header map[string][]string ContentLength int64 WS bool + Body []byte } // SerializeHTTPRequest create a new HTTPRequest from a http.Request func SerializeHTTPRequest(req *http.Request) (r *HTTPRequest) { + body, _ := io.ReadAll(req.Body) r = &HTTPRequest{ URL: req.URL.String(), Method: req.Method, Header: req.Header, ContentLength: req.ContentLength, + Body: body, } return } -// UnserializeHTTPRequest create a new http.Request from a HTTPRequest -func UnserializeHTTPRequest(req *HTTPRequest) (r *http.Request, err error) { - r = new(http.Request) - r.Method = req.Method - r.URL, err = url.Parse(req.URL) +// DeserializeHTTPRequest create a new http.Request from a HTTPRequest +func DeserializeHTTPRequest(jsonRequest []byte) (r *http.Request, err error) { + + req := &HTTPRequest{} + err = json.Unmarshal(jsonRequest, req) if err != nil { + err = fmt.Errorf("unable to deserialize json http request : %s", err) return } - r.Header = req.Header - r.ContentLength = req.ContentLength - return -} - -// Rule match HTTP requests to allow / deny access -type Rule struct { - Method string - URL string - Headers map[string]string - - methodRegex *regexp.Regexp - urlRegex *regexp.Regexp - headersRegex map[string]*regexp.Regexp -} -// NewRule creates a new Rule -func NewRule(method string, url string, headers map[string]string) (rule *Rule, err error) { - rule = new(Rule) - rule.Method = method - rule.URL = url - if headers != nil { - rule.Headers = headers - } else { - rule.Headers = make(map[string]string) + var uri *url.URL + if uri, err = url.Parse(req.URL); err != nil { + err = fmt.Errorf("unable to parse url : %s", err.Error()) + return } - err = rule.Compile() - return -} -// Compile the regular expressions -func (rule *Rule) Compile() (err error) { - if rule.Method != "" { - rule.methodRegex, err = regexp.Compile(rule.Method) - if err != nil { - return - } - } - if rule.URL != "" { - rule.urlRegex, err = regexp.Compile(rule.URL) - if err != nil { - return - } - } - rule.headersRegex = make(map[string]*regexp.Regexp) - for header, regexStr := range rule.Headers { - var regex *regexp.Regexp - regex, err = regexp.Compile(regexStr) - if err != nil { - return - } - rule.headersRegex[header] = regex + r = &http.Request{ + Method: req.Method, + URL: uri, + Header: req.Header, + ContentLength: req.ContentLength, + Body: io.NopCloser(bytes.NewReader(req.Body)), } return } - -// Match returns true if the http.Request matches the Rule -func (rule *Rule) Match(req *http.Request) bool { - if rule.methodRegex != nil && !rule.methodRegex.MatchString(req.Method) { - return false - } - if rule.urlRegex != nil && !rule.urlRegex.MatchString(req.URL.String()) { - return false - } - - for headerName, regex := range rule.headersRegex { - if !regex.MatchString(req.Header.Get(headerName)) { - return false - } - } - - return true -} - -func (rule *Rule) String() string { - return fmt.Sprintf("%s %s %v", rule.Method, rule.URL, rule.Headers) -} diff --git a/system/gate/common/response.go b/system/gate/common/response.go index 4da4b7fc8..ccb8fe034 100644 --- a/system/gate/common/response.go +++ b/system/gate/common/response.go @@ -21,6 +21,7 @@ package common import ( "fmt" "net/http" + "net/http/httptest" ) // HTTPResponse is a serializable version of http.Response ( with only useful fields ) @@ -28,15 +29,18 @@ type HTTPResponse struct { StatusCode int Header http.Header ContentLength int64 + Body []byte } // SerializeHTTPResponse create a new HTTPResponse from a http.Response -func SerializeHTTPResponse(resp *http.Response) *HTTPResponse { - r := new(HTTPResponse) - r.StatusCode = resp.StatusCode - r.Header = resp.Header - r.ContentLength = resp.ContentLength - return r +func SerializeHTTPResponse(resp *httptest.ResponseRecorder) *HTTPResponse { + result := resp.Result() + return &HTTPResponse{ + StatusCode: result.StatusCode, + Header: result.Header, + ContentLength: result.ContentLength, + Body: resp.Body.Bytes(), + } } // NewHTTPResponse creates a new HTTPResponse diff --git a/system/gate/server/wsp/connection.go b/system/gate/server/wsp/connection.go index 8f46790cd..38134cfab 100644 --- a/system/gate/server/wsp/connection.go +++ b/system/gate/server/wsp/connection.go @@ -193,21 +193,21 @@ func (c *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err e // [2]: Send the HTTP request to the peer // Send the serialized HTTP request to the the peer - if err = c.ws.WriteMessage(websocket.TextMessage, jsonReq); err != nil { + if err = c.ws.WriteMessage(websocket.BinaryMessage, jsonReq); err != nil { return fmt.Errorf("unable to write request : %w", err) } // Pipe the HTTP request body to the peer - bodyWriter, err := c.ws.NextWriter(websocket.BinaryMessage) - if err != nil { - return fmt.Errorf("unable to get request body writer : %w", err) - } - if _, err = io.Copy(bodyWriter, r.Body); err != nil { - return fmt.Errorf("unable to pipe request body : %w", err) - } - if err = bodyWriter.Close(); err != nil { - return fmt.Errorf("unable to pipe request body (close) : %w", err) - } + //bodyWriter, err := c.ws.NextWriter(websocket.BinaryMessage) + //if err != nil { + // return fmt.Errorf("unable to get request body writer : %w", err) + //} + //if _, err = io.Copy(bodyWriter, r.Body); err != nil { + // return fmt.Errorf("unable to pipe request body : %w", err) + //} + //if err = bodyWriter.Close(); err != nil { + // return fmt.Errorf("unable to pipe request body (close) : %w", err) + //} msg, ok := <-c.queue if !ok { @@ -217,7 +217,7 @@ func (c *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err e jsonResponse := msg.Value // Deserialize the HTTP Response - httpResponse := new(common.HTTPResponse) + httpResponse := &common.HTTPResponse{} if err = json.Unmarshal(jsonResponse, httpResponse); err != nil { return fmt.Errorf("unable to unserialize http response : %w", err) } @@ -230,15 +230,15 @@ func (c *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err e } w.WriteHeader(httpResponse.StatusCode) - msg, ok = <-c.queue - if !ok { - return - } - - responseBody := msg.Value - - responseBodyReader := bytes.NewReader(responseBody) + //msg, ok = <-c.queue + //if !ok { + // return + //} + // + //responseBody := msg.Value + // + responseBodyReader := bytes.NewReader(httpResponse.Body) if _, err = io.Copy(w, responseBodyReader); err != nil { return fmt.Errorf("unable to pipe response body : %w", err) } diff --git a/system/gate/server/wsp/pools.go b/system/gate/server/wsp/pools.go index 16adf0c2d..cc992c77b 100644 --- a/system/gate/server/wsp/pools.go +++ b/system/gate/server/wsp/pools.go @@ -99,7 +99,7 @@ func (p *Pools) Clean() { for _, pool := range p.pools { if pool.IsEmpty() { - log.Infof("Removing empty connection pool : %p", pool.id) + log.Infof("Removing empty connection pool : %s", pool.id) pool.Shutdown() delete(p.pools, pool.id) }