Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAN Bus Manager #98

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 242 additions & 0 deletions canlink/busmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Package canlink provides utilities to interact with a
// Controller Area Network (CAN Bus).
package canlink

import (
"context"
"errors"

Check failure on line 7 in canlink/busmanager.go

View workflow job for this annotation

GitHub Actions / build

"errors" imported and not used
"net"
"sync"
"time"

"go.einride.tech/can/pkg/socketcan"
"go.uber.org/zap"
)

// Default buffered channel length for the broadcast and
// incoming receive channels.
//
// In order to preserve the real time broadcast requirement,
// BusManager will drop messages for any Handler that is
// unable to keep up with the broadcast rate.
const _channelBufferLength = 1000

// BusManager is a centralized node responsible for orchestrating
// all interactions with a CAN bus.
//
// It acts as a message broker supporting the transmission
// of bus traffic to registered handlers and receiving incoming
// messages from these handlers to write to the bus.
//
// BusManager uses SocketCAN on the Linux platform. Note that
// it does not manage the lifetime of the network socket connection.
//
// Example:
//
// package main
//
// import (
// "context"
// "fmt"
// "time"
//
// "go.einride.tech/can/pkg/socketcan"
// "go.uber.org/zap"
// )
//
// func main () {
// ctx := context.Background()
//
// loggerConfig := zap.NewDevelopmentConfig()
// logger, err := loggerConfig.Build()
//
// // Create a network connection for vcan0
// conn, err := socketcan.DialContext(context.Background(), "can", "vcan0")
// if err != nil {
// return
// }
//
// manager := canlink.NewBusManager(logger, conn)
// handler = NewHandler(...)
//
// broadcastChan, transmitChan := manager.Register(handler)
//
// handler.Handle(...)
//
// manager.Start(ctx)
//
// ...
//
// manager.Stop()
// manager.Close()
type BusManager struct {
broadcastChan map[Handler]chan TimestampedFrame
incomingChan chan TimestampedFrame

receiver *socketcan.Receiver
transmitter *socketcan.Transmitter

l *zap.Logger
stop chan struct{}
isRunning bool
mu sync.Mutex
}

// NewBusManager returns a BusManager object.
//
// The network connection is injected into the BusManager
// and provides the interface for a single bus.
//
// See usage example.
func NewBusManager(l *zap.Logger, conn *net.Conn) *BusManager {
busManager := &BusManager{
l: l.Named("bus_manager"),

broadcastChan: make(map[Handler]chan TimestampedFrame),
incomingChan: make(chan TimestampedFrame, _channelBufferLength),

receiver: socketcan.NewReceiver(*conn),
transmitter: socketcan.NewTransmitter(*conn),
}

return busManager
}

// Register a Handler with the BusManager.
//
// Register returns two channels for the specified Handler.
// The broadcast channel is a stream of traffic received from
// the bus. The incoming channel enables the Handler to write
// a frame to the bus.
//
// The channels operate on a TimestampedFrame object.
func (b *BusManager) Register(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider Register or creating the BusManager by injecting handlers into it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this

handler Handler,
) (chan TimestampedFrame, chan TimestampedFrame) {
b.mu.Lock()
defer b.mu.Unlock()

b.l.Info("registering handler")

subscription := make(chan TimestampedFrame, _channelBufferLength)
b.broadcastChan[handler] = subscription

b.l.Info("registered handler")

return subscription, b.incomingChan
}

// Unregister a Handler from the BusManager.
//
// Deletes the broadcast channel that was previously
// provided from BusManager.
func (b *BusManager) Unregister(handler *Handler) {
b.mu.Lock()
defer b.mu.Unlock()

delete(b.broadcastChan, *handler)
}

