diff --git a/dbc2val/dbcfeeder.py b/dbc2val/dbcfeeder.py index ca9911fc..38b6ea02 100755 --- a/dbc2val/dbcfeeder.py +++ b/dbc2val/dbcfeeder.py @@ -35,10 +35,10 @@ import asyncio from signal import SIGINT, SIGTERM, signal -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional -from dbcfeederlib import canclient -from dbcfeederlib import canplayer +from dbcfeederlib.canclient import CANClient +from dbcfeederlib.canreader import CanReader from dbcfeederlib import dbc2vssmapper from dbcfeederlib import dbcreader from dbcfeederlib import j1939reader @@ -125,31 +125,30 @@ class Feeder: Start listening to the queue and transform CAN messages to VSS data and if conditions are fulfilled send them to the client wrapper which in turn send it to the bckend supported by the wrapper. """ - def __init__(self, client_wrapper: clientwrapper.ClientWrapper, + def __init__(self, kuksa_client: clientwrapper.ClientWrapper, elmcan_config: Dict[str, Any], dbc2val: bool = True, val2dbc: bool = False): - self._running = False - self._reader = None - self._player: Optional[canplayer.CANplayer] = None - self._mapper = None - self._registered = False - self._can_queue: queue.Queue[dbc2vssmapper.VSSObservation] = queue.Queue() - self._client_wrapper = client_wrapper + self._running: bool = False + self._reader: Optional[CanReader] = None + self._mapper: Optional[dbc2vssmapper.Mapper] = None + self._registered: bool = False + self._dbc2vss_queue: queue.Queue[dbc2vssmapper.VSSObservation] = queue.Queue() + self._kuksa_client = kuksa_client self._elmcan_config = elmcan_config self._disconnect_time = 0.0 - self._dbc2val = dbc2val - self._val2dbc = val2dbc - self._canclient = None - self._transmit = False + self._dbc2val_enabled = dbc2val + self._val2dbc_enabled = val2dbc + self._canclient: Optional[CANClient] = None + self._transmit: bool = False def start( self, - canport, - dbc_file_names, - mappingfile, - dbc_default_file, - candumpfile, - use_j1939=False, - use_strict_parsing=False + canport: str, + dbc_file_names: List[str], + mappingfile: str, + dbc_default_file: Optional[str], + candumpfile: Optional[str], + use_j1939: bool = False, + use_strict_parsing: bool = False ): self._running = True @@ -160,42 +159,27 @@ def start( expect_extended_frame_ids=use_j1939, can_signal_default_values_file=dbc_default_file) - self._client_wrapper.start() + self._kuksa_client.start() threads = [] - if self._dbc2val and self._mapper.has_dbc2vss_mapping(): + if self._dbc2val_enabled and self._mapper.has_dbc2vss_mapping(): + log.info("Setting up reception of CAN signals") if use_j1939: log.info("Using J1939 reader") - self._reader = j1939reader.J1939Reader(self._can_queue, self._mapper) + self._reader = j1939reader.J1939Reader(self._dbc2vss_queue, self._mapper, canport, candumpfile) else: log.info("Using DBC reader") - self._reader = dbcreader.DBCReader(self._can_queue, self._mapper) - - if candumpfile: - # use dumpfile - log.info( - "Using virtual bus to replay CAN messages (channel: %s) (dumpfile: %s)", - canport, - candumpfile - ) - self._reader.start_listening( - bustype="virtual", - channel=canport, - bitrate=500000 - ) - self._player = canplayer.CANplayer(dumpfile=candumpfile) - self._player.start_replaying(canport=canport) - else: - - if canport == 'elmcan': + self._reader = dbcreader.DBCReader(self._dbc2vss_queue, self._mapper, canport, candumpfile) - log.info("Using elmcan. Trying to set up elm2can bridge") - elm2canbridge.elm2canbridge(canport, self._elmcan_config, self._reader._canidwl) + if canport == 'elmcan': + log.info("Using elmcan. Trying to set up elm2can bridge") + whitelisted_frame_ids: List[int] = [] + for filter in self._mapper.can_frame_id_whitelist(): + whitelisted_frame_ids.append(filter.can_id) # type: ignore + elm2canbridge.elm2canbridge(canport, self._elmcan_config, whitelisted_frame_ids) - # use socketCAN - log.info("Using socket CAN device '%s'", canport) - self._reader.start_listening(bustype="socketcan", channel=canport) + self._reader.start() receiver = threading.Thread(target=self._run_receiver) receiver.start() @@ -203,8 +187,8 @@ def start( else: log.info("No dbc2val mappings found or dbc2val disabled!") - if self._val2dbc and self._mapper.has_vss2dbc_mapping(): - if not self._client_wrapper.supports_subscription(): + if self._val2dbc_enabled and self._mapper.has_vss2dbc_mapping(): + if not self._kuksa_client.supports_subscription(): log.error("Subscribing to VSS signals not supported by chosen client!") self.stop() else: @@ -212,7 +196,7 @@ def start( # For now creating another bus # Maybe support different buses for downstream/upstream in the future - self._canclient = canclient.CANClient(bustype="socketcan", channel=canport) + self._canclient = CANClient(interface="socketcan", channel=canport) transmitter = threading.Thread(target=self._run_transmitter) transmitter.start() @@ -229,16 +213,12 @@ def stop(self): # Tell others to stop if self._reader is not None: self._reader.stop() - if self._player is not None: - self._player.stop() - self._client_wrapper.stop() + self._kuksa_client.stop() if self._canclient: self._canclient.stop() - self._canclient = None - self._mapper = None self._transmit = False - def is_running(self): + def is_running(self) -> bool: return self._running def _register_datapoints(self) -> bool: @@ -254,7 +234,7 @@ def _register_datapoints(self) -> bool: all_registered = True for vss_name in self._mapper.get_vss_names(): log.debug("Checking if signal %s is registered", vss_name) - resp = self._client_wrapper.is_signal_defined(vss_name) + resp = self._kuksa_client.is_signal_defined(vss_name) if not resp: all_registered = False return all_registered @@ -265,7 +245,7 @@ def _run_receiver(self): last_sent_log_entry = 0 queue_max_size = 0 while self._running is True: - if self._client_wrapper.is_connected(): + if self._kuksa_client.is_connected(): self._disconnect_time = 0.0 else: # As we actually cannot register @@ -287,36 +267,40 @@ def _run_receiver(self): if not processing_started: processing_started = True log.info("Starting to process CAN signals") - queue_size = self._can_queue.qsize() + queue_size = self._dbc2vss_queue.qsize() if queue_size > queue_max_size: queue_max_size = queue_size - vss_observation = self._can_queue.get(timeout=1) - vss_mapping = self._mapper.get_dbc2vss_mapping(vss_observation.dbc_name, vss_observation.vss_name) + vss_observation = self._dbc2vss_queue.get(timeout=1) + vss_mapping = self._mapper.get_dbc2val_mapping(vss_observation.dbc_name, vss_observation.vss_name) value = vss_mapping.transform_value(vss_observation.raw_value) if value is None: - log.warning(f"Value ignored for dbc {vss_observation.dbc_name} to VSS {vss_observation.vss_name}," - f" from raw value {value} of type {type(value)}") + log.warning( + "Value ignored for dbc %s to VSS %s, from raw value %s of type %s", + vss_observation.dbc_name, vss_observation.vss_name, value, type(value) + ) elif not vss_mapping.change_condition_fulfilled(value): - log.debug(f"Value condition not fulfilled for VSS {vss_observation.vss_name}, value {value}") + log.debug("Value condition not fulfilled for VSS %s, value %s", vss_observation.vss_name, value) else: - # get values out of the canreplay and map to desired signals + # update current value in KUKSA.val target = vss_observation.vss_name - success = self._client_wrapper.update_datapoint(target, value) + success = self._kuksa_client.update_datapoint(target, value) if success: log.debug("Succeeded sending DataPoint(%s, %s, %f)", target, value, vss_observation.time) # Give status message after 1, 2, 4, 8, 16, 32, 64, .... messages have been sent messages_sent += 1 if messages_sent >= (2 * last_sent_log_entry): - log.info(f"Number of VSS messages sent so far: {messages_sent}, " - f"queue max size: {queue_max_size}") + log.info( + "Number of VSS messages sent so far: %d, queue max size: %d", + messages_sent, queue_max_size + ) last_sent_log_entry = messages_sent except queue.Empty: pass except Exception: log.error("Exception caught in main loop", exc_info=True) - async def vss_update(self, updates): + async def _vss_update(self, updates): log.debug("vss-Update callback!") dbc_ids = set() for update in updates: @@ -353,7 +337,7 @@ async def _run_subscribe(self): Requests the client wrapper to start subscription. Checks every second if we have requested to stop reception and if so exits """ - asyncio.create_task(self._client_wrapper.subscribe(self._mapper.get_vss2dbc_entries(), self.vss_update)) + asyncio.create_task(self._kuksa_client.subscribe(self._mapper.get_vss2dbc_entries(), self._vss_update)) while self._transmit: await asyncio.sleep(1) diff --git a/dbc2val/dbcfeederlib/canclient.py b/dbc2val/dbcfeederlib/canclient.py index 5bfce3c8..d895c378 100644 --- a/dbc2val/dbcfeederlib/canclient.py +++ b/dbc2val/dbcfeederlib/canclient.py @@ -23,24 +23,24 @@ class CANClient: """ Wrapper class to hide dependency to CAN package. Reason is to make it simple to replace the CAN package dependency with something else if your KUKSA.val - integration cannot interact directly with CAN, but rather interacts with some custom CAN solution/middleware + integration cannot interact directly with CAN, but rather interacts with some custom CAN solution/middleware. """ def __init__(self, *args, **kwargs): - self.bus = can.interface.Bus(*args, **kwargs) # pylint: disable=abstract-class-instantiated + # pylint: disable=abstract-class-instantiated + self._bus = can.interface.Bus(*args, **kwargs) def stop(self): - if self.bus: - self.bus.shutdown() - self.bus = None + """Shut down CAN bus.""" + self._bus.shutdown() def recv(self, timeout: int = 1) -> Optional[canmessage.CANMessage]: - """Wait for message from CAN""" + """Receive message from CAN bus.""" try: - msg = self.bus.recv(timeout) + msg = self._bus.recv(timeout) except can.CanError: msg = None - if self.bus: + if self._bus: log.error("Error while waiting for recv from CAN", exc_info=True) else: # This is expected if we are shutting down @@ -52,11 +52,11 @@ def recv(self, timeout: int = 1) -> Optional[canmessage.CANMessage]: return None def send(self, arbitration_id, data): - """Send message to CAN""" + """Write message to CAN bus.""" msg = can.Message(arbitration_id=arbitration_id, data=data) try: - self.bus.send(msg) - log.debug(f"Message sent on {self.bus.channel_info}") - log.debug(f"Message: {msg}") + self._bus.send(msg) + if log.isEnabledFor(logging.DEBUG): + log.debug("Sent message [channel: %s]: %s", self._bus.channel_info, msg) except can.CanError: log.error("Failed to send message via CAN bus") diff --git a/dbc2val/dbcfeederlib/canplayer.py b/dbc2val/dbcfeederlib/canplayer.py index 9212939e..de0ca795 100644 --- a/dbc2val/dbcfeederlib/canplayer.py +++ b/dbc2val/dbcfeederlib/canplayer.py @@ -15,7 +15,7 @@ import threading import can # type: ignore -from can.interfaces.virtual import VirtualBus # type: ignore +from can.interfaces.virtual import VirtualBus log = logging.getLogger(__name__) @@ -36,46 +36,46 @@ class CANplayer: files suffix is one of the above (e.g. filename.asc.gz). """ - def __init__(self, dumpfile): - self.run = False - self.messages = [can.message] - self.dumpfile = dumpfile - - def process_log(self): + def __init__(self, dumpfile: str, can_port: str): + self._running = False # open the file for reading can messages - log.info("Replaying CAN message log {}".format(self.dumpfile)) + log.info("Replaying CAN messages from log file %s", dumpfile) + self._messages = can.LogReader(dumpfile) + self._can_port = can_port + log.debug("Using virtual bus to replay CAN messages (channel: %s)", self._can_port) + self._bus = VirtualBus(channel=can_port, bitrate=500000) + + def _process_log(self): # using MessageSync in order to consider timestamps of CAN messages # and the delays between them - log_reader = can.MessageSync(messages=can.LogReader(self.dumpfile), timestamps=True) + log_reader = can.MessageSync(messages=self._messages, timestamps=True) for msg in log_reader: - if not self.run: - break + if not self._running: + return try: - self.bus.send(msg) - log.debug(f"Message sent on {self.bus.channel_info}") - log.debug(f"Message: {msg}") + self._bus.send(msg) + if log.isEnabledFor(logging.DEBUG): + log.debug("Sent message [channel: %s]: %s", self._bus.channel_info, msg) except can.CanError: log.error("Failed to send message via CAN bus") log.info("Replayed all messages from CAN log file") - def txWorker(self): - log.info("Starting Tx thread") + def _tx_worker(self): + log.info("Starting to write CAN messages to bus") - while self.run: - self.process_log() + while self._running: + self._process_log() - log.info("Stopped Tx thread") + log.info("Stopped writing CAN messages to bus") - def start_replaying(self, canport): - log.debug("Using virtual bus to replay CAN messages (channel: %s)", canport) - self.bus = VirtualBus(channel=canport, bitrate=500000) - self.run = True - txThread = threading.Thread(target=self.txWorker) - txThread.start() + def start(self): + self._running = True + tx_thread = threading.Thread(target=self._tx_worker) + tx_thread.start() def stop(self): - self.run = False - if self.bus: - self.bus.shutdown() - self.bus = None + self._running = False + if self._bus: + self._bus.shutdown() + self._bus = None diff --git a/dbc2val/dbcfeederlib/canreader.py b/dbc2val/dbcfeederlib/canreader.py new file mode 100644 index 00000000..8a0121b8 --- /dev/null +++ b/dbc2val/dbcfeederlib/canreader.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 + +################################################################################# +# Copyright (c) 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License 2.0 which is available at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +################################################################################# + +import logging +import time + +from abc import ABC, abstractmethod + +from dbcfeederlib.canplayer import CANplayer +from dbcfeederlib.dbc2vssmapper import Mapper, VSSObservation +from typing import Any, Dict, Optional +from queue import Queue + +log = logging.getLogger(__name__) + + +class CanReader(ABC): + """ + Provides means to read messages from a CAN bus. + """ + def __init__(self, rxqueue: Queue, mapper: Mapper, can_port: str, dump_file: Optional[str] = None): + """ + This init method is only supposed to be called by subclass' __init__ functions. + """ + self._running = False + self._queue = rxqueue + self._mapper = mapper + self._running = False + self._can_player: Optional[CANplayer] = None + + can_filters = mapper.can_frame_id_whitelist() + log.info("Using CAN frame ID whitelist=%s", can_filters) + self._can_kwargs: Dict[str, Any] = {"interface": "socketcan", "channel": can_port, "can_filters": can_filters} + if dump_file is not None: + self._can_kwargs["interface"] = "virtual" + self._can_kwargs["bitrate"] = 500000 + self._can_player = CANplayer(dump_file, can_port) + + def is_running(self) -> bool: + return self._running + + @abstractmethod + def _start_can_bus_listener(self): + """ + Start listening to CAN bus. + """ + pass + + def start(self): + """ + Start processing of messages. + """ + self._running = True + self._start_can_bus_listener() + if self._can_player is not None: + self._can_player.start() + + @abstractmethod + def _stop_can_bus_listener(self): + """ + Stop listening to CAN bus. + """ + pass + + def stop(self): + if self._can_player is not None: + self._can_player.stop() + self._running = False + self._stop_can_bus_listener() + + def _process_can_message(self, frame_id: int, data: Any): + try: + message_def = self._mapper.get_message_for_canid(frame_id) + if message_def is not None: + decode = message_def.decode(bytes(data), allow_truncated=True) + if log.isEnabledFor(logging.DEBUG): + log.debug("Decoded message: %s", str(decode)) + rx_time = time.time() + for signal_name, raw_value in decode.items(): # type: ignore + for signal_mapping in self._mapper.get_dbc2vss_mappings(signal_name): + if signal_mapping.time_condition_fulfilled(rx_time): + log.debug( + "Queueing %s, triggered by %s, raw value %s", + signal_mapping.vss_name, signal_name, raw_value + ) + self._queue.put(VSSObservation( + signal_name, signal_mapping.vss_name, raw_value, rx_time)) + else: + log.debug( + "Ignoring %s, triggered by %s, raw value %s", + signal_mapping.vss_name, signal_name, raw_value + ) + except Exception: + log.warning("Error processing CAN message with frame ID: %#x", frame_id, exc_info=True) diff --git a/dbc2val/dbcfeederlib/databrokerclientwrapper.py b/dbc2val/dbcfeederlib/databrokerclientwrapper.py index b56bd666..d2512269 100644 --- a/dbc2val/dbcfeederlib/databrokerclientwrapper.py +++ b/dbc2val/dbcfeederlib/databrokerclientwrapper.py @@ -200,7 +200,7 @@ def supports_subscription(self) -> bool: return True async def subscribe(self, vss_names: List[str], callback): - """Creates a subscription and calls the callback when data received""" + """Create a subscription and invoke the callback when data received.""" entries = [] for name in vss_names: # Always subscribe to target diff --git a/dbc2val/dbcfeederlib/dbc2vssmapper.py b/dbc2val/dbcfeederlib/dbc2vssmapper.py index 18372f22..9c96b6ba 100644 --- a/dbc2val/dbcfeederlib/dbc2vssmapper.py +++ b/dbc2val/dbcfeederlib/dbc2vssmapper.py @@ -27,12 +27,14 @@ import json import logging import sys -from typing import Any, Dict, List, Set, Optional, KeysView + from dataclasses import dataclass +from typing import Any, Dict, List, Set, Optional, KeysView +from can.typechecking import CanFilter from py_expression_eval import Parser # type: ignore[import] -from dbcfeederlib import dbcparser +from dbcfeederlib.dbcparser import DBCParser log = logging.getLogger(__name__) @@ -87,14 +89,13 @@ def time_condition_fulfilled(self, time: float) -> bool: Value (on_change) condition not evaluated """ fulfilled = True - log.debug(f"Checking interval for {self.vss_name}. " - f"Time is {time}, last sent {self.last_time}") + log.debug("Checking interval for %s. Time is %s, last sent %s", self.vss_name, time, self.last_time) # First shall always evaluate to true if (self.interval_ms > 0) and (self.last_time != 0.0): diff_ms = (time - self.last_time) * 1000.0 if diff_ms < self.interval_ms: - log.debug(f"Interval not exceeded for {self.vss_name}. Time is {time}") + log.debug("Interval not exceeded for %s. Time is %s", self.vss_name, time) fulfilled = False # We must set time already now even if a value check is performed later @@ -111,8 +112,10 @@ def change_condition_fulfilled(self, vss_value: Any) -> bool: if time condition is fulfilled. """ fulfilled = False - log.debug(f"Checking change condition for {self.vss_name}. " - f"New value {vss_value}, old value {self.last_vss_value}") + log.debug( + "Checking change condition for %s. New value %s, old value %s", + self.vss_name, vss_value, self.last_vss_value + ) if vss_value is not None: if self.last_vss_value is None: @@ -134,7 +137,7 @@ def transform_value(self, value: Any) -> Any: """ vss_value = None if self.transform is None: - log.debug(f"No mapping to VSS {self.vss_name}, using raw value {value}") + log.debug("No mapping to VSS %s, using raw value %s", self.vss_name, value) vss_value = value else: if "mapping" in self.transform: @@ -154,22 +157,26 @@ def transform_value(self, value: Any) -> Any: # It is assumed that you may consider it ok that transformation fails sometimes, # so giving warning instead of error # This could be e.g. trying to treat a string as int - log.warning(f"Transformation failed for value {value} " - f"for VSS signal {self.vss_name}, signal ignored!", exc_info=True) + log.warning( + "Transformation failed for value %s for VSS signal %s, signal ignored!", + value, self.vss_name, exc_info=True + ) else: # It is supposed that "extract_verify_transform" already have checked that # we have a valid transform, so we shall never end up here log.error("Unsupported transform") if vss_value is None: - log.info(f"No mapping to VSS {self.vss_name} found for raw value {value}," - f"returning None to indicate that it shall be ignored!") + log.info( + "No mapping to VSS %s found for raw value %s, returning None to indicate that it shall be ignored!", + self.vss_name, value + ) else: - log.debug(f"Transformed value {vss_value} for {self.vss_name}") + log.debug("Transformed value %s for %s", vss_value, self.vss_name) return vss_value -class Mapper(dbcparser.DBCParser): +class Mapper(DBCParser): """ Contains all mappings between CAN and VSS signals. @@ -188,7 +195,8 @@ def __init__(self, can_signal_default_values_file: Optional[str] = None): super().__init__(dbc_file_names, use_strict_parsing, expect_extended_frame_ids) - with open(mapping_definitions_file, "r") as file: + + with open(mapping_definitions_file, "r", encoding="utf-8") as file: try: jsonmapping = json.load(file) log.info("Reading CAN<->VSS mapping definitions from file %s", mapping_definitions_file) @@ -199,11 +207,11 @@ def __init__(self, ) sys.exit(-1) - self.dbc_default = {} + self._dbc_default = {} if can_signal_default_values_file is not None: - with open(can_signal_default_values_file, "r") as file: + with open(can_signal_default_values_file, "r", encoding="utf-8") as file: try: - self.dbc_default = json.load(file) + self._dbc_default = json.load(file) log.info("Reading default CAN signal values from file %s", can_signal_default_values_file) except Exception: log.error( @@ -214,13 +222,27 @@ def __init__(self, # Where we keep mapping, key is dbc signal name self._dbc2vss_mapping: Dict[str, List[VSSMapping]] = {} - # In this direction key is VSS name + # In this direction key is the VSS data entry name self._vss2dbc_mapping: Dict[str, List[VSSMapping]] = {} # Same, but key is CAN id mapping self._vss2dbc_can_id_mapping: Dict[int, List[VSSMapping]] = {} + # All frame IDs of CAN messages that contain signals for which a mapping to VSS exists + self._mapped_can_frame_ids: Set[int] = set() + self._can_filters: List[CanFilter] = [] self._traverse_vss_node("", jsonmapping) + def can_frame_id_whitelist(self) -> List[CanFilter]: + """ + Get all frame IDs of CAN messages that contain signals for which a mapping to VSS exists. + """ + if len(self._can_filters) == 0: + if len(self._mapped_can_frame_ids) > 0: + for frame_id in self._mapped_can_frame_ids: + self._can_filters.append(CanFilter(can_id=frame_id, can_mask=self._frame_id_mask)) + + return self._can_filters + def transform_dbc_value(self, vss_observation: VSSObservation) -> Any: """ Find VSS mapping and transform DBC value to VSS value. @@ -228,11 +250,15 @@ def transform_dbc_value(self, vss_observation: VSSObservation) -> Any: vss_signal = self.get_dbc2vss_mapping(vss_observation.dbc_name, vss_observation.vss_name) if vss_signal: value = vss_signal.transform_value(vss_observation.raw_value) - log.debug(f"Transformed dbc {vss_observation.dbc_name} to VSS " - f"{vss_observation.vss_name}, " - f"from raw value {vss_observation.raw_value} to {value}") + log.debug( + "Transformed CAN signal [name: %s, value %s] to VSS data entry [name: %s, value: %s]", + vss_observation.dbc_name, vss_observation.raw_value, vss_observation.vss_name, value + ) else: - log.error("No mapping found, that is not expected!") + log.error( + "No definition found for mapping CAN signal %s to VSS data entry %s, this is unexpected!", + vss_observation.dbc_name, vss_observation.vss_name + ) value = None return value @@ -241,7 +267,7 @@ def _extract_verify_transform(self, expanded_name: str, node: dict): Extract transformation definition and check syntax. """ if "transform" not in node: - log.debug(f"No transformation found for {expanded_name}") + log.debug("No transformation definition found for %s", expanded_name) # For now assumed that None is Ok return None transform = node["transform"] @@ -249,28 +275,31 @@ def _extract_verify_transform(self, expanded_name: str, node: dict): has_mapping = False if not isinstance(transform, dict): - log.error(f"Transform not dict for {expanded_name}") + log.error("Transformation definition for %s is not a dict", expanded_name) sys.exit(-1) if "mapping" in transform: tmp = transform["mapping"] if not isinstance(tmp, list): - log.error(f"Transform mapping not list for {expanded_name}") + log.error("Mapping definition for %s is not a list", expanded_name) sys.exit(-1) for item in tmp: if not (("from" in item) and ("to" in item)): - log.error(f"Mapping missing to and from in {item} for {expanded_name}") + log.error( + "Item %s of mapping definition for %s must have both \"to\" and \"from\" properties", + item, expanded_name + ) sys.exit(-1) has_mapping = True if "math" in transform: if has_mapping: - log.error(f"Can not have both mapping and math for {expanded_name}") + log.error("Can not have both \"mapping\" and \"math\" transformation defined for %s", expanded_name) sys.exit(-1) if not isinstance(transform["math"], str): - log.error(f"Math must be str for {expanded_name}") + log.error("Math transformation definition for %s must be a str", expanded_name) sys.exit(-1) elif not has_mapping: - log.error(f"Unsupported transform for {expanded_name}") + log.error("Unsupported transformation definition for %s", expanded_name) sys.exit(-1) return transform @@ -279,70 +308,89 @@ def _analyze_dbc2vss(self, expanded_name, node: dict, dbc2vss: dict): Analyze a dbc2vss entry (from CAN to VSS). """ - transform = self._extract_verify_transform(expanded_name, dbc2vss) - dbc_name = dbc2vss.get("signal", "") - if dbc_name == "": - log.error(f"No dbc signal found for {expanded_name}") + can_signal_name = dbc2vss.get("signal", "") + if can_signal_name == "": + log.error("Mapping definition for %s has no \"signal\" property", expanded_name) sys.exit(-1) + transformation_definition = self._extract_verify_transform(expanded_name, dbc2vss) on_change: bool = False if "on_change" in dbc2vss: tmp = dbc2vss["on_change"] if isinstance(tmp, bool): on_change = tmp else: - log.error(f"Value for on_change ({tmp}) is not bool") + log.error("Property \"on_change\" [%s] of mapping definition for %s is not a bool", expanded_name, tmp) sys.exit(-1) if "interval_ms" in dbc2vss: interval = dbc2vss["interval_ms"] if not isinstance(interval, int): - log.error(f"Faulty interval for {expanded_name}") + log.error( + "Property \"interval_ms\" [%s] of mapping definition for %s is not an integer", + expanded_name, interval + ) sys.exit(-1) else: if on_change: - log.info(f"Using default interval 0 ms for {expanded_name} " - f"as it has on_change condition") + log.info( + "Using default interval 0 ms for mapping definition of %s as it has \"on_change\" condition", + expanded_name + ) interval = 0 else: - log.info(f"Using default interval 1000 ms for {expanded_name}") + log.info("Using default interval 1000 ms for mapping definition of %s", expanded_name) interval = 1000 - mapping_entry = VSSMapping(expanded_name, dbc_name, transform, interval, on_change, + + if can_signal_name not in self._dbc2vss_mapping: + self._dbc2vss_mapping[can_signal_name] = [] + mapping_entry = VSSMapping(expanded_name, can_signal_name, transformation_definition, interval, on_change, node["datatype"], node["description"]) - if dbc_name not in self._dbc2vss_mapping: - self._dbc2vss_mapping[dbc_name] = [] - self._dbc2vss_mapping[dbc_name].append(mapping_entry) + self._dbc2vss_mapping[can_signal_name].append(mapping_entry) + + can_frame_id = self.get_canid_for_signal(can_signal_name) + if can_frame_id is None: + log.error( + """Could not find CAN message definition for signal %s used in dbc2vss + mapping definition for %s""", + can_signal_name, expanded_name) + else: + # Make sure that CAN frames with this ID pass CAN filtering + self._mapped_can_frame_ids.add(can_frame_id) def _analyze_vss2dbc(self, expanded_name, node: dict, vss2dbc: dict): """ Analyze a vss2dbc entry (from VSS to CAN). """ - transform = self._extract_verify_transform(expanded_name, vss2dbc) - dbc_name = vss2dbc.get("signal", "") - if dbc_name == "": - log.error(f"No dbc signal found for {expanded_name}") + can_signal_name = vss2dbc.get("signal", "") + if can_signal_name == "": + log.error("Mapping definition for %s has no \"signal\" property", expanded_name) sys.exit(-1) + transform = self._extract_verify_transform(expanded_name, vss2dbc) # For now we only support on_change, and we actually do not use the values on_change: bool = True interval = 0 if "on_change" in vss2dbc: - log.warning(f"on_change attribute ignored for {expanded_name}") + log.warning("Ignoring \"on_change\" property of mapping definition for %s", expanded_name) if "interval_ms" in vss2dbc: - log.warning(f"interval_ms attribute ignored for {expanded_name}") + log.warning("Ignoring \"interval_ms\" property of mapping definition for %s", expanded_name) - mapping_entry = VSSMapping(expanded_name, dbc_name, transform, interval, on_change, + mapping_entry = VSSMapping(expanded_name, can_signal_name, transform, interval, on_change, node["datatype"], node["description"]) - if dbc_name not in self._vss2dbc_mapping: + if can_signal_name not in self._vss2dbc_mapping: self._vss2dbc_mapping[expanded_name] = [] self._vss2dbc_mapping[expanded_name].append(mapping_entry) # Also add CAN-id - dbc_can_id = self.get_canid_for_signal(dbc_name) - if not dbc_can_id: - log.error(f"Could not find {dbc_name}") + can_frame_id = self.get_canid_for_signal(can_signal_name) + if can_frame_id is None: + log.error( + """Could not find CAN message definition for signal %s used in vss2dbc + mapping definition for %s""", + can_signal_name, expanded_name) return - if dbc_can_id not in self._vss2dbc_can_id_mapping: - self._vss2dbc_can_id_mapping[dbc_can_id] = [] - self._vss2dbc_can_id_mapping[dbc_can_id].append(mapping_entry) + if can_frame_id not in self._vss2dbc_can_id_mapping: + self._vss2dbc_can_id_mapping[can_frame_id] = [] + self._vss2dbc_can_id_mapping[can_frame_id].append(mapping_entry) def _analyze_signal(self, expanded_name, node): """ @@ -350,13 +398,13 @@ def _analyze_signal(self, expanded_name, node): """ dbc2vss_def = None if "dbc" in node: - log.debug(f"Signal {expanded_name} has dbc!") + log.debug("VSS signal %s has \"dbc\" property", expanded_name) dbc2vss_def = node["dbc"] if "dbc2vss" in node: - log.error(f"Node {expanded_name} has both dbc and dbc2vss") + log.error("VSS signal %s has both \"dbc\" and \"dbc2vss\" properties", expanded_name) sys.exit(-1) elif "dbc2vss" in node: - log.debug(f"Signal {expanded_name} has dbc2vss!") + log.debug("VSS signal %s has \"dbc2vss\" property", expanded_name) dbc2vss_def = node["dbc2vss"] if dbc2vss_def is not None: self._analyze_dbc2vss(expanded_name, node, dbc2vss_def) @@ -410,13 +458,11 @@ def get_vss2dbc_entries(self) -> KeysView[str]: def get_vss_names(self) -> Set[str]: """Get all VSS names used in mappings, both vss2dbc and dbc2vss""" - vss_names = set() + vss_names: Set[str] = set() for entry in self._dbc2vss_mapping.values(): for vss_mapping in entry: vss_names.add(vss_mapping.vss_name) - for key_entry in self._vss2dbc_mapping.keys(): - vss_names.add(key_entry) - return vss_names + return vss_names.union(self._vss2dbc_mapping.keys()) def has_dbc2vss_mapping(self) -> bool: return bool(self._dbc2vss_mapping) @@ -449,24 +495,24 @@ def get_default_values(self, can_id) -> Dict[str, Any]: res = {} for signal in self.get_signals_for_canid(can_id): - if signal in self.dbc_default: - res[signal] = self.dbc_default[signal] + if signal in self._dbc_default: + res[signal] = self._dbc_default[signal] else: - log.error(f"No default value for {signal} in CAN id {can_id}") + log.error("No default value for CAN signal %s in message with frame ID %#x", signal, can_id) return res def get_value_dict(self, can_id): - log.debug(f"Using stored information to create CAN-frame for {can_id}") + log.debug("Using stored information to create CAN message with frame ID %#x", can_id) res = self.get_default_values(can_id) for can_mapping in self._vss2dbc_can_id_mapping[can_id]: - log.debug(f"Using DBC id {can_mapping.dbc_name} with value {can_mapping.last_dbc_value}") + log.debug("Using CAN signal %s with value %s", can_mapping.dbc_name, can_mapping.last_dbc_value) if can_mapping.last_dbc_value is not None: res[can_mapping.dbc_name] = can_mapping.last_dbc_value return res def __contains__(self, key): - return key in self._dbc2vss_mapping.keys() + return key in self._dbc2vss_mapping def __getitem__(self, item): return self._dbc2vss_mapping[item] diff --git a/dbc2val/dbcfeederlib/dbcparser.py b/dbc2val/dbcfeederlib/dbcparser.py index 65d0eaaa..0ceb27e6 100644 --- a/dbc2val/dbcfeederlib/dbcparser.py +++ b/dbc2val/dbcfeederlib/dbcparser.py @@ -27,7 +27,6 @@ from types import MappingProxyType from typing import cast, Dict, Optional, List, Set, Tuple - log = logging.getLogger(__name__) @@ -43,6 +42,13 @@ def __init__(self, use_strict_parsing: bool = True, expect_extended_frame_ids: bool = False): + # by default, do not mask any bits of standard (11-bit) frame IDs + self._frame_id_mask: int = 0b11111111111 + if expect_extended_frame_ids: + # ignore 3 priority bits and 8 source address bits of extended + # (29-bit) frame IDs when looking up message definitions + self._frame_id_mask = 0b00011111111111111111100000000 + first = True processed_files: Set[str] = set() for filename in [name.strip() for name in dbc_file_names]: @@ -51,14 +57,12 @@ def __init__(self, continue processed_files.add(filename) if first: - # by default, do not mask any bits of standard (11-bit) frame IDs - mask = 0b11111111111 - if expect_extended_frame_ids: - # ignore 3 priority bits and 8 source address bits of extended - # (29-bit) frame IDs when looking up message definitions - mask = 0b00011111111111111111100000000 log.info("Reading definitions from DBC file %s", filename) - database = cantools.database.load_file(filename, strict=use_strict_parsing, frame_id_mask=mask) + database = cantools.database.load_file( + filename, + strict=use_strict_parsing, + frame_id_mask=self._frame_id_mask + ) # load_file can return multiple types of databases, make sure we have CAN database if isinstance(database, cantools.database.can.database.Database): self._db = cast(cantools.database.can.database.Database, database) @@ -97,7 +101,12 @@ def _add_db_file(self, filename: str): else: log.warning("Cannot read CAN message definitions from file using unsupported format: %s", db_format) + def can_frame_id_whitelist_mask(self) -> int: + """Get the frame ID bit mask used for filtering messages received from CAN bus.""" + return self._frame_id_mask + def get_canid_for_signal(self, sig_to_find: str) -> Optional[int]: + """Get the frame ID of the CAN message that contains a given signal""" if sig_to_find in self._signal_to_canid: return self._signal_to_canid[sig_to_find] @@ -113,7 +122,7 @@ def get_canid_for_signal(self, sig_to_find: str) -> Optional[int]: return None def get_signals_for_canid(self, canid: int) -> Set[str]: - + """Get the names of the signals contained in a CAN message""" if canid in self._canid_to_signals: return self._canid_to_signals[canid] @@ -126,6 +135,7 @@ def get_signals_for_canid(self, canid: int) -> Set[str]: return names def get_message_for_canid(self, canid: int) -> Optional[cantools.database.Message]: + """Look up a CAN message definition by frame ID""" try: return self._db.get_message_by_frame_id(canid) except KeyError: diff --git a/dbc2val/dbcfeederlib/dbcreader.py b/dbc2val/dbcfeederlib/dbcreader.py index a24b2b5b..3c0d6729 100644 --- a/dbc2val/dbcfeederlib/dbcreader.py +++ b/dbc2val/dbcfeederlib/dbcreader.py @@ -19,92 +19,36 @@ ######################################################################## import threading -import time import logging from queue import Queue +from typing import Optional +from dbcfeederlib.canclient import CANClient +from dbcfeederlib import canreader from dbcfeederlib import dbc2vssmapper -from dbcfeederlib import canclient log = logging.getLogger(__name__) -class DBCReader: - def __init__(self, rxqueue: Queue, mapper: dbc2vssmapper.Mapper): - self._queue = rxqueue - self._mapper = mapper - self._canidwl = self.get_whitelist() - log.info("CAN ID whitelist=%s", self._canidwl) - self._running = False - self._canclient = None +class DBCReader(canreader.CanReader): + def __init__(self, rxqueue: Queue, mapper: dbc2vssmapper.Mapper, can_port: str, dump_file: Optional[str] = None): + super().__init__(rxqueue, mapper, can_port, dump_file) - def start_listening(self, *args, **kwargs): - """Start listening to CAN bus + def _rx_worker(self): - Arguments are passed directly to :class:`can.BusABC`. Typically these - may include: + log.info("Starting to receive CAN messages fom bus") + while self.is_running(): + msg = self._canclient.recv(timeout=1) + if msg is not None: + log.debug("Processing CAN message with frame ID %#x", msg.get_arbitration_id) + self._process_can_message(msg.get_arbitration_id(), msg.get_data()) + log.info("Stopped receiving CAN messages from bus") - :param channel: - Backend specific channel for the CAN interface. - :param str bustype: - Name of the interface. See - `python-can manual `__ - for full list of supported interfaces. - :param int bitrate: - Bitrate in bit/s. - """ - self._running = True - self._canclient = canclient.CANClient(*args, **kwargs) + def _start_can_bus_listener(self): + self._canclient = CANClient(**self._can_kwargs) rx_thread = threading.Thread(target=self._rx_worker) rx_thread.start() - def get_whitelist(self): - log.debug("Generating CAN ID whitelist") - white_list = [] - for signal_name in self._mapper.get_dbc2vss_entries(): - canid = self._mapper.get_canid_for_signal(signal_name) - if canid is not None and canid not in white_list: - log.debug("Adding CAN frame id %d of message containing signal %s to white list", canid, signal_name) - white_list.append(canid) - return white_list - - def _process_message(self, frame_id: int, data: bytearray): - - log.debug("processing message with frame ID %#x from CAN bus", frame_id) - try: - message_def = self._mapper.get_message_for_canid(frame_id) - if message_def is not None: - decode = message_def.decode(bytes(data), allow_truncated=True) - if log.isEnabledFor(logging.DEBUG): - log.debug("Decoded message: %s", str(decode)) - - rx_time = time.time() - for k, v in decode.items(): # type: ignore - vss_mappings = self._mapper.get_dbc2vss_mappings(k) - for signal in vss_mappings: - if signal.time_condition_fulfilled(rx_time): - log.debug("Queueing %s, triggered by %s, raw value %s", signal.vss_name, k, v) - self._queue.put(dbc2vssmapper.VSSObservation(k, signal.vss_name, v, rx_time)) - else: - log.debug("Ignoring %s, triggered by %s, raw value %s", signal.vss_name, k, v) - except Exception: - log.warning("Error decoding message with frame ID: %#x", frame_id, exc_info=True) - - def _rx_worker(self): - log.info("Starting Rx thread") - while self._running: - can_message = self._canclient.recv(timeout=1) - if can_message is not None: - frame_id = can_message.get_arbitration_id() - if frame_id in self._canidwl: - self._process_message(frame_id, can_message.get_data()) - else: - log.debug("ignoring CAN message with frame ID %s not on whitelist", frame_id) - log.info("Stopped Rx thread") - - def stop(self): - self._running = False - if self._canclient: - self._canclient.stop() - self._canclient = None + def _stop_can_bus_listener(self): + self._canclient.stop() diff --git a/dbc2val/dbcfeederlib/elm2canbridge.py b/dbc2val/dbcfeederlib/elm2canbridge.py index 3c8266f0..c89df1bd 100644 --- a/dbc2val/dbcfeederlib/elm2canbridge.py +++ b/dbc2val/dbcfeederlib/elm2canbridge.py @@ -23,14 +23,16 @@ import serial # type: ignore import can # type: ignore import threading + from multiprocessing import Queue, Process +from typing import Any, Dict, List, Optional # To limit memory in case parsing thread dies and serial reader keeps filling QUEUE_MAX_ELEMENTS = 2048 class elm2canbridge: - def __init__(self, canport, cfg, whitelist=None): + def __init__(self, canport: str, cfg: Dict[str, Any], whitelist: Optional[List[int]] = None): print("Try setting up elm2can bridge") print("Creating virtual CAN interface") result = os.system("./createvcan.sh") @@ -38,8 +40,7 @@ def __init__(self, canport, cfg, whitelist=None): print(f"Calling createvcan.sh failed with error code {os.WEXITSTATUS(result)}") sys.exit(-1) - self.canport = canport - self.whitelist = whitelist + self._whitelist = whitelist elm = serial.Serial() elm.baudrate = cfg['baud'] elm.port = cfg['port'] @@ -54,20 +55,21 @@ def __init__(self, canport, cfg, whitelist=None): print("elm2canbridge: Can not open serial port") sys.exit(-1) - self.initelm(elm, cfg['speed'], cfg['canack']) - can = self.initcan(cfg) - - serQueue = Queue(QUEUE_MAX_ELEMENTS) + self._init_elm(elm, cfg['speed'], cfg['canack']) + # pylint: disable=abstract-class-instantiated + can_device = can.interface.Bus(channel=canport, interface='socketcan') + ser_queue: Queue = Queue(QUEUE_MAX_ELEMENTS) - mt = threading.Thread(target=self.serialProcesor, args=(serQueue, can,)) + # mt = threading.Thread(target=self._serial_procesor, args=(ser_queue, can)) + mt = threading.Thread(target=self._serial_procesor, args=(ser_queue, can_device)) mt.start() - sr = Process(target=self.serialReader, args=(elm, serQueue,)) + sr = Process(target=self._serial_reader, args=(elm, ser_queue,)) sr.start() srpid = sr.pid print("Running on pid {}".format(srpid)) - def serialReader(self, elm, q): + def _serial_reader(self, elm: serial.Serial, q: Queue) -> None: # No time to loose. Read and stuff into queue # using bytearray, reading bigger strides and searching for '\r' gets input overruns in UART # so this is the dumbest, fastest way @@ -77,7 +79,7 @@ def serialReader(self, elm, q): os.nice(-10) print("elm2canbridge: Enter monitoring mode...") - if self.whitelist is not None: + if self._whitelist is not None: print("Applying whitelist") elm.write(b'STM\r') elm.read(4) # Consume echo @@ -87,26 +89,26 @@ def serialReader(self, elm, q): elm.read(5) # Consume echo elm.timeout = None - CR = 13 + CARRIAGE_RETURN = 13 while True: buffer[index] = elm.read()[0] # print("Read: {}=={} ".format(buffer[index],CR)) # print("Buffer {}".format(buffer)) - if buffer[index] == CR or index == 63: + if buffer[index] == CARRIAGE_RETURN or index == 63: # print("Received {}".format(bytes(buffer).hex()[:index])) q.put(buffer[:index]) # Todo will slice copy deep enough or is this a race? index = 0 continue index += 1 - def serialProcesor(self, q, candevice): + def _serial_procesor(self, q: Queue, can_device: can.interface.Bus): print("elm2canbridge: Waiting for incoming...") while True: line = q.get().decode('utf-8') # print("Received {}".format(line)) - isextendedid = False + is_extended_id = False # print("Received from elm: {}".format(line)) try: items = line.split() @@ -115,7 +117,7 @@ def serialProcesor(self, q, candevice): # print("Normal ID {}".format(canid)) del items[0] elif len(items) >= 4: # extended id - isextendedid = True + is_extended_id = True canid = int(items[0] + items[1] + items[2] + items[3], 16) items = items[4:] # print("Extended ID {}".format(canid)) @@ -128,52 +130,52 @@ def serialProcesor(self, q, candevice): data = ''.join(items) # print("data: {}".format(data)) - dataBytes = bytearray.fromhex(data) + data_bytes = bytearray.fromhex(data) except Exception: # print("Error parsing: " + str(e)) # print("Error. ELM line, items **{}**".format(line.split())) continue - if len(dataBytes) > 8: + if len(data_bytes) > 8: continue if canid > 0x2000000: continue - canmsg = can.Message(arbitration_id=canid, data=dataBytes, is_extended_id=isextendedid) + can_msg = can.Message(arbitration_id=canid, data=data_bytes, is_extended_id=is_extended_id) try: - candevice.send(canmsg) + can_device.send(can_msg) except Exception as e: print("Error forwarding message to Can ID 0x{:02x} (extended: {}) with data 0x{}". - format(canid, isextendedid, dataBytes.hex())) + format(canid, is_extended_id, data_bytes.hex())) print("Error: {}".format(e)) - def initelm(self, elm, canspeed, ack): - """Currently only works with obdlink devices""" + def _init_elm(self, elm: serial.Serial, can_speed: int, ack: bool): + """Currently only works with OBDLink devices""" print("Detecting ELM...") elm.write(b'\r\r') - self.waitforprompt(elm) - self.writetoelm(elm, b'ATI\r') - resp = self.readresponse(elm) + self._wait_for_prompt(elm) + self._write_to_elm(elm, b'ATI\r') + resp = self._read_response(elm) if not resp.strip().startswith("ELM"): print("Unexpected response to ATI: {}".format(resp)) sys.exit(-1) - self.waitforprompt(elm) + self._wait_for_prompt(elm) print("Disable linefeed") - self.executecommand(elm, b'ATL 0\r') + self._execute_command(elm, b'ATL 0\r') print("Enable Headers") - self.executecommand(elm, b'AT H1\r') + self._execute_command(elm, b'AT H1\r') print("Enable Spaces") - self.executecommand(elm, b'AT S1\r') + self._execute_command(elm, b'AT S1\r') print("Disable DLC") - self.executecommand(elm, b'AT D0\r') + self._execute_command(elm, b'AT D0\r') - if self.whitelist: + if self._whitelist is not None: print("Using Whitelist") print("Clear all filters") - self.executecommand(elm, b'STFAC\r') - for canid in self.whitelist: + self._execute_command(elm, b'STFAC\r') + for canid in self._whitelist: if canid < 2048: # standard CAN frame IDs are 11 bits long, # so we can safely ignore the 5 most significant bits @@ -187,28 +189,24 @@ def initelm(self, elm, canspeed, ack): # the priority. cmd = "STFPA {:08x}, 03ffffff\r".format(canid) print("Exec "+str(cmd)) - self.executecommand(elm, cmd.encode('utf-8')) + self._execute_command(elm, cmd.encode('utf-8')) print("Set CAN speed") - self.executecommand(elm, b'STP 32\r') - cmd = "STPBR " + str(canspeed) + "\r" - self.executecommand(elm, cmd.encode('utf-8')) - self.executecommand(elm, b'STPBRR\r', expectok=False) - print("Speed is {}".format(canspeed)) + self._execute_command(elm, b'STP 32\r') + cmd = "STPBR " + str(can_speed) + "\r" + self._execute_command(elm, cmd.encode('utf-8')) + self._execute_command(elm, b'STPBRR\r', expectok=False) + print("Speed is {}".format(can_speed)) if ack: - self.executecommand(elm, b'STCMM 1\r') + self._execute_command(elm, b'STCMM 1\r') else: - self.executecommand(elm, b'STCMM 0\r') - - # open vcan where we mirror the elmcan monitor output - def initcan(self, cfg): - return can.interface.Bus(self.canport, bustype='socketcan') # pylint: disable=abstract-class-instantiated + self._execute_command(elm, b'STCMM 0\r') - def waitforprompt(self, elm): + def _wait_for_prompt(self, elm): while elm.read() != b'>': pass - def writetoelm(self, elm, data): + def _write_to_elm(self, elm, data): # print("Write") length = len(data) elm.write(data) @@ -217,7 +215,7 @@ def writetoelm(self, elm, data): print("elm2canbridge: Not the same {}/{}".format(data, echo)) # print("Write Done") - def readresponse(self, elm): + def _read_response(self, elm): response = "" while True: d = elm.read() @@ -226,11 +224,11 @@ def readresponse(self, elm): response = response + d.decode('utf-8') # print("DEBUG: "+response) - def executecommand(self, elm, command, expectok=True): - self.writetoelm(elm, command) - resp = self.readresponse(elm) + def _execute_command(self, elm, command, expectok=True): + self._write_to_elm(elm, command) + resp = self._read_response(elm) if expectok and resp.strip() != "OK": print("Invalid response {} for command {}".format(resp, command)) sys.exit(-1) - self.waitforprompt(elm) + self._wait_for_prompt(elm) return resp diff --git a/dbc2val/dbcfeederlib/j1939reader.py b/dbc2val/dbcfeederlib/j1939reader.py index 0d2dab87..9f113624 100644 --- a/dbc2val/dbcfeederlib/j1939reader.py +++ b/dbc2val/dbcfeederlib/j1939reader.py @@ -24,65 +24,34 @@ # $ pip3 install can-j1939 import logging -import time -import j1939 # type: ignore[import] -from dbcfeederlib import dbc2vssmapper from queue import Queue +from typing import Optional -log = logging.getLogger(__name__) - - -class J1939Reader: +import j1939 # type: ignore[import] - def __init__(self, rxqueue: Queue, mapper: dbc2vssmapper.Mapper): - self._queue = rxqueue - self._mapper = mapper - self._ecu = j1939.ElectronicControlUnit() +from dbcfeederlib import canreader +from dbcfeederlib import dbc2vssmapper - def stop(self): - self._ecu.disconnect() +log = logging.getLogger(__name__) - def start_listening(self, *args, **kwargs): - """Start listening to CAN bus - Arguments are passed directly to :class:`can.BusABC`. Typically these - may include: +class J1939Reader(canreader.CanReader): - :param channel: - Backend specific channel for the CAN interface. - :param str bustype: - Name of the interface. See - `python-can manual `__ - for full list of supported interfaces. - :param int bitrate: - Bitrate in bit/s. - """ + def __init__(self, rxqueue: Queue, mapper: dbc2vssmapper.Mapper, can_port: str, dump_file: Optional[str] = None): + super().__init__(rxqueue, mapper, can_port, dump_file) - # Connect to the CAN bus - self._ecu.connect(*args, **kwargs) - self._ecu.subscribe(self.on_message) + self._ecu = j1939.ElectronicControlUnit() + self._ecu.subscribe(self._on_message) - def on_message(self, priority: int, pgn: int, sa: int, timestamp: int, data): + def _on_message(self, priority: int, pgn: int, source_address: int, timestamp: int, data): # create an extended CAN frame ID from PGN and source address - extended_can_id: int = pgn << 8 | sa - message = self._mapper.get_message_for_canid(extended_can_id) - if message is not None: - log.debug("processing j1939 message [PGN: %#x]", pgn) - try: - decode = message.decode(bytes(data), allow_truncated=True) - if log.isEnabledFor(logging.DEBUG): - log.debug("Decoded message: %s", str(decode)) + extended_frame_id: int = pgn << 8 | source_address + log.debug("Processing j1939 message [frame_id: %d, PGN %#x]", extended_frame_id, pgn) + self._process_can_message(extended_frame_id, data) + + def _start_can_bus_listener(self): + self._ecu.connect(**self._can_kwargs) - rx_time = time.time() - for k, v in decode.items(): # type: ignore - vss_mappings = self._mapper.get_dbc2vss_mappings(k) - # Now time is defined per VSS signal, so handling needs to be different - for signal in vss_mappings: - if signal.time_condition_fulfilled(rx_time): - log.debug("Queueing %s, triggered by %s, raw value %s", signal.vss_name, k, v) - self._queue.put(dbc2vssmapper.VSSObservation(k, signal.vss_name, v, rx_time)) - else: - log.debug("Ignoring %s, triggered by %s, raw value %s", signal.vss_name, k, v) - except Exception: - log.warning("Error decoding message %s [PGN: %#x]", message.name, pgn, exc_info=True) + def _stop_can_bus_listener(self): + self._ecu.disconnect() diff --git a/dbc2val/test/test_mapping_error/test_mapping_error.py b/dbc2val/test/test_mapping_error/test_mapping_error.py index 26674a4d..9a1d71d8 100644 --- a/dbc2val/test/test_mapping_error/test_mapping_error.py +++ b/dbc2val/test/test_mapping_error/test_mapping_error.py @@ -18,11 +18,13 @@ # SPDX-License-Identifier: Apache-2.0 ######################################################################## -from dbcfeederlib import dbc2vssmapper import os -import pytest # type: ignore import logging +import pytest # type: ignore + +from dbcfeederlib import dbc2vssmapper + def test_unknown_transform(caplog, capsys): @@ -34,4 +36,6 @@ def test_unknown_transform(caplog, capsys): dbc2vssmapper.Mapper(mapping_path, dbc_file_names) out, err = capsys.readouterr() assert excinfo.value.code == -1 - assert caplog.record_tuples == [("dbcfeederlib.dbc2vssmapper", logging.ERROR, "Unsupported transform for A.B")] + assert caplog.record_tuples == [ + ("dbcfeederlib.dbc2vssmapper", logging.ERROR, "Unsupported transformation definition for A.B") + ] diff --git a/dbc2val/test/test_readers/test_readers.py b/dbc2val/test/test_readers/test_readers.py index c709338d..bfb9465e 100644 --- a/dbc2val/test/test_readers/test_readers.py +++ b/dbc2val/test/test_readers/test_readers.py @@ -51,7 +51,7 @@ def j1939reader(dbc2vss_queue: Queue) -> J1939Reader: dbc_file_names=[test_path + "/j1939.kcd"], expect_extended_frame_ids=True, use_strict_parsing=True) - return J1939Reader(dbc2vss_queue, mapper) + return J1939Reader(dbc2vss_queue, mapper, "vcan0") @pytest.fixture @@ -61,7 +61,7 @@ def dbcreader(dbc2vss_queue: Queue) -> DBCReader: dbc_file_names=[test_path + "/standard_can.kcd"], expect_extended_frame_ids=False, use_strict_parsing=True) - return DBCReader(dbc2vss_queue, mapper) + return DBCReader(dbc2vss_queue, mapper, "vcan0") def test_j1939reader_processes_j1939_message(j1939reader: J1939Reader, dbc2vss_queue: Queue): @@ -69,7 +69,7 @@ def test_j1939reader_processes_j1939_message(j1939reader: J1939Reader, dbc2vss_q # GIVEN a reader based on CAN message and mapping definitions # WHEN a message is received from the CAN bus for which a mapping gas been defined - j1939reader.on_message(priority=1, pgn=0x1FFFF, sa=0x45, timestamp=0, data=[0x10, 0x32, 0x54]) + j1939reader._on_message(priority=1, pgn=0x1FFFF, source_address=0x45, timestamp=0, data=[0x10, 0x32, 0x54]) # THEN the reader determines both VSS Data Entries that the CAN signals are mapped to signal_mappings: Dict[str, VSSObservation] = {} @@ -86,7 +86,7 @@ def test_j1939reader_ignores_unknown_CAN_messages(j1939reader: J1939Reader, dbc2 # GIVEN a reader based on CAN message and mapping definitions # WHEN a message with an unknown PGN is received from the CAN bus - j1939reader.on_message(priority=1, pgn=0x1FFFA, sa=0x12, timestamp=0, data=[0x10, 0x32, 0x54]) + j1939reader._on_message(priority=1, pgn=0x1FFFA, source_address=0x12, timestamp=0, data=[0x10, 0x32, 0x54]) # THEN the reader ignores the message with pytest.raises(Empty): @@ -98,7 +98,7 @@ def test_dbcreader_processes_can_message(dbcreader: DBCReader, dbc2vss_queue: Qu # GIVEN a reader based on CAN message and mapping definitions # WHEN a message is received from the CAN bus for which a mapping has been defined - dbcreader._process_message(frame_id=0x111A, data=bytearray([0x10, 0x32, 0x54])) + dbcreader._process_can_message(frame_id=0x111A, data=bytearray([0x10, 0x32, 0x54])) # THEN the reader determines both VSS Data Entries that the CAN signals are mapped to signal_mappings: Dict[str, VSSObservation] = {} @@ -115,7 +115,7 @@ def test_dbcreader_ignores_unknown_CAN_messages(dbcreader: DBCReader, dbc2vss_qu # GIVEN a reader based on CAN message and mapping definitions # WHEN a message with an unknown frame ID is received from the CAN bus - dbcreader._process_message(frame_id=0x10AC, data=bytearray([0x10, 0x32, 0x54])) + dbcreader._process_can_message(frame_id=0x10AC, data=bytearray([0x10, 0x32, 0x54])) # THEN the reader ignores the message with pytest.raises(Empty):