mirror of
https://github.com/bbilly1/aqi_monitor.git
synced 2024-08-02 16:03:33 +00:00
85 lines
2.0 KiB
Python
85 lines
2.0 KiB
Python
|
"""entry point to collect data from sensors"""
|
||
|
|
||
|
import json
|
||
|
import sys
|
||
|
|
||
|
from os import path
|
||
|
|
||
|
import requests
|
||
|
|
||
|
from sensor_sds011 import SDS
|
||
|
from sensor_bme280 import BmeSensor
|
||
|
|
||
|
|
||
|
class PiSensor:
|
||
|
"""collect and send data"""
|
||
|
|
||
|
SENSOR_ID = 1
|
||
|
|
||
|
def __init__(self):
|
||
|
self.data = False
|
||
|
|
||
|
def get_data(self):
|
||
|
"""get all data from sensors"""
|
||
|
self.data = {}
|
||
|
self.get_sds011()
|
||
|
self.get_bme()
|
||
|
self.add_static()
|
||
|
|
||
|
def get_sds011(self):
|
||
|
"""get data dict from sds011"""
|
||
|
sds_data = SDS().collect()
|
||
|
self.data.update(sds_data)
|
||
|
|
||
|
def get_bme(self):
|
||
|
"""get data dict from bme"""
|
||
|
bme_data = BmeSensor().collect()
|
||
|
self.data.update(bme_data)
|
||
|
|
||
|
def add_static(self):
|
||
|
"""add static values to data"""
|
||
|
self.data.update(
|
||
|
{
|
||
|
"sensor_id": self.SENSOR_ID,
|
||
|
"uptime": self.get_uptime(),
|
||
|
}
|
||
|
)
|
||
|
|
||
|
@staticmethod
|
||
|
def get_uptime():
|
||
|
"""read uptime"""
|
||
|
with open("/proc/uptime", "r", encoding="utf-8") as f:
|
||
|
uptime_seconds = float(f.readline().split()[0])
|
||
|
|
||
|
return uptime_seconds
|
||
|
|
||
|
def send_data(self):
|
||
|
"""post data to api endpoint"""
|
||
|
config = self.read_config()
|
||
|
auth = (config["username"], config["password"])
|
||
|
response = requests.post(config["url"], json=self.data, auth=auth)
|
||
|
if not response.ok:
|
||
|
print(response.text)
|
||
|
|
||
|
def read_config(self):
|
||
|
"""read config file"""
|
||
|
# build path
|
||
|
root_folder = path.dirname(sys.argv[0])
|
||
|
if root_folder == '/sbin':
|
||
|
# running interactive
|
||
|
config_path = 'config.json'
|
||
|
else:
|
||
|
config_path = path.join(root_folder, 'config.json')
|
||
|
|
||
|
with open(config_path, "r", encoding="utf-8") as f:
|
||
|
config = json.loads(f.read())
|
||
|
|
||
|
return config
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
sensor = PiSensor()
|
||
|
sensor.get_data()
|
||
|
print(sensor.data)
|
||
|
sensor.send_data()
|