Skip to content

Commit

Permalink
Merge pull request #38 from openenergymonitor/emon-pi-develop
Browse files Browse the repository at this point in the history
Emon pi develop
  • Loading branch information
glynhudson authored Aug 16, 2017
2 parents e97ee59 + 48abb04 commit dda5227
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 171 deletions.
53 changes: 47 additions & 6 deletions src/emonhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import signal
import argparse
import pprint

try:
import pymodbus
pymodbus_found = True
Expand Down Expand Up @@ -85,7 +86,7 @@

class EmonHub(object):

__version__ = "emonHub 'emon-pi' variant v1.2"
__version__ = "emonHub emon-pi variant v2.0.0"

def __init__(self, setup):
"""Setup an OpenEnergyMonitor emonHub.
Expand Down Expand Up @@ -124,6 +125,11 @@ def run(self):
# Set signal handler to catch SIGINT and shutdown gracefully
signal.signal(signal.SIGINT, self._sigint_handler)

# Initialise thread restart counters
restart_count={}
for I in self._interfacers.itervalues():
restart_count[I.name]=0

# Until asked to stop
while not self._exit:

Expand All @@ -133,12 +139,47 @@ def run(self):
self._update_settings(self._setup.settings)

# For all Interfacers
kill_list=[]
for I in self._interfacers.itervalues():
# Check thread is still running
# Check threads are still running
if not I.isAlive():
#I.start()
self._log.warning(I.name + " thread is dead") # had to be restarted")

kill_list.append(I.name) # <-avoid modification of iterable within loop

# Read each interfacers pub channels
for pub_channel in I._settings['pubchannels']:

if pub_channel in I._pub_channels:
if len(I._pub_channels[pub_channel])>0:

# POP cargo item (one at a time)
cargo = I._pub_channels[pub_channel].pop(0)

# Post to each subscriber interface
for sub_interfacer in self._interfacers.itervalues():
# For each subsciber channel
for sub_channel in sub_interfacer._settings['subchannels']:
# If channel names match
if sub_channel==pub_channel:
# init if empty
if not sub_channel in sub_interfacer._sub_channels:
sub_interfacer._sub_channels[sub_channel] = []

# APPEND cargo item
sub_interfacer._sub_channels[sub_channel].append(cargo)

# ->avoid modification of iterable within loop
for name in kill_list:
self._log.warning(name + " thread is dead.")

# The following should trigger a restart ... unless the
# interfacer is also removed from the settings table.
del(self._interfacers[name])

# Trigger restart by calling update settings
self._log.warning("Attempting to restart thread "+name+" (thread has been restarted "+str(restart_count[name])+" times...")
restart_count[name]+=1
self._update_settings(self._setup.settings)

# Sleep until next iteration
time.sleep(0.2)

Expand Down Expand Up @@ -208,7 +249,7 @@ def _update_settings(self, settings):
if I['Type'] in ('EmonModbusTcpInterfacer','EmonFroniusModbusTcpInterfacer') and not pymodbus_found :
self._log.error("Python module pymodbus not installed. unable to load modbus interfacer")
# This gets the class from the 'Type' string
interfacer = getattr(ehi, I['Type'])(name, **I['init_settings'])
interfacer = getattr(ehi, I['Type'])(name,**I['init_settings'])
interfacer.set(**I['runtimesettings'])
interfacer.init_settings = I['init_settings']
interfacer.start()
Expand Down
2 changes: 0 additions & 2 deletions src/interfacers/EmonHubBMWInterfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import requests
import os.path

from pydispatch import dispatcher

from emonhub_interfacer import EmonHubInterfacer

"""class EmonHubBMWInterfacer
Expand Down
80 changes: 50 additions & 30 deletions src/interfacers/EmonHubEmoncmsHTTPInterfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import json
import urllib2
import httplib
from pydispatch import dispatcher
from emonhub_interfacer import EmonHubInterfacer

class EmonHubEmoncmsHTTPInterfacer(EmonHubInterfacer):
Expand All @@ -26,38 +25,64 @@ def __init__(self, name):
'sendinterval': 30
}

self.buffer = []
# Initialize message queue
self._pub_channels = {}
self._sub_channels = {}

self.lastsent = time.time()
self.lastsentstatus = time.time()

def receiver(self, cargo):

# Create a frame of data in "emonCMS format"
f = []
try:
f.append(float(cargo.timestamp))
f.append(cargo.nodeid)
for i in cargo.realdata:
f.append(i)
if cargo.rssi:
f.append(cargo.rssi)
self._log.debug(str(cargo.uri) + " adding frame to buffer => "+ str(f))
except:
self._log.warning("Failed to create emonCMS frame " + str(f))

# Append to bulk post buffer
self.buffer.append(f)

def action(self):

