-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
348 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,242 @@ | ||
// Package canlink provides utilities to interact with a | ||
// Controller Area Network (CAN Bus). | ||
package canlink | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"net" | ||
"sync" | ||
"time" | ||
|
||
"go.einride.tech/can/pkg/socketcan" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// Default buffered channel length for the broadcast and | ||
// incoming receive channels. | ||
// | ||
// In order to preserve the real time broadcast requirement, | ||
// BusManager will drop messages for any Handler that is | ||
// unable to keep up with the broadcast rate. | ||
const _channelBufferLength = 1000 | ||
|
||
// BusManager is a centralized node responsible for orchestrating | ||
// all interactions with a CAN bus. | ||
// | ||
// It acts as a message broker supporting the transmission | ||
// of bus traffic to registered handlers and receiving incoming | ||
// messages from these handlers to write to the bus. | ||
// | ||
// BusManager uses SocketCAN on the Linux platform. Note that | ||
// it does not manage the lifetime of the network socket connection. | ||
// | ||
// Example: | ||
// | ||
// package main | ||
// | ||
// import ( | ||
// "context" | ||
// "fmt" | ||
// "time" | ||
// | ||
// "go.einride.tech/can/pkg/socketcan" | ||
// "go.uber.org/zap" | ||
// ) | ||
// | ||
// func main () { | ||
// ctx := context.Background() | ||
// | ||
// loggerConfig := zap.NewDevelopmentConfig() | ||
// logger, err := loggerConfig.Build() | ||
// | ||
// // Create a network connection for vcan0 | ||
// conn, err := socketcan.DialContext(context.Background(), "can", "vcan0") | ||
// if err != nil { | ||
// return | ||
// } | ||
// | ||
// manager := canlink.NewBusManager(logger, conn) | ||
// handler = NewHandler(...) | ||
// | ||
// broadcast, transmit := manager.Register(handler) | ||
// | ||
// handler.Handle(...) | ||
// | ||
// manager.Start(ctx) | ||
// | ||
// ... | ||
// | ||
// manager.Stop() | ||
// manager.Close() | ||
type BusManager struct { | ||
broadcastChan map[Handler]chan TimestampedFrame | ||
incomingChan chan TimestampedFrame | ||
|
||
receiver *socketcan.Receiver | ||
transmitter *socketcan.Transmitter | ||
|
||
l *zap.Logger | ||
stop chan struct{} | ||
isRunning bool | ||
mu sync.Mutex | ||
} | ||
|
||
// NewBusManager returns a BusManager object. | ||
// | ||
// The network connection is injected into the BusManager | ||
// and provides the interface for a single bus. | ||
// | ||
// See usage example. | ||
func NewBusManager(l *zap.Logger, conn *net.Conn) *BusManager { | ||
busManager := &BusManager{ | ||
l: l.Named("Bus Manager"), | ||
|
||
broadcastChan: make(map[Handler]chan TimestampedFrame), | ||
incomingChan: make(chan TimestampedFrame, _channelBufferLength), | ||
|
||
receiver: socketcan.NewReceiver(*conn), | ||
transmitter: socketcan.NewTransmitter(*conn), | ||
} | ||
|
||
return busManager | ||
} | ||
|
||
// Register a Handler with the BusManager. | ||
// | ||
// Register returns two channels for the specified Handler. | ||
// The broadcast channel is a stream of traffic received from | ||
// the bus. The incoming channel enables the Handler to write | ||
// a frame to the bus. | ||
// | ||
// The channels operate on a TimestampedFrame object. | ||
func (b *BusManager) Register( | ||
handler Handler, | ||
) (chan TimestampedFrame, chan TimestampedFrame) { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
b.l.Info("registering handler") | ||
|
||
subscription := make(chan TimestampedFrame, _channelBufferLength) | ||
b.broadcastChan[handler] = subscription | ||
|
||
b.l.Info("registered handler") | ||
|
||
return subscription, b.incomingChan | ||
} | ||
|
||
// Unregister a Handler from the BusManager. | ||
// | ||
// Deletes the broadcast channel that was previously | ||
// provided from BusManager. | ||
func (b *BusManager) Unregister(handler *Handler) { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
delete(b.broadcastChan, *handler) | ||
} | ||
|
||
// Start the traffic broadcast and incoming frame listener | ||
// for each of the registered handlers. | ||
// | ||
// The broadcast stream will begin. If handlers cannot keep up | ||
// with the broadcast, frames for that handler will be dropped. | ||
// | ||
// The handlers can now send CAN frames to the BusManager to be | ||
// transmitted out onto the bus. | ||
func (b *BusManager) Start(ctx context.Context) { | ||
b.l.Info("start broadcast and incoming") | ||
|
||
if b.isRunning { | ||
return | ||
} | ||
|
||
b.stop = make(chan struct{}) | ||
|
||
go b.broadcast(ctx) | ||
go b.processIncoming(ctx) | ||
|
||
b.isRunning = true | ||
} | ||
|
||
// Stop the traffic broadcast and incoming frame listener. | ||
// | ||
// Preserves registered handlers and their assosciated channels. | ||
func (b *BusManager) Stop() { | ||
b.l.Info("stop broadcast and incoming") | ||
|
||
if !b.isRunning { | ||
return | ||
} | ||
|
||
close(b.stop) | ||
b.isRunning = false | ||
} | ||
|
||
// Close cleans up the bus network connection. | ||
func (b *BusManager) Close(conn net.Conn) error { | ||
b.l.Info("closing socketcan receiver and transmitter") | ||
|
||
if b.isRunning { | ||
return errors.New("cannot close active bus manager") | ||
} | ||
|
||
b.receiver.Close() | ||
b.transmitter.Close() | ||
|
||
return nil | ||
} | ||
|
||
func (b *BusManager) broadcast(ctx context.Context) { | ||
timeFrame := TimestampedFrame{} | ||
|
||
for b.receiver.Receive() { | ||
timeFrame.Frame = b.receiver.Frame() | ||
timeFrame.Time = time.Now() | ||
|
||
b.mu.Lock() | ||
|
||
for handler, ch := range b.broadcastChan { | ||
select { | ||
case <-ctx.Done(): | ||
b.l.Info("context deadline exceeded") | ||
return | ||
case _, ok := <-ch: | ||
if !ok { | ||
b.l.Info("broadcast channel closed, exiting broadcast routine") | ||
return | ||
} | ||
case <-b.stop: | ||
b.l.Info("stop signal received") | ||
return | ||
case ch <- timeFrame: | ||
b.l.Info("broadcasted can frame") | ||
default: | ||
b.l.Warn("dropping frames on handler", zap.String("handler", handler.Name())) | ||
} | ||
} | ||
|
||
b.mu.Unlock() | ||
} | ||
} | ||
|
||
func (b *BusManager) processIncoming(ctx context.Context) { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
b.l.Info("context deadline exceeded") | ||
return | ||
case <-b.stop: | ||
b.l.Info("stop signal received") | ||
return | ||
case frame, ok := <-b.incomingChan: | ||
if !ok { | ||
b.l.Info("incoming channel closed, exiting process routine") | ||
return | ||
} | ||
|
||
b.transmitter.TransmitFrame(ctx, frame.Frame) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package canlink | ||
|
||
type Handler interface { | ||
Name() string | ||
Handle(chan TimestampedFrame, chan TimestampedFrame) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"go.einride.tech/can/pkg/socketcan" | ||
"go.uber.org/zap" | ||
|
||
"github.com/macformula/hil/canlink" | ||
) | ||
|
||
type Handler struct { | ||
name string | ||
} | ||
|
||
func NewHandler() *Handler { | ||
handler := &Handler{ | ||
name: "testHandler", | ||
} | ||
|
||
return handler | ||
} | ||
|
||
func (h *Handler) Handle( | ||
broadcast chan canlink.TimestampedFrame, | ||
transmit chan canlink.TimestampedFrame, | ||
) { | ||
go func() { | ||
for { | ||
select { | ||
case frame := <-broadcast: | ||
fmt.Println("RECEIVED: ", frame.Frame) | ||
default: | ||
} | ||
} | ||
}() | ||
|
||
go func() { | ||
var i byte | ||
|
||
for { | ||
time.Sleep(2 * time.Millisecond) | ||
|
||
frame := canlink.TimestampedFrame{} | ||
copy(frame.Frame.Data[:], []byte{i}) | ||
frame.Time = time.Now() | ||
|
||
i = i + 1 | ||
|
||
transmit <- frame | ||
} | ||
}() | ||
} | ||
|
||
func (h *Handler) Name() string { | ||
return "Handler" | ||
} | ||
|
||
func main() { | ||
ctx := context.Background() | ||
|
||
loggerConfig := zap.NewDevelopmentConfig() | ||
logger, err := loggerConfig.Build() | ||
|
||
conn, err := socketcan.DialContext(context.Background(), "can", "vcan0") | ||
if err != nil { | ||
logger.Error("failed to create socket can connection", | ||
zap.String("can_interface", "vcan0"), | ||
zap.Error(err), | ||
) | ||
return | ||
} | ||
|
||
manager := canlink.NewBusManager(logger, &conn) | ||
handler := NewHandler() | ||
|
||
broadcast, transmit := manager.Register(handler) | ||
|
||
handler.Handle(broadcast, transmit) | ||
|
||
manager.Start(ctx) | ||
|
||
for { | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters