Skip to content

Commit

Permalink
feat: Sync device values from MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
chicoferreira committed Sep 20, 2023
1 parent 86b50f4 commit 50a82cb
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 52 deletions.
66 changes: 54 additions & 12 deletions homekit-mqtt-bridge/src/device.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use std::marker::PhantomData;
use std::sync::{Arc, RwLock};
use std::str::FromStr;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};

use async_trait::async_trait;
use hap::accessory::HapAccessory;
use hap::characteristic::AsyncCharacteristicCallbacks;
use hap::characteristic::brightness::BrightnessCharacteristic;
use hap::characteristic::power_state::PowerStateCharacteristic;
use hap::futures::FutureExt;
use log::warn;
use paho_mqtt::Message;

use crate::mqtt::MqttWrapper;

pub mod yeelight_device;

struct InnerDevice<T, H> {
name: String,
device: T,
pub struct InnerDevice<T, H> {
pub name: String,
pub device: T,
h: PhantomData<H>,
}

Expand All @@ -40,16 +44,12 @@ impl<D, H> Device<D, H> {
}
}

pub fn name(&self) -> &str {
&self.inner.read().unwrap().name
pub fn get_inner(&self) -> RwLockReadGuard<'_, InnerDevice<D, H>> {
self.inner.read().unwrap()
}

pub fn get_device(&self) -> &D {
&self.inner.read().unwrap().device
}

pub fn get_mut_device(&mut self) -> &mut D {
&mut self.inner.write().unwrap().device
pub fn get_inner_mut(&self) -> RwLockWriteGuard<'_, InnerDevice<D, H>> {
self.inner.write().unwrap()
}

pub async fn characteristic<A>(&self, mqtt_client: MqttWrapper) -> anyhow::Result<A>
Expand All @@ -65,6 +65,32 @@ impl<D, H> Device<D, H> {
{
self.set_value(value, mqtt_client);
}

pub async fn handle_message<A>(&mut self, message: Message, accessory: HapRsAccessory) -> Result<(), &'static str>
where
Self: Characteristic<A>,
{
self.handle_mqtt_message(message, accessory).await
}
}

impl<D: Send + Sync + 'static, H: Send + Sync + 'static> Device<D, H> {
fn setup_pointer<A>(self, topic: &str, mqtt_client: &mut MqttWrapper, lightbulb: HapRsAccessory)
where
Self: Characteristic<A>, {
mqtt_client.subscribe(
topic,
Box::new(move |message: Message| {
let mut self_clone = self.clone();
let lightbulb = lightbulb.clone();
Box::pin(async move {
if let Err(str) = self_clone.handle_message::<A>(message, lightbulb).await {
warn!("Error handling message: {}", str);
}
})
}),
);
}
}

impl<T, H> Device<T, H>
Expand Down Expand Up @@ -146,9 +172,11 @@ impl<T, H> Device<T, H>
}
}

#[async_trait]
pub trait Characteristic<T> {
fn get_value(&self, mqtt_client: MqttWrapper) -> anyhow::Result<T>;
fn set_value(&mut self, value: T, mqtt_client: MqttWrapper);
async fn handle_mqtt_message(&mut self, message: Message, accessory: HapRsAccessory) -> Result<(), &'static str>;
}

#[derive(Clone, Debug)]
Expand All @@ -157,6 +185,18 @@ pub struct Brightness(pub u8);
#[derive(Clone, Debug)]
pub struct Power(pub bool);

impl FromStr for Power {
type Err = &'static str;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"on" | "true" | "1" => Ok(Power(true)),
"off" | "false" | "0" => Ok(Power(false)),
_ => Err("Could not parse power state"),
}
}
}

