Integrating an RF modem interface with fprime-gds #2639
Closed
ReggieMarr
started this conversation in
General
Replies: 2 comments 27 replies
-
Is this interfacing with the modem on the FSW or the GDS side of things? |
Beta Was this translation helpful? Give feedback.
6 replies
-
I've expanded on my initial implementation a bit but I'll leave the reply comment up for posterity. #!/usr/bin/env python3
import abc
import socket
from typing import NamedTuple, Tuple, Type
import threading
from fprime_gds.common.communication.framing import FramerDeframer
from fprime_gds.common.communication.ground import TCPGround
from fprime_gds.common.communication.updown import Downlinker, Uplinker
from fprime_gds.plugin.definitions import gds_plugin_implementation
from fprime_gds.common.communication.adapters.base import BaseAdapter
from fprime_gds.common.communication.adapters.ip import IpAdapter
from fprime_gds.plugin.system import FpFramerDeframer
from modem_interface import ModemInterface
from dataclasses import dataclass, field
from datetime import datetime
from time import time
import sys
@dataclass
class ModemAdapterCfg:
modem_ip_address: Tuple[str, int]
fsw_ip_address: Tuple[str, int]
poll_hz: int = 10
poll_timeout_s = 5000
@dataclass
class ModemStatus:
bytes_decoded: int = 0
start: datetime = datetime.now()
isPolling: bool = False
def __str__(self):
"""
Returns a formatted string representation of the status.
"""
time_since_start = 0
if self.isPolling:
time_since_start = (datetime.now() - self.start).total_seconds()
return (
f"Status:\n"
f" Time since start (S): {time_since_start} ms\n"
f" Bytes Decoded: {self.bytes_decoded}\n"
)
class ModemAdapter(BaseAdapter):
'''This provides an fprime-gds adapter plugin such that downlink is facilitated via rf
(through the use of the modem interface) and uplink is meant to be done using the usual tcp interface.
'''
def __init__(self, cfg: ModemAdapterCfg):
self.cfg = cfg
self.poll = threading.Event()
self.shutdown = threading.Event()
self.modem = ModemInterface(cfg.modem_ip_address[0], cfg.modem_ip_address[1])
self.ip = IpAdapter(cfg.fsw_ip_address[0], cfg.fsw_ip_address[1])
self.poll_thread = threading.Thread(target=self._poll_for_datagrams, daemon=True)
self._status = ModemStatus()
@classmethod
def get_name(cls) -> str:
return "ModemAdapter"
@classmethod
def get_arguments(cls):
""" Arguments to request from the CLI """
return {}
@classmethod
def check_arguments(cls):
""" Check arguments from the CLI """
pass
@property
def status(self):
"""
Retrieves and logs the status of the modem adapter.
"""
return self._status
def _poll_for_datagrams(self):
while not self.shutdown.is_set():
# Blocks thread on max timeout
try:
self.poll.wait(self.cfg.poll_timeout_s)
self.modem.decodeIncomingData()
time.sleep(1 / self.cfg.poll_hz)
except RuntimeError:
pass
def __del__(self):
self.close()
self.shutdown.set()
def open(self):
self._status.start = datetime.now()
self.poll.set()
def close(self):
self.poll.clear()
def read(self, timeout=0.500):
"""
Function that checks for available datagrams and pops the first one
when it becomes available. If no datagram becomes available within
'timeout' seconds, a RuntimeError is raised indicating the timeout.
Parameters:
timeout (float): Maximum time in seconds to wait for a datagram.
Default is 0.500 seconds.
Raises:
RuntimeError: When the function times out without a datagram being
available.
Returns:
datagram: The first datagram that becomes available.
"""
start_time = time.time() # recording the start time
datagram = None
while time.time() - start_time < timeout:
if self.modem.datagramsAvailable():
datagram = self.modem.popDatagram()
return datagram
# Sleep for a small duration to avoid busy-waiting
time.sleep(0.01)
end_time = time.time() # recording the end time
raise RuntimeError(f"Modem timed out after {end_time - start_time} seconds!")
def write(self, frame):
self.ip.write(frame)
@classmethod
@gds_plugin_implementation
def register_communication_plugin(cls) -> Type["ModemAdapter"]:
return cls
def launch_modem(modem: ModemAdapter):
adapter = modem
ground = TCPGround()
checksum_type='crc32',
framer = FpFramerDeframer(checksum_type)
downlinker = Downlinker(modem, ground, framer)
uplinker = Uplinker(adapter, ground, framer, downlinker)
# Open resources for the handlers on either side, this prepares the resources needed for reading/writing data
ground.open()
adapter.open()
# Finally start the processing of uplink and downlink
downlinker.start()
uplinker.start()
uplinker.join()
downlinker.join() Does this essentially capture the approach that the plugin framework implies ? |
Beta Was this translation helpful? Give feedback.
21 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
I'm currently working on integrating a previously developed interface to a Newtec MDM6000 modem which serves as the hardware interface between the flight software and the ground station (via RF).
I'd like to leverage the test integration api for deframing/decoding data received from the modem.
Since I'm interfacing with the modem via a udp socket connection it seems to me that I essentially want to provide a wrapper around my modem interface that can use the ThreadedUDPServer as a base class however I've been looking into the use of plugins lately and I wonder if instead I should be providing the modem interface via a plugin.
Any input on which seems to be the more "canonical" way is appreciated.
Beta Was this translation helpful? Give feedback.
All reactions