now = time.time()

if (now-self.lastsent) > (int(self._settings['sendinterval'])):
self.lastsent = now
# print json.dumps(self.buffer)

if int(self._settings['senddata']):
self.bulkpost(self.buffer)
self.buffer = []
# It might be better here to combine the output from all sub channels
# into a single bulk post, most of the time there is only one sub channel
for channel in self._settings["subchannels"]:
if channel in self._sub_channels:

# only try to prepare and send data if there is any
if len(self._sub_channels[channel])>0:

bulkdata = []

for cargo in self._sub_channels[channel]:
# Create a frame of data in "emonCMS format"
f = []
try:
f.append(float(cargo.timestamp))
f.append(cargo.nodeid)
for i in cargo.realdata:
f.append(i)
if cargo.rssi:
f.append(cargo.rssi)
self._log.debug(str(cargo.uri) + " adding frame to buffer => "+ str(f))
except:
self._log.warning("Failed to create emonCMS frame " + str(f))

bulkdata.append(f)

# Get the length of the data to be sent
bulkdata_length = len(bulkdata)
self._log.debug("Sending bulkdata, length: "+str(bulkdata_length))

# Attempt to send the data
success = self.bulkpost(bulkdata)

self._log.debug("Sending bulkdata, success: "+str(success))

# if bulk post is successful delete the range posted
if success:
for i in range(0,bulkdata_length):
self._sub_channels[channel].pop(0)
self._log.debug("Deleted sent data from queue")

self._log.debug("New queue length: "+str(len(self._sub_channels[channel])))


if (now-self.lastsentstatus)> (int(self._settings['sendinterval'])):
self.lastsentstatus = now
Expand All @@ -68,7 +93,7 @@ def bulkpost(self,databuffer):

if not 'apikey' in self._settings.keys() or str.__len__(str(self._settings['apikey'])) != 32 \
or str.lower(str(self._settings['apikey'])) == 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx':
return
return False

data_string = json.dumps(databuffer, separators=(',', ':'))

Expand Down Expand Up @@ -100,6 +125,7 @@ def bulkpost(self,databuffer):
return True
else:
self._log.warning("send failure: wanted 'ok' but got '" +reply+ "'")
return False

def _send_post(self, post_url, post_body=None):
"""
Expand Down Expand Up @@ -157,9 +183,3 @@ def set(self, **kwargs):
if key in kwargs.keys():
# replace default
self._settings[key] = kwargs[key]

# Subscribe to internal channels
for channel in self._settings["subchannels"]:
dispatcher.connect(self.receiver, channel)
self._log.debug(self._name+" Subscribed to channel' : " + str(channel))

28 changes: 2 additions & 26 deletions src/interfacers/EmonHubJeeInterfacer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@

import time
from pydispatch import dispatcher

import datetime
import Cargo
import EmonHubSerialInterfacer as ehi
Expand Down Expand Up @@ -49,6 +47,7 @@ def __init__(self, name, com_port='/dev/ttyAMA0', com_baud=0):
else:
self._log.warning("Device communication error - check settings")
self._rx_buf=""

self._ser.flushInput()

# Initialize settings
Expand All @@ -66,7 +65,7 @@ def __init__(self, name, com_port='/dev/ttyAMA0', com_baud=0):
# Pre-load Jee settings only if info string available for checks
if all(i in self.info[1] for i in (" i", " g", " @ ", " MHz")):
self._settings.update(self._jee_settings)

def read(self):
"""Read data from serial port and process if complete line received.
Expand Down Expand Up @@ -146,12 +145,6 @@ def read(self):

return c

# # unix timestamp
# t = round(time.time(), 2)
#
# # Process data frame
# self._r xq.put(self._process_rx(f, t))

def set(self, **kwargs):
"""Send configuration parameters to the "Jee" type device through COM port
Expand Down Expand Up @@ -221,26 +214,10 @@ def action(self):
self._ser.write("00,%02d,%02d,00,s" % (now.hour, now.minute))

def send (self, cargo):
"""
"""
#self._process_tx(self._txq.get())
#self._rxq.put( self._process_rx(f, t))
#dest = f[1]
#packet = f[2:-1]
#self.send_packet(packet, dest)
# TODO amalgamate into 1 send

#def send_packet(self, packet, id=0, cmd="s"):
"""

"""
f = cargo
cmd = "s"

# # If the use of acks gets implemented
# ack = False
# if ack:
# cmd = "a"
if self.getName() in f.encoded:
data = f.encoded[self.getName()]
else:
Expand All @@ -257,4 +234,3 @@ def send (self, cargo):

self._log.debug(str(f.uri) + " sent TX packet: " + payload)
self._ser.write(payload)

Loading

0 comments on commit dda5227

Please sign in to comment.