impl ToString for Power {
fn to_string(&self) -> String {
match self.0 {
Expand All @@ -171,3 +211,5 @@ impl ToString for Brightness {
self.0.to_string()
}
}

type HapRsAccessory = Arc<hap::futures::lock::Mutex<Box<dyn HapAccessory>>>;
78 changes: 53 additions & 25 deletions homekit-mqtt-bridge/src/device/yeelight_device.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use hap::accessory::{AccessoryInformation, HapAccessory};
use std::str::FromStr;

use async_trait::async_trait;
use hap::accessory::AccessoryInformation;
use hap::accessory::lightbulb::LightbulbAccessory;
use hap::futures::lock::Mutex;
use hap::HapType;
use hap::server::{IpServer, Server};
use paho_mqtt::Message;

use crate::device::{Brightness, Characteristic, Device, Power};
use crate::device::{Brightness, Characteristic, Device, HapRsAccessory, Power};
use crate::mqtt::MqttWrapper;

pub struct YeelightLightbulb {
Expand All @@ -22,53 +25,78 @@ impl YeelightDevice {
})
}

pub fn setup(&mut self, id: u64, mqtt_client: &mut MqttWrapper) -> LightbulbAccessory {
pub async fn setup(&mut self, id: u64, mqtt_client: &mut MqttWrapper, ip_server: &IpServer) {
let mut lightbulb = LightbulbAccessory::new(id, AccessoryInformation {
name: self.name().to_string(),
name: self.get_inner().name.to_string(),
..Default::default()
}).expect("The lightbulb accessory should be created successfully.");

self.setup_power(mqtt_client, &mut lightbulb.lightbulb.power_state);
self.setup_brightness(mqtt_client, lightbulb.lightbulb.brightness.as_mut().expect("The brightness characteristic should be created successfully."));

lightbulb
}

pub fn setup_pointer(&self, mqtt_client: &mut MqttWrapper, lightbulb: std::sync::Arc<Mutex<Box<dyn HapAccessory>>>) {
mqtt_client.subscribe("smart-home-system/yeelight/brightness", Box::new(|message: &Message| Box::pin(async move {
let payload = message.payload_str();
let brightness = Brightness(payload.parse::<u8>().expect("The payload should be a u8."));
let accessory = ip_server.add_accessory(lightbulb).await.expect("The lightbulb accessory should be added successfully.");

let mut motion_sensor_accessory = lightbulb.lock().await;
let motion_sensor_service = motion_sensor_accessory.get_mut_service(HapType::MotionSensor).unwrap();
let motion_detected_characteristic = motion_sensor_service
.get_mut_characteristic(HapType::MotionDetected)
.unwrap();

motion_detected_characteristic.set_value(brightness.0.into()).await;
})));
self.clone().setup_pointer::<Brightness>("smart-home-system/yeelight/brightness", mqtt_client, accessory.clone());
self.clone().setup_pointer::<Power>("smart-home-system/yeelight/power", mqtt_client, accessory.clone());
}

}

#[async_trait]
impl Characteristic<Brightness> for YeelightDevice {
fn get_value(&self, _mqtt_client: MqttWrapper) -> anyhow::Result<Brightness> {
Ok(self.get_device().brightness.clone())
Ok(self.get_inner().device.brightness.clone())
}

fn set_value(&mut self, value: Brightness, mut mqtt_client: MqttWrapper) {
self.get_mut_device().brightness = value.clone();
self.get_inner_mut().device.brightness = value.clone();
mqtt_client.publish("smart-home-system/yeelight/brightness/set", value.to_string())
}

async fn handle_mqtt_message(&mut self, message: Message, accessory: HapRsAccessory) -> Result<(), &'static str> {
let payload = message.payload_str();
let brightness = Brightness(payload.parse::<u8>().map_err(|_| "Could not parse brightness")?);

let mut lightbulb = accessory.lock().await;
let lightbulb_service = lightbulb.get_mut_service(HapType::Lightbulb)
.expect("The lightbulb service should be created successfully.");

let brightness_characteristic = lightbulb_service
.get_mut_characteristic(HapType::Brightness)
.unwrap();

self.get_inner_mut().device.brightness = brightness.clone();
brightness_characteristic.set_value(brightness.0.into()).await.expect("TODO: panic message");

Ok(())
}
}

