Skip to content

Commit

Permalink
Merge branch 'DennisOSRM:main' into feature/dockerbuild
Browse files Browse the repository at this point in the history
  • Loading branch information
InScene authored Nov 5, 2023
2 parents b93be8a + 6c6421f commit d74fec5
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 179 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# hms-mqtt-publisher

This tool fetches the current telemetry information from the HMS-XXXXW-T2 series of micro-inverters and publishes the information in JSON format to an MQTT broker. Please note that it doesn’t implement a DTU, but pulls the information off the internal DTU of these inverters.
This tool fetches the current telemetry information from the HMS-XXXXW-T2 series of micro-inverters and publishes the information into an MQTT broker. Please note that it doesn’t implement a DTU, but pulls the information off the internal DTU of these inverters.

### How to run
The tool is distributed as source only — for now. You’ll have to download, compile and run it yourself.
Expand Down
10 changes: 5 additions & 5 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = std::env::var("OUT_DIR")?;

Codegen::new()
.pure()
.cargo_out_dir("hoymiles")
.inputs(proto_files)
.include("src/protos")
.run_from_script();
.pure()
.cargo_out_dir("hoymiles")
.inputs(proto_files)
.include("src/protos")
.run_from_script();

std::fs::File::create(out_dir + "/mod.rs")?.write_all(MOD_RS)?;

Expand Down
103 changes: 103 additions & 0 deletions src/inverter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use crate::protos::hoymiles::RealData::HMSStateResponse;
use crate::RealData::RealDataResDTO;
use crc16::*;
use log::{debug, info};
use protobuf::Message;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;

static INVERTER_PORT: u16 = 10081;

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum NetworkState {
Unknown,
Online,
Offline,
}

pub struct Inverter<'a> {
host: &'a str,
state: NetworkState,
sequence: u16,
}

impl<'a> Inverter<'a> {
pub fn new(host: &'a str) -> Self {
Self {
host,
state: NetworkState::Unknown,
sequence: 0_u16,
}
}

fn set_state(&mut self, new_state: NetworkState) {
if self.state != new_state {
self.state = new_state;
info!("Inverter is {new_state:?}");
}
}

pub fn update_state(&mut self) -> Option<HMSStateResponse> {
self.sequence = self.sequence.wrapping_add(1);

let /*mut*/ request = RealDataResDTO::default();
// let date = Local::now();
// let time_string = date.format("%Y-%m-%d %H:%M:%S").to_string();
// request.ymd_hms = time_string;
// request.cp = 23 + sequence as i32;
// request.offset = 0;
// request.time = epoch();
let header = b"\x48\x4d\xa3\x03";
let request_as_bytes = request.write_to_bytes().expect("serialize to bytes");
let crc16 = State::<MODBUS>::calculate(&request_as_bytes);
let len = request_as_bytes.len() as u16 + 10u16;

// compose request message
let mut message = Vec::new();
message.extend_from_slice(header);
message.extend_from_slice(&self.sequence.to_be_bytes());
message.extend_from_slice(&crc16.to_be_bytes());
message.extend_from_slice(&len.to_be_bytes());
message.extend_from_slice(&request_as_bytes);

let ip = self.host.parse().expect("Unable to parse socket address");
let address = SocketAddr::new(IpAddr::V4(ip), INVERTER_PORT);
let stream = TcpStream::connect_timeout(&address, Duration::from_millis(500));
if let Err(e) = stream {
debug!("{e}");
self.set_state(NetworkState::Offline);
return None;
}

let mut stream = stream.unwrap();
if let Err(e) = stream.write(&message) {
debug!(r#"{e}"#);
self.set_state(NetworkState::Offline);
return None;
}

let mut buf = [0u8; 1024];
let read = stream.read(&mut buf);

if let Err(e) = read {
debug!("{e}");
self.set_state(NetworkState::Offline);
return None;
}
let read_length = read.unwrap();
let parsed = HMSStateResponse::parse_from_bytes(&buf[10..read_length]);

if let Err(e) = parsed {
debug!("{e}");
self.set_state(NetworkState::Offline);
return None;
}
debug_assert!(parsed.is_ok());

let response = parsed.unwrap();
self.set_state(NetworkState::Online);
Some(response)
}
}
193 changes: 20 additions & 173 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
// TODO: support CA33 command to take over metrics consumption
// TODO: support publishing to S-Miles cloud, too

mod inverter;
mod mqtt;
mod protos;

use crate::protos::hoymiles::RealData::HMSStateResponse;
use crate::RealData::RealDataResDTO;
use crate::inverter::Inverter;
use crate::mqtt::{MetricCollector, Mqtt};

use std::error::Error;
use std::net::{IpAddr, SocketAddr};
use std::io::Write;
use std::thread;
use std::time::Duration;
use std::{fmt, thread};
use std::{
io::{Read, Write},
net::TcpStream,
};

use chrono::Local;
use clap::Parser;
use crc16::*;
use env_logger::Builder;
use log::{debug, info, LevelFilter, warn};
use protobuf::Message;
use log::{info, LevelFilter};
use protos::hoymiles::RealData;
use rumqttc::{Client, MqttOptions, QoS};

#[derive(Parser)]
struct Cli {
Expand All @@ -32,132 +26,8 @@ struct Cli {
mqtt_password: Option<String>,
}

static INVERTER_PORT: u16 = 10081;
static REQUEST_DELAY: u64 = 30_500;

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum InverterState {
Online,
Offline,
}

#[derive(Debug)]
enum ErrorState {
NetworkRead,
Offline,
ParseResponse,
Unknown,
}

impl Error for ErrorState {
fn source(&self) -> Option<&(dyn Error + 'static)> {
None
}

fn description(&self) -> &str {
"description() is deprecated; use Display"
}

fn cause(&self) -> Option<&dyn Error> {
self.source()
}
}

impl fmt::Display for ErrorState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let message = match self {
ErrorState::NetworkRead => "network read failed",
ErrorState::Offline => "host not reachable",
ErrorState::ParseResponse => "response not parseable",
ErrorState::Unknown => "unknown",
};
write!(f, "ErrorState: {message}")
}
}

