Skip to content

Commit

Permalink
Hide actual MQTT implementation behind trait+newtype (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
DennisOSRM authored Nov 30, 2023
1 parent 9b2b6eb commit b233e54
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 72 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ serde_json = "1.0"
hostname = "0.3.1"
toml = "0.8.8"
serde_derive = "1.0.193"
anyhow = "1.0.75"

[build-dependencies]
protobuf-codegen = "3.3.0"
45 changes: 11 additions & 34 deletions src/home_assistant.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,31 @@
use std::{thread, time::Duration};

use crate::home_assistant_config::DeviceConfig;
use crate::mqtt_wrapper::MqttWrapper;
use crate::{mqtt_config::MqttConfig, protos::hoymiles::RealData::HMSStateResponse};

use crate::home_assistant_config::SensorConfig;
use crate::metric_collector::MetricCollector;
use log::{debug, error};
use rumqttc::{Client, MqttOptions, QoS};
use serde_json::json;

pub struct HomeAssistant {
client: Client,
pub struct HomeAssistant<MQTT: MqttWrapper> {
client: MQTT,
}

impl HomeAssistant {
impl<MQTT: MqttWrapper> HomeAssistant<MQTT> {
pub fn new(config: &MqttConfig) -> Self {
let this_host = hostname::get().unwrap().into_string().unwrap();
let mut mqttoptions = MqttOptions::new(
format!("hms-mqtt-publisher_{}", this_host),
&config.host,
config.port.unwrap_or(1883),
);
mqttoptions.set_keep_alive(Duration::from_secs(5));

//parse the mqtt authentication options
if let Some((username, password)) = match (&config.username, &config.password) {
(None, None) => None,
(None, Some(_)) => None,
(Some(username), None) => Some((username.clone(), "".into())),
(Some(username), Some(password)) => Some((username.clone(), password.clone())),
} {
mqttoptions.set_credentials(username, password);
}

let (client, mut connection) = Client::new(mqttoptions, 10);

thread::spawn(move || {
// keep polling the event loop to make sure outgoing messages get sent
for _ in connection.iter() {}
});

let client = MQTT::new(config);
Self { client }
}

fn publish_json(&mut self, topic: &str, payload: serde_json::Value) {
debug!("Publishing to {topic} with payload {payload}");

let payload = serde_json::to_string(&payload).unwrap();
if let Err(e) = self.client.publish(topic, QoS::AtMostOnce, true, payload) {
error!("Failed to publish message: {e}");
if let Err(e) =
self.client
.publish(topic, crate::mqtt_wrapper::QoS::AtMostOnce, true, payload)
{
error!("Failed to publish message: {e:?}");
}
}

Expand All @@ -68,7 +45,7 @@ impl HomeAssistant {
}
}

impl MetricCollector for HomeAssistant {
impl<MQTT: MqttWrapper> MetricCollector for HomeAssistant<MQTT> {
fn publish(&mut self, hms_state: &HMSStateResponse) {
let config_topic = format!("homeassistant/sensor/hms_{}", hms_state.short_dtu_sn());
let state_topic = format!("solar/hms_{}/state", hms_state.short_dtu_sn());
Expand Down
2 changes: 1 addition & 1 deletion src/inverter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::protos::hoymiles::RealData::{HMSStateResponse, RealDataResDTO};
use crc16::*;
use crc16::{State, MODBUS};
use log::{debug, info};
use protobuf::Message;
use std::io::{Read, Write};
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pub mod home_assistant_config;
pub mod inverter;
pub mod metric_collector;
pub mod mqtt_config;
pub mod mqtt_wrapper;
pub mod protos;
pub mod simple_mqtt;
6 changes: 4 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
// TODO: support publishing to S-Miles cloud, too

mod logging;
mod rumqttc_wrapper;

use hms_mqtt_publish::home_assistant::HomeAssistant;
use hms_mqtt_publish::inverter::Inverter;
use hms_mqtt_publish::metric_collector::MetricCollector;
use hms_mqtt_publish::mqtt_config;
use hms_mqtt_publish::simple_mqtt::SimpleMqtt;
use mqtt_config::MqttConfig;
use rumqttc_wrapper::RumqttcWrapper;
use serde_derive::Deserialize;
use std::fs;
use std::thread;
Expand Down Expand Up @@ -43,12 +45,12 @@ fn main() {
let mut output_channels: Vec<Box<dyn MetricCollector>> = Vec::new();
if let Some(config) = config.home_assistant {
info!("Publishing to Home Assistent");
output_channels.push(Box::new(HomeAssistant::new(&config)));
output_channels.push(Box::new(HomeAssistant::<RumqttcWrapper>::new(&config)));
}

if let Some(config) = config.simple_mqtt {
info!("Publishing to simple MQTT broker");
output_channels.push(Box::new(SimpleMqtt::new(&config)));
output_channels.push(Box::new(SimpleMqtt::<RumqttcWrapper>::new(&config)));
}

loop {
Expand Down
26 changes: 26 additions & 0 deletions src/mqtt_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::mqtt_config::MqttConfig;

#[derive(Clone, Copy)]
pub enum QoS {
AtMostOnce,
AtLeastOnce,
ExactlyOnce,
}

// TODO: add an implementation of the MqttWrapper for testing
// TODO: should this be renamed to MqttImplementation?
pub trait MqttWrapper {
// This trait provides an interface that the decouples library code from an
// implementation of the MQTT client. On library calling code, one needs to
// wrap the MQTT implementation, i.e. the client, in a new type that in
// turn implements this trait.

fn subscribe(&mut self, topic: &str, qos: QoS) -> anyhow::Result<()>;

fn publish<S, V>(&mut self, topic: S, qos: QoS, retain: bool, payload: V) -> anyhow::Result<()>
where
S: Clone + Into<String>,
V: Clone + Into<Vec<u8>>;

fn new(config: &MqttConfig) -> Self;
}
88 changes: 88 additions & 0 deletions src/rumqttc_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use anyhow::Ok;
use hms_mqtt_publish::{
mqtt_config::MqttConfig,
mqtt_wrapper::{self},
};
use rumqttc::{Client, MqttOptions};
use std::thread;
use std::time::Duration;

pub struct RumqttcWrapper {
client: Client,
}

fn match_qos(qos: mqtt_wrapper::QoS) -> rumqttc::QoS {
match qos {
mqtt_wrapper::QoS::AtMostOnce => rumqttc::QoS::AtMostOnce,
mqtt_wrapper::QoS::AtLeastOnce => rumqttc::QoS::AtLeastOnce,
mqtt_wrapper::QoS::ExactlyOnce => rumqttc::QoS::ExactlyOnce,
}
}

impl mqtt_wrapper::MqttWrapper for RumqttcWrapper {
fn subscribe(&mut self, topic: &str, qos: mqtt_wrapper::QoS) -> anyhow::Result<()> {
Ok(self.client.subscribe(topic, match_qos(qos))?)
}

fn publish<S, V>(
&mut self,
topic: S,
qos: mqtt_wrapper::QoS,
retain: bool,
payload: V,
) -> anyhow::Result<()>
where
S: Clone + Into<String>,
V: Clone + Into<Vec<u8>>,
{
// try publishing up to three times
if let std::result::Result::Ok(_) =
self.client
.try_publish(topic.clone(), match_qos(qos), retain, payload.clone())
{
return Ok(());
}
std::thread::sleep(Duration::from_millis(100));
if let std::result::Result::Ok(_) =
self.client
.try_publish(topic.clone(), match_qos(qos), retain, payload.clone())
{
return Ok(());
}
std::thread::sleep(Duration::from_millis(100));
Ok(self
.client
.try_publish(topic, match_qos(qos), retain, payload)?)
}

fn new(config: &MqttConfig) -> Self {
let mut mqttoptions = MqttOptions::new(
"hms800wt2-mqtt-publisher",
&config.host,
config.port.unwrap_or(1883),
);
mqttoptions.set_keep_alive(Duration::from_secs(5));

//parse the mqtt authentication options
if let Some((username, password)) = match (&config.username, &config.password) {
(None, None) => None,
(None, Some(_)) => None,
(Some(username), None) => Some((username.clone(), "".into())),
(Some(username), Some(password)) => Some((username.clone(), password.clone())),
} {
mqttoptions.set_credentials(username, password);
}

let (client, mut connection) = Client::new(mqttoptions, 10);

thread::spawn(move || {
// keep polling the event loop to make sure outgoing messages get sent
// the call to .iter() blocks and suspends the thread effectively by
// calling .recv() under the hood. This implies that the loop terminates
// once the client unsubs
for _ in connection.iter() {}
});

Self { client }
}
}
44 changes: 9 additions & 35 deletions src/simple_mqtt.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,27 @@
use std::{thread, time::Duration};

use crate::{
metric_collector::MetricCollector, mqtt_config::MqttConfig,
metric_collector::MetricCollector,
mqtt_config::MqttConfig,
mqtt_wrapper::{MqttWrapper, QoS},
protos::hoymiles::RealData::HMSStateResponse,
};

use log::{debug, warn};
use rumqttc::{Client, MqttOptions, QoS};

pub struct SimpleMqtt {
client: Client,
pub struct SimpleMqtt<MQTT: MqttWrapper> {
client: MQTT,
}

impl SimpleMqtt {
impl<MQTT: MqttWrapper> SimpleMqtt<MQTT> {
pub fn new(config: &MqttConfig) -> Self {
let mut mqttoptions = MqttOptions::new(
"hms800w2t-mqtt-publisher",
&config.host,
config.port.unwrap_or(1883),
);
mqttoptions.set_keep_alive(Duration::from_secs(5));

//parse the mqtt authentication options
if let Some((username, password)) = match (&config.username, &config.password) {
(None, None) => None,
(None, Some(_)) => None,
(Some(username), None) => Some((username.clone(), "".into())),
(Some(username), Some(password)) => Some((username.clone(), password.clone())),
} {
mqttoptions.set_credentials(username, password);
}

let (client, mut connection) = Client::new(mqttoptions, 10);

thread::spawn(move || {
// keep polling the event loop to make sure outgoing messages get sent
for _ in connection.iter() {}
});

let client = MQTT::new(config);
Self { client }
}
}

impl MetricCollector for SimpleMqtt {
impl<MQTT: MqttWrapper> MetricCollector for SimpleMqtt<MQTT> {
fn publish(&mut self, hms_state: &HMSStateResponse) {
debug!("{hms_state}");

// self.client.subscribe("hms800wt2", QoS::AtMostOnce).unwrap();

let pv_current_power = hms_state.pv_current_power as f32 / 10.;
let pv_daily_yield = hms_state.pv_daily_yield;
let pv_grid_voltage = hms_state.inverter_state[0].grid_voltage as f32 / 10.;
Expand Down Expand Up @@ -98,7 +72,7 @@ impl MetricCollector for SimpleMqtt {
.into_iter()
.for_each(|(topic, payload)| {
if let Err(e) = self.client.publish(topic, QoS::AtMostOnce, true, payload) {
warn!("mqtt error: {e}")
warn!("mqtt error: {e:?}")
}
});
}
Expand Down

0 comments on commit b233e54

Please sign in to comment.