aqi_monitor/aqi_monitor_py/sensor/main.py

85 lines
2.0 KiB
Python

"""entry point to collect data from sensors"""
import json
import sys
from os import path
import requests
from sensor_sds011 import SDS
from sensor_bme280 import BmeSensor
class PiSensor:
    """Collect one reading from the attached sensors and post it to the API.

    Typical use is ``get_data()`` followed by ``send_data()``.
    """

    # value sent under the "sensor_id" key with every reading
    SENSOR_ID = 1

    def __init__(self):
        # stays False until get_data() replaces it with the collected dict
        self.data = False

    def get_data(self):
        """Collect data from all sensors into ``self.data``.

        ``self.data`` is reset to an empty dict first, so values from an
        earlier call are discarded.
        """
        self.data = {}
        self.get_sds011()
        self.get_bme()
        self.add_static()

    def get_sds011(self):
        """Merge the dict returned by ``SDS().collect()`` into ``self.data``."""
        sds_data = SDS().collect()
        self.data.update(sds_data)

    def get_bme(self):
        """Merge the dict returned by ``BmeSensor().collect()`` into ``self.data``."""
        bme_data = BmeSensor().collect()
        self.data.update(bme_data)

    def add_static(self):
        """Add the sensor id and the current uptime to ``self.data``."""
        self.data.update(
            {
                "sensor_id": self.SENSOR_ID,
                "uptime": self.get_uptime(),
            }
        )

    @staticmethod
    def get_uptime():
        """Return the first field of ``/proc/uptime`` as a float."""
        with open("/proc/uptime", "r", encoding="utf-8") as f:
            uptime_seconds = float(f.readline().split()[0])

        return uptime_seconds

    def send_data(self, timeout=10):
        """Post ``self.data`` as JSON to the configured API endpoint.

        The target and credentials come from ``read_config()``, which must
        provide the keys ``"url"``, ``"username"`` and ``"password"``.
        When ``response.ok`` is false the response body is printed; no
        exception is raised for it.

        Args:
            timeout: seconds handed to ``requests.post``. Requests waits
                forever by default, which would leave the collector hanging
                on an unresponsive endpoint.

        Raises:
            Exceptions raised by ``requests.post`` (including timeouts) are
            not caught here and propagate to the caller.
        """
        config = self.read_config()
        auth = (config["username"], config["password"])
        response = requests.post(
            config["url"],
            json=self.data,
            auth=auth,
            timeout=timeout,
        )
        if not response.ok:
            print(response.text)

    def read_config(self):
        """Load ``config.json`` and return its parsed content.

        The file is looked up in the directory of ``sys.argv[0]``. When
        that directory is ``/sbin`` the bare relative name ``config.json``
        is used instead, i.e. it is resolved against the current working
        directory.
        """
        # build path
        root_folder = path.dirname(sys.argv[0])
        if root_folder == '/sbin':
            # running interactive
            config_path = 'config.json'
        else:
            config_path = path.join(root_folder, 'config.json')

        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)

        return config
if __name__ == "__main__":
    # one-shot run: read every sensor, echo the reading, then upload it
    pi_sensor = PiSensor()
    pi_sensor.get_data()
    print(pi_sensor.data)
    pi_sensor.send_data()