#[async_trait]
impl Characteristic<Power> for YeelightDevice {
fn get_value(&self, _mqtt_client: MqttWrapper) -> anyhow::Result<Power> {
Ok(self.get_device().power_state.clone())
Ok(self.get_inner().device.power_state.clone())
}

fn set_value(&mut self, value: Power, mut mqtt_client: MqttWrapper) {
self.get_mut_device().power_state = value.clone();
self.get_inner_mut().device.power_state = value.clone();
mqtt_client.publish("smart-home-system/yeelight/power/set", value.to_string());
}

async fn handle_mqtt_message(&mut self, message: Message, accessory: HapRsAccessory) -> Result<(), &'static str> {
let payload = message.payload_str();
let power = Power::from_str(&payload)?;

let mut lightbulb = accessory.lock().await;
let lightbulb_service = lightbulb.get_mut_service(HapType::Lightbulb)
.expect("The lightbulb service should be created successfully.");

let power_characteristic = lightbulb_service
.get_mut_characteristic(HapType::PowerState)
.expect("The power characteristic should be created successfully.");

self.get_inner_mut().device.power_state = power.clone();
power_characteristic.set_value(power.0.into()).await.expect("TODO: panic message");

Ok(())
}
}
16 changes: 4 additions & 12 deletions homekit-mqtt-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,8 @@ async fn main() -> Result<()> {
client.connect(connection_options).await
.expect("Failed to connect to mqtt server");

let mqtt_thread = tokio::spawn(async move {
// for &topic in subscribe_topics.iter() {
// client.subscribe(topic, 1).await
// .unwrap_or_else(|_| panic!("Failed to subscribe to topic: {}", topic));
// }
});
let mut mqtt_wrapper = MqttWrapper::new(client);
let mqtt_read_handle = mqtt_wrapper.start_reading();

let bridge = BridgeAccessory::new(1, AccessoryInformation {
name: "smart-home-system bridge".into(),
Expand All @@ -81,12 +77,8 @@ async fn main() -> Result<()> {
let server = IpServer::new(config, storage).await?;
server.add_accessory(bridge).await?;

let mut mqtt_wrapper = MqttWrapper::new(client);

let mut device = device::yeelight_device::YeelightDevice::new("yeelight".into());
let arc = server.add_accessory(device.setup(2, &mut mqtt_wrapper)).await?;

device.setup_pointer(&mut mqtt_wrapper, arc.clone());
device.setup(2, &mut mqtt_wrapper, &server).await;

std::env::set_var("RUST_LOG", "hap=debug");
env_logger::init();
Expand All @@ -96,7 +88,7 @@ async fn main() -> Result<()> {
handle.await.expect("TODO: panic message");
});

join_all(vec![mqtt_thread, hap_rs_handle]).await;
join_all(vec![mqtt_read_handle, hap_rs_handle]).await;

Ok(())
}
6 changes: 3 additions & 3 deletions homekit-mqtt-bridge/src/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use dashmap::DashMap;
use paho_mqtt::{AsyncClient, Message};
use tokio::task::JoinHandle;

type Callback = Box<dyn Fn(&Message) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
type Callback = Box<dyn Fn(Message) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;

#[derive(Clone)]
pub struct MqttWrapper {
Expand Down Expand Up @@ -39,7 +39,7 @@ impl MqttWrapper {
self.callbacks.insert(topic.clone(), callback);
}

fn start_reading(&self) -> JoinHandle<()> {
pub fn start_reading(&self) -> JoinHandle<()> {
let mut self_clone = self.clone();
tokio::spawn(async move {
let receiver = self_clone.client.get_stream(10);
Expand All @@ -55,7 +55,7 @@ impl MqttWrapper {
let topic = message.topic();

if let Some(sender) = self.callbacks.get(topic) {
sender(&message).await;
sender(message).await;
}
}
}

0 comments on commit 50a82cb

Please sign in to comment.