fn get_inverter_state(sequence: u16, host: &str) -> Result<HMSStateResponse, ErrorState> {
let /*mut*/ request = RealDataResDTO::default();
// let date = Local::now();
// let time_string = date.format("%Y-%m-%d %H:%M:%S").to_string();
// request.ymd_hms = time_string;
// request.cp = 23 + sequence as i32;
// request.offset = 0;
// request.time = epoch();
let header = b"\x48\x4d\xa3\x03";
let request_as_bytes = request.write_to_bytes().expect("serialize to bytes");
let crc16 = State::<MODBUS>::calculate(&request_as_bytes);
let len = request_as_bytes.len() as u16 + 10u16;

// compose request message
let mut message = Vec::new();
message.extend_from_slice(header);
message.extend_from_slice(&sequence.to_be_bytes());
message.extend_from_slice(&crc16.to_be_bytes());
message.extend_from_slice(&len.to_be_bytes());
message.extend_from_slice(&request_as_bytes);

let ip = host.parse().expect("Unable to parse socket address");
let address = SocketAddr::new(IpAddr::V4(ip), INVERTER_PORT);
let stream = TcpStream::connect_timeout(&address, Duration::from_millis(500));
if let Err(e) = stream {
debug!("{e}");
return Err(ErrorState::Offline);
}

let mut stream = stream.unwrap();
if let Err(e) = stream.write(&message) {
debug!(r#"{e}"#);
return Err(ErrorState::Unknown);
}

let mut buf = [0u8; 1024];
let read = stream.read(&mut buf);

if let Err(e) = read {
debug!("{e}");
return Err(ErrorState::NetworkRead);
}
let read_length = read.unwrap();
let parsed = HMSStateResponse::parse_from_bytes(&buf[10..read_length]);

if let Err(e) = parsed {
debug!("{e}");
return Err(ErrorState::ParseResponse);
}

let response = parsed.unwrap();
Ok(response)
}

fn send_to_mqtt(hms_state: &HMSStateResponse, client: &mut Client) {
debug!("{hms_state}");

let pv_current_power = hms_state.pv_current_power as f32 / 10.;
let pv_daily_yield = hms_state.pv_daily_yield;

client
.subscribe("hms800wt2/pv_current_power", QoS::AtMostOnce)
.unwrap();
match client.publish(
"hms800wt2/pv_current_power",
QoS::AtMostOnce,
true,
pv_current_power.to_string(),
) {
Ok(_) => {}
Err(e) => warn!("mqtt error: {e}"),
}
match client.publish(
"hms800wt2/pv_daily_yield",
QoS::AtMostOnce,
true,
pv_daily_yield.to_string(),
) {
Ok(_) => {}
Err(e) => warn!("mqtt error: {e}"),
}
}

fn main() {
Builder::new()
.format(|buf, record| {
Expand All @@ -175,48 +45,25 @@ fn main() {
let cli = Cli::parse();

// set up mqtt connection
info!("inverter: {}, mqtt broker {}", cli.inverter_host, cli.mqtt_broker_host);
let mut mqttoptions = MqttOptions::new("hms800wt2-mqtt-publisher", cli.mqtt_broker_host, 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
info!(
"inverter: {}, mqtt broker {}",
cli.inverter_host, cli.mqtt_broker_host
);

//parse the mqtt authentication options
if let Some((username, password)) = match (cli.mqtt_username, cli.mqtt_password) {
(None, None) => None,
(None, Some(_)) => None,
(Some(username), None) => Some((username.clone(), "".into())),
(Some(username), Some(password)) => Some((username.clone(), password.clone())),
} {
mqttoptions.set_credentials(username, password);
}
let mut inverter = Inverter::new(&cli.inverter_host);

let (mut client, mut connection) = Client::new(mqttoptions, 10);
thread::spawn(move || {
// keep polling the event loop to make sure outgoing messages get sent
for _ in connection.iter() {}
});
let mut mqtt = Mqtt::new(
&cli.mqtt_broker_host,
&cli.mqtt_username,
&cli.mqtt_password,
);

let mut sequence = 0u16;
let mut current_state = InverterState::Offline;
loop {
sequence = sequence.wrapping_add(1);
// factor out a function that returns a (Response, State);
let new_state = match get_inverter_state(sequence, &cli.inverter_host) {
Ok(r) => {
debug!("{r}");
send_to_mqtt(&r, &mut client);
InverterState::Online
}
Err(e) => {
debug!("error: {e}");
InverterState::Offline
}
};

if current_state != new_state {
current_state = new_state;
info!("Inverter is {current_state:?}");
if let Some(r) = inverter.update_state() {
mqtt.publish(&r);
}

// TODO: this has to move into the Inverter struct in an async implementation
thread::sleep(Duration::from_millis(REQUEST_DELAY));
}
}
Loading

0 comments on commit d74fec5

Please sign in to comment.