Skip to content

Commit

Permalink
MQTT refinement
Browse files Browse the repository at this point in the history
  • Loading branch information
Hydrosys4 committed Aug 14, 2020
1 parent 0ad198b commit 794aaf2
Show file tree
Hide file tree
Showing 12 changed files with 807 additions and 168 deletions.
537 changes: 405 additions & 132 deletions MQTTcontrol.py

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions changelog/change
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,30 @@ Verified clock setting behavior, when the Internet connection is present the set

- Add option in Hydrosys4 installer to install the Mqtt local broker

2020-08-05 -> release 322d

- Added initMQTT in consistencyceck to reset the connections
- implemented stop/MQTT

2020-08-08 -> release 322e

- Make MQTT (for relays) commands compatible with Tasmota sw.

2020-08-10 -> release 323

- MQTT support for sensor reading

2020-08-10 -> release 323a

- MQTT support for sensor reading asynch reading management

2020-08-13 -> release 323d

- MQTT better implement the asynch management
- Add the MQTT devide list page with IP addresses



------- Future releases: -----------

NOTE:
Expand Down
67 changes: 59 additions & 8 deletions hardwaremod.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ def normalizesensordata(reading_str,sensorname):
def getsensordata(sensorname,attemptnumber): #needed
# this procedure was initially built to communicate using the serial interface with a module in charge of HW control (e.g. Arduino)
# To lower the costs, I used the PI hardware itself but I still like the way to communicate with the HWcontrol module that is now a SW module not hardware
isok=False
statusmessage=""

cmd=searchdata(HW_INFO_NAME,sensorname,HW_CTRL_CMD)
Thereading=""
if not cmd=="":
Expand All @@ -337,8 +340,15 @@ def getsensordata(sensorname,attemptnumber): #needed
arg4=str(searchdata(HW_INFO_NAME,sensorname,HW_CTRL_LOGIC))
arg5=str(searchdata(HW_INFO_NAME,sensorname,HW_CTRL_ADDR))
arg6=str(searchdata(HW_INFO_NAME,sensorname,HW_CTRL_PIN2))
arg7=str(searchdata(HW_INFO_NAME,sensorname,HW_CTRL_TITLE))

timestr=searchdata(HW_INFO_NAME,sensorname,HW_FUNC_TIME)
mastertimelist=separatetimestringint(timestr)
timeperiodsec=mastertimelist[0]*3600+mastertimelist[1]*60+mastertimelist[0]

sendstring=sensorname+":"+pin+":"+arg1+":"+arg2+":"+arg3+":"+arg4+":"+arg5+":"+arg6
arg8=str(timeperiodsec)

sendstring=sensorname+":"+pin+":"+arg1+":"+arg2+":"+arg3+":"+arg4+":"+arg5+":"+arg6+":"+arg7+":"+arg8
recdata=[]
ack=False
i=0
Expand All @@ -349,14 +359,18 @@ def getsensordata(sensorname,attemptnumber): #needed
if recdata[0]==cmd: # this was used to check the response and command consistency when serial comm was used
if recdata[2]>0: # this is the flag that indicates if the measurement is correct
#print " Sensor " , sensorname , "reading ",recdata[1]
isok=True

Thereading=normalizesensordata(recdata[1],sensorname) # output is a string

print(" Sensor " , sensorname , "Normalized reading ",Thereading)
logger.info("Sensor %s reading: %s", sensorname,Thereading)
if len(recdata)>3:
statusmessage=recdata[3]
else:
print("Problem with sensor reading ", sensorname)
logger.error("Problem with sensor reading: %s", sensorname)
statusmessage=recdata[1]
else:
print("Problem with response consistency ", sensorname , " cmd " , cmd)
logger.error("Problem with response consistency : %s", sensorname)
Expand All @@ -366,7 +380,7 @@ def getsensordata(sensorname,attemptnumber): #needed
else:
print("sensor name not found in list of sensors ", sensorname)
logger.error("sensor name not found in list of sensors : %s", sensorname)
return Thereading
return isok, Thereading, statusmessage