// Start the traffic broadcast and incoming frame listener
// for each of the registered handlers.
//
// The broadcast stream will begin. If handlers cannot keep up
// with the broadcast, frames for that handler will be dropped.
//
// The handlers can now send CAN frames to the BusManager to be
// transmitted out onto the bus.
func (b *BusManager) Start(ctx context.Context) {
if b.isRunning {
b.l.Warn("bus manager is already started")
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a warning log here, make the log line above it go after this logic

}

b.l.Info("start broadcast and process incoming")

b.stop = make(chan struct{})

go b.broadcast(ctx)
go b.processIncoming(ctx)

b.isRunning = true
}

// Stop the traffic broadcast and incoming frame listener.
//
// Preserves registered handlers and their assosciated channels.
func (b *BusManager) Stop() {
if !b.isRunning {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing as start

b.l.Warn("bus manager is already stopped")
return
}

b.l.Info("stop broadcast and process incoming")

close(b.stop)
b.isRunning = false
}

// Close cleans up the bus network connection.
func (b *BusManager) Close() error {
if b.isRunning {
b.l.Info("stopping bus manager")
b.Stop()
}

b.l.Info("closing socketcan receiver and transmitter")

b.receiver.Close()
b.transmitter.Close()

return nil
}

func (b *BusManager) broadcast(ctx context.Context) {
for b.receiver.Receive() {
timeFrame := TimestampedFrame{b.receiver.Frame(), time.Now()}

b.mu.Lock()

for handler, ch := range b.broadcastChan {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does the message get passed to the handler?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It never gets passed to the handler, rather it gets sent into its associated broadcast channel (see line 213), the handler in this loop is only used in logging it seems.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@langei How's this work and where do I define my message handling behaviour? I thought the purpose of the BusManager was to send the message to each Handler.

select {
case <-ctx.Done():
b.l.Info("context deadline exceeded")
return
case _, ok := <-ch:
if !ok {
b.l.Info("broadcast channel closed, exiting broadcast routine")
return
}
case <-b.stop:
b.l.Info("stop signal received")
return
case ch <- timeFrame:
b.l.Info("broadcasted can frame")
default:
b.l.Warn("dropping frames on handler", zap.String("handler", handler.Name()))
}
}

b.mu.Unlock()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be defered from earlier? idk enough Go to know which way is best

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question. I think you might be right as the scope exits right after the unlock which would make them functionally equal. Here is a way to test this:

defer func() {
    b.l.Debug("Unlocking mutex")
    b.mu.Unlock()
}

}
}

func (b *BusManager) processIncoming(ctx context.Context) {
for {
select {
case <-ctx.Done():
b.l.Info("context deadline exceeded")
return
case <-b.stop:
b.l.Info("stop signal received")
return
case frame, ok := <-b.incomingChan:
if !ok {
b.l.Info("incoming channel closed, exiting process routine")
return
}

b.transmitter.TransmitFrame(ctx, frame.Frame)
}
}
}
6 changes: 6 additions & 0 deletions canlink/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package canlink

type Handler interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand how this will be used. Could you document this a bit more and just maybe explain the reasoning for this design. The busmanager piece totally makes sense to me, I just can't picture how the handlers will be used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handler doesn't necessarily need to exist. It's only use right now is to provide as an argument to Register. The idea is that objects like Tracer and CanClient would be handlers.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we were using Handlers as our interface for "handling" incoming messages, which would make Handler a key part of this architecture.

Name() string
BlakeFreer marked this conversation as resolved.
Show resolved Hide resolved
Handle(chan TimestampedFrame, chan TimestampedFrame)
}
2 changes: 1 addition & 1 deletion canlink/timestampedframe.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"go.einride.tech/can"
)

