-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CAN Bus Manager #98
base: main
Are you sure you want to change the base?
CAN Bus Manager #98
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,242 @@ | ||
// Package canlink provides utilities to interact with a | ||
// Controller Area Network (CAN Bus). | ||
package canlink | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"net" | ||
"sync" | ||
"time" | ||
|
||
"go.einride.tech/can/pkg/socketcan" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// Default buffered channel length for the broadcast and | ||
// incoming receive channels. | ||
// | ||
// In order to preserve the real time broadcast requirement, | ||
// BusManager will drop messages for any Handler that is | ||
// unable to keep up with the broadcast rate. | ||
const _channelBufferLength = 1000 | ||
|
||
// BusManager is a centralized node responsible for orchestrating | ||
// all interactions with a CAN bus. | ||
// | ||
// It acts as a message broker supporting the transmission | ||
// of bus traffic to registered handlers and receiving incoming | ||
// messages from these handlers to write to the bus. | ||
// | ||
// BusManager uses SocketCAN on the Linux platform. Note that | ||
// it does not manage the lifetime of the network socket connection. | ||
// | ||
// Example: | ||
// | ||
// package main | ||
// | ||
// import ( | ||
// "context" | ||
// "fmt" | ||
// "time" | ||
// | ||
// "go.einride.tech/can/pkg/socketcan" | ||
// "go.uber.org/zap" | ||
// ) | ||
// | ||
// func main () { | ||
// ctx := context.Background() | ||
// | ||
// loggerConfig := zap.NewDevelopmentConfig() | ||
// logger, err := loggerConfig.Build() | ||
// | ||
// // Create a network connection for vcan0 | ||
// conn, err := socketcan.DialContext(context.Background(), "can", "vcan0") | ||
// if err != nil { | ||
// return | ||
// } | ||
// | ||
// manager := canlink.NewBusManager(logger, conn) | ||
// handler = NewHandler(...) | ||
// | ||
// broadcastChan, transmitChan := manager.Register(handler) | ||
// | ||
// handler.Handle(...) | ||
// | ||
// manager.Start(ctx) | ||
// | ||
// ... | ||
// | ||
// manager.Stop() | ||
// manager.Close() | ||
type BusManager struct { | ||
broadcastChan map[Handler]chan TimestampedFrame | ||
incomingChan chan TimestampedFrame | ||
|
||
receiver *socketcan.Receiver | ||
transmitter *socketcan.Transmitter | ||
|
||
l *zap.Logger | ||
stop chan struct{} | ||
isRunning bool | ||
mu sync.Mutex | ||
} | ||
|
||
// NewBusManager returns a BusManager object. | ||
// | ||
// The network connection is injected into the BusManager | ||
// and provides the interface for a single bus. | ||
// | ||
// See usage example. | ||
func NewBusManager(l *zap.Logger, conn *net.Conn) *BusManager { | ||
busManager := &BusManager{ | ||
l: l.Named("bus_manager"), | ||
|
||
broadcastChan: make(map[Handler]chan TimestampedFrame), | ||
incomingChan: make(chan TimestampedFrame, _channelBufferLength), | ||
|
||
receiver: socketcan.NewReceiver(*conn), | ||
transmitter: socketcan.NewTransmitter(*conn), | ||
} | ||
|
||
return busManager | ||
} | ||
|
||
// Register a Handler with the BusManager. | ||
// | ||
// Register returns two channels for the specified Handler. | ||
// The broadcast channel is a stream of traffic received from | ||
// the bus. The incoming channel enables the Handler to write | ||
// a frame to the bus. | ||
// | ||
// The channels operate on a TimestampedFrame object. | ||
func (b *BusManager) Register( | ||
handler Handler, | ||
) (chan TimestampedFrame, chan TimestampedFrame) { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
b.l.Info("registering handler") | ||
|
||
subscription := make(chan TimestampedFrame, _channelBufferLength) | ||
b.broadcastChan[handler] = subscription | ||
|
||
b.l.Info("registered handler") | ||
|
||
return subscription, b.incomingChan | ||
} | ||
|
||
// Unregister a Handler from the BusManager. | ||
// | ||
// Deletes the broadcast channel that was previously | ||
// provided from BusManager. | ||
func (b *BusManager) Unregister(handler *Handler) { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
delete(b.broadcastChan, *handler) | ||
} | ||
|
||
// Start the traffic broadcast and incoming frame listener | ||
// for each of the registered handlers. | ||
// | ||
// The broadcast stream will begin. If handlers cannot keep up | ||
// with the broadcast, frames for that handler will be dropped. | ||
// | ||
// The handlers can now send CAN frames to the BusManager to be | ||
// transmitted out onto the bus. | ||
func (b *BusManager) Start(ctx context.Context) { | ||
if b.isRunning { | ||
b.l.Warn("bus manager is already started") | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a warning log here, make the log line above it go after this logic |
||
} | ||
|
||
b.l.Info("start broadcast and process incoming") | ||
|
||
b.stop = make(chan struct{}) | ||
|
||
go b.broadcast(ctx) | ||
go b.processIncoming(ctx) | ||
|
||
b.isRunning = true | ||
} | ||
|
||
// Stop the traffic broadcast and incoming frame listener. | ||
// | ||
// Preserves registered handlers and their assosciated channels. | ||
func (b *BusManager) Stop() { | ||
if !b.isRunning { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same thing as start |
||
b.l.Warn("bus manager is already stopped") | ||
return | ||
} | ||
|
||
b.l.Info("stop broadcast and process incoming") | ||
|
||
close(b.stop) | ||
b.isRunning = false | ||
} | ||
|
||
// Close cleans up the bus network connection. | ||
func (b *BusManager) Close() error { | ||
if b.isRunning { | ||
b.l.Info("stopping bus manager") | ||
b.Stop() | ||
} | ||
|
||
b.l.Info("closing socketcan receiver and transmitter") | ||
|
||
b.receiver.Close() | ||
b.transmitter.Close() | ||
|
||
return nil | ||
} | ||
|
||
func (b *BusManager) broadcast(ctx context.Context) { | ||
for b.receiver.Receive() { | ||
timeFrame := TimestampedFrame{b.receiver.Frame(), time.Now()} | ||
|
||
b.mu.Lock() | ||
|
||
for handler, ch := range b.broadcastChan { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where does the message get passed to the handler? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It never gets passed to the handler, rather it gets sent into its associated broadcast channel (see line 213), the handler in this loop is only used in logging it seems. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @langei How's this work and where do I define my message handling behaviour? I thought the purpose of the BusManager was to send the message to each Handler. |
||
select { | ||
case <-ctx.Done(): | ||
b.l.Info("context deadline exceeded") | ||
return | ||
case _, ok := <-ch: | ||
if !ok { | ||
b.l.Info("broadcast channel closed, exiting broadcast routine") | ||
return | ||
} | ||
case <-b.stop: | ||
b.l.Info("stop signal received") | ||
return | ||
case ch <- timeFrame: | ||
b.l.Info("broadcasted can frame") | ||
default: | ||
b.l.Warn("dropping frames on handler", zap.String("handler", handler.Name())) | ||
} | ||
} | ||
|
||
b.mu.Unlock() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good question. I think you might be right as the scope exits right after the unlock which would make them functionally equal. Here is a way to test this: defer func() {
b.l.Debug("Unlocking mutex")
b.mu.Unlock()
} |
||
} | ||
} | ||
|
||
func (b *BusManager) processIncoming(ctx context.Context) { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
b.l.Info("context deadline exceeded") | ||
return | ||
case <-b.stop: | ||
b.l.Info("stop signal received") | ||
return | ||
case frame, ok := <-b.incomingChan: | ||
if !ok { | ||
b.l.Info("incoming channel closed, exiting process routine") | ||
return | ||
} | ||
|
||
b.transmitter.TransmitFrame(ctx, frame.Frame) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package canlink | ||
|
||
type Handler interface { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't fully understand how this will be used. Could you document this a bit more and just maybe explain the reasoning for this design. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handler doesn't necessarily need to exist. It's only use right now is to provide as an argument to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought we were using Handlers as our interface for "handling" incoming messages, which would make Handler a key part of this architecture. |
||
Name() string | ||
BlakeFreer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Handle(chan TimestampedFrame, chan TimestampedFrame) | ||
} |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Binary alert! |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"go.einride.tech/can/pkg/socketcan" | ||
"go.uber.org/zap" | ||
|
||
"github.com/macformula/hil/canlink" | ||
) | ||
|
||
type Handler struct { | ||
name string | ||
} | ||
|
||
func NewHandler() *Handler { | ||
handler := &Handler{ | ||
name: "testHandler", | ||
} | ||
|
||
return handler | ||
} | ||
|
||
func (h *Handler) Handle( | ||
broadcast chan canlink.TimestampedFrame, | ||
transmit chan canlink.TimestampedFrame, | ||
) { | ||
go func() { | ||
for { | ||
select { | ||
case frame := <-broadcast: | ||
fmt.Println("RECEIVED: ", frame.Frame) | ||
default: | ||
} | ||
} | ||
}() | ||
|
||
go func() { | ||
var i byte | ||
|
||
for { | ||
time.Sleep(2 * time.Millisecond) | ||
|
||
frame := canlink.TimestampedFrame{} | ||
copy(frame.Frame.Data[:], []byte{i}) | ||
frame.Time = time.Now() | ||
|
||
i = i + 1 | ||
|
||
transmit <- frame | ||
} | ||
}() | ||
} | ||
|
||
func (h *Handler) Name() string { | ||
return "Handler" | ||
} | ||
|
||
func main() { | ||
ctx := context.Background() | ||
|
||
loggerConfig := zap.NewDevelopmentConfig() | ||
logger, err := loggerConfig.Build() | ||
|
||
conn, err := socketcan.DialContext(context.Background(), "can", "vcan0") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does it make sense to include this code into a BusManager function? I think it would be nice to be able to call something like
Same with the logger. If every BusManager needs a logger, can it create its own during construction? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There may be other objects that interact with can and this way they can all share the same socket connection. What we should do is have a look at the dialcontext implementation. If this creates a new connection on each call then it should remain the same. But if it just returns an instance of a pre-existing connection it should be fine to make the change. This is how zap logger works as well. You create a logger instance at the top level and pass it in to all objects that will perform logging. I understand this is a little different from how python handles things. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, logger should be injected. However for the connection, it might make more sense for the |
||
if err != nil { | ||
logger.Error("failed to create socket can connection", | ||
zap.String("can_interface", "vcan0"), | ||
zap.Error(err), | ||
) | ||
return | ||
} | ||
|
||
manager := canlink.NewBusManager(logger, &conn) | ||
handler := NewHandler() | ||
|
||
broadcast, transmit := manager.Register(handler) | ||
|
||
handler.Handle(broadcast, transmit) | ||
|
||
manager.Start(ctx) | ||
|
||
for { | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not necessarily. I added |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider
Register
or creating the BusManager by injecting handlers into it.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this