def makepulse(target,duration,addtime=True, priority=0): # pulse in seconds , addtime=True extend the pulse time with new time , addtime=False let the current pulse finish ,

Expand Down Expand Up @@ -459,10 +473,10 @@ def stoppulse(target):

if MIN and MAX:
# dual pulse setting
sendstring=stopcmd+":"+PIN+":"+"0"+":"+logic+":"+POWERPIN+":"+str(MIN) +":"+str(MAX)+":"+address+":"+title
sendstring=stopcmd+":"+PIN+":"+"0"+":"+logic+":"+POWERPIN+":"+str(MIN) +":"+str(MAX)+":"+target+":"+title
else:
# normal pulse
sendstring=stopcmd+"pulse:"+PIN+":"+"0"+":"+logic+":"+POWERPIN+":0"+":0"+":"+address+":"+title
sendstring=stopcmd+":"+PIN+":"+"0"+":"+logic+":"+POWERPIN+":0"+":0"+":"+target+":"+title

#print "logic " , logic , " sendstring " , sendstring

Expand Down Expand Up @@ -926,7 +940,7 @@ def readallsensors():
sensorlist=searchdatalist(HW_INFO_IOTYPE,"input",HW_INFO_NAME)
sensorvalues={}
for sensorname in sensorlist:
sensorvalues[sensorname]=getsensordata(sensorname,3)
isok, sensorvalues[sensorname], errmsg =getsensordata(sensorname,3)
return sensorvalues


Expand All @@ -938,7 +952,7 @@ def checkallsensors():
sensorlist=searchdatalist(HW_INFO_IOTYPE,"input",HW_INFO_NAME)
sensorvalues={}
for sensorname in sensorlist:
sensorvalues[sensorname]=getsensordata(sensorname,3)
isok, sensorvalues[sensorname], errmsg = getsensordata(sensorname,3)
return sensorvalues


Expand All @@ -950,6 +964,8 @@ def initallGPIOpins():


def initMQTT():

MQTTcontrol.Disconnect_clients()
# define the list of parameters for the initialization
MQTTitemlist=searchdatalistinstr(HW_CTRL_CMD,"/mqtt",HW_INFO_NAME)

Expand All @@ -960,10 +976,36 @@ def initMQTT():
client["username"]=""
client["password"]=""
client["pubtopic"]="" # this will be provided during operations
fulltopic=searchdata(HW_INFO_NAME,items,HW_CTRL_TITLE)
if searchdata(HW_INFO_NAME,items,HW_INFO_IOTYPE)=="output":
client["subtopic"]=searchdata(HW_INFO_NAME,items,HW_CTRL_TITLE)+"/#" #in this way it subscribe to the topic and all the sub levels topic
# read on "stat"
stattopic = fulltopic.replace("cmnd", "stat")
subtopic=stattopic+"/RESULT"
client["subtopic"]=subtopic
# subscribe topic for IP stats
client["subtopicstat5"]=stattopic+"/STATUS5"


else:
client["subtopic"]=searchdata(HW_INFO_NAME,items,HW_CTRL_TITLE)
# the initial part of the fulltopic is the MQTT topic, then after the "//" double back slash, there are the Json fiels to look for.
# Json fields are separated by "/"
stinglist=fulltopic.split("//")
subtopic=stinglist[0]
client["subtopic"]=subtopic
if len(stinglist)>1:
jsonlist=stinglist[1].split("/")
client["jsonlist"]=jsonlist
# subscribe topic for IP stats
stattopic = subtopic.replace("tele", "stat")
stattopicstatus5=stattopic.replace("SENSOR", "STATUS5")
client["subtopicstat5"]=stattopicstatus5


# subscribe topic for IP stats command
cmdtopicstat5=client["subtopicstat5"].replace("stat", "cmnd")
cmdtopicstat5=cmdtopicstat5.replace("STATUS5", "STATUS")
client["cmdtopicstat5"]=cmdtopicstat5

