Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin_orig/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
Dominik Andreas committed Dec 9, 2023
2 parents 442030c + 1f2ab22 commit 051b7ed
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 35 deletions.
13 changes: 13 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use std::process::Command;
fn main() {
// runs git describe --always --dirty
if let Ok(output) = Command::new("git")
.args(["describe", "--always", "--dirty"])
.output()
{
let git_hash = String::from_utf8(output.stdout).unwrap();
println!("cargo:rustc-env=GIT_HASH={git_hash}");
} else {
println!("cargo:rustc-env=GIT_HASH=UNKNOWN");
}
}
2 changes: 1 addition & 1 deletion hms2mqtt/src/home_assistant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct HomeAssistant<MQTT: MqttWrapper> {

impl<MQTT: MqttWrapper> HomeAssistant<MQTT> {
pub fn new(config: &MqttConfig) -> Self {
let client = MQTT::new(config);
let client = MQTT::new(config, "-ha");
Self { client }
}

Expand Down
32 changes: 24 additions & 8 deletions hms2mqtt/src/inverter.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::protos::hoymiles::RealData::{HMSStateResponse, RealDataResDTO};
use crc16::{State, MODBUS};
use log::{debug, info};
use log::{debug, error, info, warn};
use protobuf::Message;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::net::{IpAddr, SocketAddr};
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

static INVERTER_PORT: u16 = 10081;
static INVERTER_PORT: &str = "10081";

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum NetworkState {
Expand Down Expand Up @@ -61,16 +60,33 @@ impl<'a> Inverter<'a> {
message.extend_from_slice(&len.to_be_bytes());
message.extend_from_slice(&request_as_bytes);

let ip = self.host.parse().expect("Unable to parse socket address");
let address = SocketAddr::new(IpAddr::V4(ip), INVERTER_PORT);
let stream = TcpStream::connect_timeout(&address, Duration::from_millis(500));
let inverter_host = self.host.to_string() + ":" + INVERTER_PORT;
let address = match inverter_host.to_socket_addrs() {
Ok(mut a) => a.next(),
Err(e) => {
error!("Unable to resolve domain: {e}");
return None;
}
};
if address.is_none() {
error!("Unable to parse name");
return None;
}

let stream = TcpStream::connect_timeout(&address.unwrap(), Duration::from_millis(500));
if let Err(e) = stream {
debug!("{e}");
debug!("could not connect: {e}");
self.set_state(NetworkState::Offline);
return None;
}

let mut stream = stream.unwrap();
if let Err(e) = stream.set_write_timeout(Some(Duration::new(5, 0))) {
warn!("could not set write timeout: {e}");
}
if let Err(e) = stream.set_read_timeout(Some(Duration::new(5, 0))) {
warn!("could not set read timeout: {e}");
}
if let Err(e) = stream.write(&message) {
debug!(r#"{e}"#);
self.set_state(NetworkState::Offline);
Expand Down
2 changes: 1 addition & 1 deletion hms2mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pub mod simple_mqtt;

// internal interfaces
mod home_assistant_config;
mod protos;
mod protos;
2 changes: 1 addition & 1 deletion hms2mqtt/src/mqtt_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ pub trait MqttWrapper {
S: Clone + Into<String>,
V: Clone + Into<Vec<u8>>;

fn new(config: &MqttConfig) -> Self;
fn new(config: &MqttConfig, suffix: &str) -> Self;
}
2 changes: 1 addition & 1 deletion hms2mqtt/src/simple_mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct SimpleMqtt<MQTT: MqttWrapper> {

impl<MQTT: MqttWrapper> SimpleMqtt<MQTT> {
pub fn new(config: &MqttConfig) -> Self {
let client = MQTT::new(config);
let client = MQTT::new(config, "-sm");
Self { client }
}
}
Expand Down
19 changes: 2 additions & 17 deletions src/logging.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,5 @@
use std::io::Write;

use chrono::Local;
use env_logger::Builder;
use log::LevelFilter;
use env_logger::{Builder, Env};

pub fn init_logger() {
Builder::new()
.format(|buf, record| {
writeln!(
buf,
"{} [{}] - {}",
Local::now().format("%Y-%m-%dT%H:%M:%S"),
record.level(),
record.args()
)
})
.filter(None, LevelFilter::Info)
.init();
Builder::from_env(Env::default().default_filter_or("info")).init();
}
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ static REQUEST_DELAY: u64 = 30_500;

fn main() {
logging::init_logger();

info!("Running revision: {}", env!("GIT_HASH"));
if std::env::args().len() > 1 {
error!("Arguments passed. Tool is configured by config.toml in its path");
}
Expand Down
13 changes: 8 additions & 5 deletions src/rumqttc_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use hms2mqtt::{
mqtt_config::MqttConfig,
mqtt_wrapper::{self},
};
use rumqttc::{Client, MqttOptions};
use log::warn;
use rumqttc::{Client, MqttOptions, QoS::AtMostOnce};

pub struct RumqttcWrapper {
client: Client,
Expand Down Expand Up @@ -56,9 +57,9 @@ impl mqtt_wrapper::MqttWrapper for RumqttcWrapper {
.try_publish(topic, match_qos(qos), retain, payload)?)
}

fn new(config: &MqttConfig) -> Self {
fn new(config: &MqttConfig, suffix: &str) -> Self {
let mut mqttoptions = MqttOptions::new(
"hms800wt2-mqtt-publisher",
"hms800wt2-mqtt-publisher".to_string() + suffix,
&config.host,
config.port.unwrap_or(1883),
);
Expand All @@ -74,7 +75,7 @@ impl mqtt_wrapper::MqttWrapper for RumqttcWrapper {
mqttoptions.set_credentials(username, password);
}

let (client, mut connection) = Client::new(mqttoptions, 10);
let (mut client, mut connection) = Client::new(mqttoptions, 512);

thread::spawn(move || {
// keep polling the event loop to make sure outgoing messages get sent
Expand All @@ -83,7 +84,9 @@ impl mqtt_wrapper::MqttWrapper for RumqttcWrapper {
// once the client unsubs
for _ in connection.iter() {}
});

if let Err(e) = client.subscribe("hms800wt2", AtMostOnce) {
warn!("subscription to base topic failed: {e}");
}
Self { client }
}
}
65 changes: 65 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use hms2mqtt::{mqtt_config::MqttConfig, mqtt_wrapper::MqttWrapper};

struct MqttTester {
published_values: Vec<(String, Vec<u8>)>,
}

impl MqttTester {
pub fn len(&self) -> usize {
self.published_values.len()
}

#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}

impl MqttWrapper for MqttTester {
fn subscribe(&mut self, _topic: &str, _qos: hms2mqtt::mqtt_wrapper::QoS) -> anyhow::Result<()> {
Ok(())
}

fn publish<S, V>(
&mut self,
topic: S,
_qos: hms2mqtt::mqtt_wrapper::QoS,
_retain: bool,
payload: V,
) -> anyhow::Result<()>
where
S: Into<String>,
V: Into<Vec<u8>>,
{
self.published_values.push((topic.into(), payload.into()));
Ok(())
}

fn new(_config: &hms2mqtt::mqtt_config::MqttConfig, _suffix: &str) -> Self {
Self {
published_values: Vec::new(),
}
}
}

#[test]
fn publish_one_message() {
let mut mqtt = MqttTester::new(
&MqttConfig {
host: "frob".to_owned(),
port: Some(1234),
username: None,
password: None,
},
"-test",
);
let result = mqtt.publish(
"foo",
hms2mqtt::mqtt_wrapper::QoS::AtMostOnce,
true,
"Hooray".to_string(),
);
assert!(result.is_ok());
assert!(!mqtt.is_empty());
assert_eq!(mqtt.len(), 1);
}

0 comments on commit 051b7ed

Please sign in to comment.