// TimestampedFrame contains a single CAN frame along with the time it was received
// TimestampedFrame contains a single CAN frame along with the time it was received.
type TimestampedFrame struct {
Frame can.Frame
Time time.Time
Expand Down
Binary file added cmd/busmanager/busmanager
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Binary alert!

Binary file not shown.
87 changes: 87 additions & 0 deletions cmd/busmanager/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package main

import (
"context"
"fmt"
"time"

"go.einride.tech/can/pkg/socketcan"
"go.uber.org/zap"

"github.com/macformula/hil/canlink"
)

type Handler struct {
name string
}

func NewHandler() *Handler {
handler := &Handler{
name: "testHandler",
}

return handler
}

func (h *Handler) Handle(
broadcast chan canlink.TimestampedFrame,
transmit chan canlink.TimestampedFrame,
) {
go func() {
for {
select {
case frame := <-broadcast:
fmt.Println("RECEIVED: ", frame.Frame)
default:
}
}
}()

go func() {
var i byte

for {
time.Sleep(2 * time.Millisecond)

frame := canlink.TimestampedFrame{}
copy(frame.Frame.Data[:], []byte{i})
frame.Time = time.Now()

i = i + 1

transmit <- frame
}
}()
}

func (h *Handler) Name() string {
return "Handler"
}

func main() {
ctx := context.Background()

loggerConfig := zap.NewDevelopmentConfig()
logger, err := loggerConfig.Build()

conn, err := socketcan.DialContext(context.Background(), "can", "vcan0")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it make sense to include this code into a BusManager function? I think it would be nice to be able to call something like

manager =canlink.BusManager("vcan0")

Same with the logger. If every BusManager needs a logger, can it create its own during construction?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be other objects that interact with can and this way they can all share the same socket connection. What we should do is have a look at the dialcontext implementation. If this creates a new connection on each call then it should remain the same. But if it just returns an instance of a pre-existing connection it should be fine to make the change.

This is how zap logger works as well. You create a logger instance at the top level and pass it in to all objects that will perform logging. I understand this is a little different from how python handles things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, logger should be injected. However for the connection, it might make more sense for the BusManager to own it. I'll look into DialContext.

if err != nil {
logger.Error("failed to create socket can connection",
zap.String("can_interface", "vcan0"),
zap.Error(err),
)
return
}

manager := canlink.NewBusManager(logger, &conn)
handler := NewHandler()

broadcast, transmit := manager.Register(handler)

handler.Handle(broadcast, transmit)

manager.Start(ctx)

for {
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to .Close the busmanager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily. I added Close earlier when I had an Open function, but chose to take that out. Thinking I can do the same with Close.

}
15 changes: 12 additions & 3 deletions cmd/tracetest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"os/signal"
"time"

"github.com/pkg/errors"
"go.einride.tech/can/pkg/socketcan"
"go.uber.org/zap"

"github.com/macformula/hil/canlink"
"github.com/macformula/hil/macformula/cangen/vehcan"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -144,7 +144,12 @@ func waitForSigTerm(stop chan struct{}, logger *zap.Logger) {
}

func startSendMessageRoutine(
ctx context.Context, stop chan struct{}, msgPeriod time.Duration, cc *canlink.CanClient, l *zap.Logger) {
ctx context.Context,
stop chan struct{},
msgPeriod time.Duration,
cc *canlink.CanClient,
l *zap.Logger,
) {
packState := vehcan.NewPack_State()
packState.SetPopulated_Cells(_numCells)
packState.SetPack_Current(0)
Expand Down Expand Up @@ -179,7 +184,11 @@ func startSendMessageRoutine(
// +packCurrentDeviation on even i, -packCurrentDeviation on odd i
packCurrentDeviation := _packCurrentDeviation * float64(i%2+1) * (-1)
packCurrentIncr := float64(msgPeriod/time.Second) * _packCurrentIncrPerSec
packCurrent := clamp(packState.Pack_Current()+packCurrentIncr, _minPackCurrent, _maxPackCurrent)
packCurrent := clamp(
packState.Pack_Current()+packCurrentIncr,
_minPackCurrent,
_maxPackCurrent,
)
packCurrent += packCurrentDeviation
packState.SetPack_Current(packCurrent)
} else {
Expand Down
Loading