MQTTcontrol.CLIENTSLIST[items]=client


Expand All @@ -973,6 +1015,15 @@ def initMQTT():

return True

def SendSTATcmdtoall():
MQTTcontrol.MQTT_output_all_stats()
return True

def getSTATfromall():
return MQTTcontrol.MQTT_get_all_stats()



def setallGPIOinputs(): # this function sets all GPIO to input, actually it disable I2C and SPI as this functions are set using the PIN special mode Alt0
for pinstr in HWcontrol.RPIMODBGPIOPINLIST:
HWcontrol.GPIO_setup(pinstr, "in")
Expand Down
28 changes: 24 additions & 4 deletions mqtt/MQTTutils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@

import time, logging
import statusdataDBmod
from datetime import datetime,date,timedelta

try:
__import__("paho")
Expand All @@ -11,6 +13,9 @@
import paho.mqtt.client as mqtt


SubscriptionLog={}
SubscriptionLog["default"]={}

logger = logging.getLogger("hydrosys4."+__name__)


Expand All @@ -32,10 +37,21 @@ def Create_connections_and_subscribe(CLIENTSLIST):
clientinfo["clientobj"]=aclient
clientinfo["clientID"]=clientID
aclient.connect()
aclient.subscribe()
aclient.subscribe() # subscribe to default subtopic
aclient.subscribe(clientinfo["subtopicstat5"])
aclient.loop_start()


def Disconnect_clients(CLIENTSLIST):
if MQTTlib:
# expected CLIENTLIST structure: {clientid1:{"broker":"","port":"" ...} ,clientid2:{"broker":"","port":"" ...}, clientid3:{"broker":"","port":"" ...} }
for clientname in CLIENTSLIST:
clientinfo=CLIENTSLIST[clientname]
if "clientobj" in clientinfo:
aclient=clientinfo["clientobj"]
aclient.loop_stop()
aclient.disconnect()




class Client:
Expand Down Expand Up @@ -65,8 +81,12 @@ def on_disconnect(self, mqttc, obj, rc):


def on_message(self,mqttc, obj, msg):
print(self.clientid + " " + "Message: "+ msg.topic + " " + str(msg.qos) + " " + str(msg.payload))

global SubscriptionLog
payload=msg.payload.decode('utf-8')
print(self.clientid + " " + "Message: "+ msg.topic + " " + str(msg.qos) + " " + payload)
statusdataDBmod.write_status_data(SubscriptionLog,msg.topic,"jsonstring",payload)
timestamp=datetime.utcnow()
statusdataDBmod.write_status_data(SubscriptionLog,msg.topic,"timestamp",timestamp)

def on_publish(self,mqttc, obj, mid):
print(self.clientid + " " + "Published: "+ " " +"mid: " + str(mid))
Expand Down
2 changes: 1 addition & 1 deletion selectedplanmod.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def startpump(target,activationseconds,MinAveragetemp,MaxAverageHumid):
def periodicdatarequest(sensorname):
print("Read sensors request: ", sensorname , " " , datetime.now())
logger.info('Read sensor data: %s - %s', sensorname, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
sensorvalue=hardwaremod.getsensordata(sensorname,3)
isok, sensorvalue, errmsg = hardwaremod.getsensordata(sensorname,3)
if sensorvalue!="":
sensordbmod.insertdataintable(sensorname,sensorvalue)
# Automation algoritm
Expand Down
2 changes: 1 addition & 1 deletion sensordbmod.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def readallsensorsdatabase():
sensorvalues={}
sensortimestamp={}
for sensorname in sensorlist:
#sensorvalues[sensorname]=getsensordata(sensorname,3)

databasevalues=[]
samplesnumber=1
getsensordbdatasamplesN(sensorname,databasevalues,samplesnumber)
Expand Down
Loading

0 comments on commit 794aaf2

Please sign in to comment.