aqi_monitor/web/src/graph_monthly.py

286 lines
9.7 KiB
Python

""" handle all monthly tasks """
import json
from os import listdir, path
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from src.db import DatabaseConnect
from src.helper import plt_fill
class MonthStatus:
""" check what needs to be done """
ARCHIVE_PATH = 'static/dyn/monthly'
FIRST_MONTH = (2021, 3)
def __init__(self):
self.missing_stamps = self.build_missing_timestamps()
@staticmethod
def get_epoch(now):
""" create relevant timestamps for month passed as datetime """
# last month
m_start = datetime(now.year, now.month, day=1)
if m_start.month < 12:
m_end = datetime(
m_start.year, m_start.month + 1, day=1
) - timedelta(seconds=1)
elif m_start.month == 12:
m_end = datetime(m_start.year + 1, 1, 1) - timedelta(seconds=1)
m_stamp = (int(m_start.strftime('%s')), int(m_end.strftime('%s')))
# last year
y_start = m_start.replace(year=m_start.year - 1)
if y_start.month < 12:
y_end = datetime(
y_start.year, y_start.month + 1, day=1
) - timedelta(seconds=1)
elif y_start.month == 12:
y_end = datetime(y_start.year + 1, 1, 1) - timedelta(seconds=1)
y_stamp = (int(y_start.strftime('%s')), int(y_end.strftime('%s')))
return (m_stamp, y_stamp)
def get_existing(self):
"""get list of all monthly graphs created"""
all_files = [i for i in listdir(self.ARCHIVE_PATH) if '.png' in i]
existing = []
if all_files:
pairs = [tuple(path.splitext(i)[0].split("-")) for i in all_files]
existing = [(int(i[0]), int(i[1])) for i in pairs]
existing.sort()
return existing
def get_expected(self):
"""get a list of expected monthly graphs"""
today = datetime.now()
this_month = datetime(today.year, today.month, 1)
last_stamp = (this_month.year, this_month.month)
expected = []
to_check = self.FIRST_MONTH
while True:
if to_check == last_stamp:
break
expected.append(to_check)
if to_check[1] == 12:
to_check = (to_check[0] + 1, 1)
else:
to_check = (to_check[0], to_check[1] + 1)
return expected
def build_missing_timestamps(self):
"""check which months are missing"""
existing = self.get_existing()
expected = self.get_expected()
missing_stamps = []
for month_tpl in expected:
if month_tpl not in existing:
time_obj = datetime(month_tpl[0], month_tpl[1], 2)
missing_stamp = self.get_epoch(time_obj)
missing_stamps.append(missing_stamp)
return missing_stamps
class MonthGenerator():
""" create the monthly graph and json table """
def __init__(self, timestamps):
self.m_stamp, self.y_stamp = timestamps
self.m_rows, self.y_rows = self.get_data()
self.axis = self.build_axis()
def get_data(self):
""" export from postgres """
m_query = ('SELECT epoch_time, aqi_value FROM aqi WHERE '
f'epoch_time > {self.m_stamp[0]} AND '
f'epoch_time < {self.m_stamp[1]} '
'ORDER BY epoch_time DESC;')
y_query = ('SELECT epoch_time, aqi_value FROM aqi WHERE '
f'epoch_time > {self.y_stamp[0]} AND '
f'epoch_time < {self.y_stamp[1]} '
'ORDER BY epoch_time DESC;')
# make the call
db_handler = DatabaseConnect()
m_rows = db_handler.db_execute(m_query)
y_rows = db_handler.db_execute(y_query)
db_handler.db_close()
return m_rows, y_rows
def build_axis(self):
""" build axis from rows """
# initial df
x_timeline = [datetime.fromtimestamp(i[0]) for i in self.m_rows]
y_aqi_values = [int(i[1]) for i in self.m_rows]
data = {'timestamp': x_timeline, 'now_aqi': y_aqi_values}
df = pd.DataFrame(data)
indexed = df.set_index('timestamp')
indexed.sort_values(by=['timestamp'], inplace=True)
mean = indexed.resample('8h').mean().round()
# reset timestamp to day
mean.reset_index(level=0, inplace=True)
mean['timestamp'] = mean['timestamp'].dt.strftime('%d %H:%M')
mean.set_index('timestamp', inplace=True)
# second df with last year data
x_timeline = [datetime.fromtimestamp(i[0]) for i in self.y_rows]
y_aqi_values = [int(i[1]) for i in self.y_rows]
data = {'timestamp': x_timeline, 'year_aqi': y_aqi_values}
df = pd.DataFrame(data)
indexed = df.set_index('timestamp')
indexed.sort_values(by=['timestamp'], inplace=True)
# skip if empty
if len(indexed) == 0:
y_mean = indexed
else:
y_mean = indexed.resample('8h').mean().round()
# reset timestamp to day
y_mean.reset_index(level=0, inplace=True)
if len(indexed):
y_mean['timestamp'] = y_mean['timestamp'].dt.strftime('%d %H:%M')
y_mean.set_index('timestamp', inplace=True)
# merge the two
mean['year_aqi'] = y_mean['year_aqi']
mean.reset_index(level=0, inplace=True)
mean.sort_values(by='timestamp', ascending=True, inplace=True)
# return axis
axis = {
'x': mean['timestamp'],
'y_1': mean['now_aqi'],
'y_2': mean['year_aqi']
}
return axis
def write_plt(self):
""" write monthly plot """
x = self.axis['x']
y_1 = self.axis['y_1'].replace(0, 1)
y_2 = self.axis['y_2'].replace(0, 1)
# parse timestamp
date_month = datetime.fromtimestamp(self.m_rows[-1][0]).date()
date_title = date_month.strftime('%b %Y')
date_file = date_month.strftime('%Y-%m')
month_short = date_month.strftime('%b')
file_name = 'static/dyn/monthly/' + date_file + '.png'
print(f'exporting graph for {date_title}')
# build ticks
y_max = np.ceil(max(pd.concat([y_1, y_2])) / 50) * 50 + 50
x_range = np.arange(0, len(x), step=9)
last_day = int(x.max().split()[0])
x_numbers = np.arange(1, last_day + 1, step=3)
x_dates = [f'{str(i).zfill(2)} {month_short}' for i in x_numbers]
x_ticks = x_range, x_dates
# plot
plt.style.use('seaborn-v0_8')
plt.plot(x, y_1, color='#313131', label='this year')
plt.plot(
x, y_2, color='#666666', linestyle='dashed', label='last year'
)
# fill colors
plt_fill(plt, x, y_1)
plt.xticks(x_ticks[0], x_ticks[1])
plt.yticks(np.arange(0, y_max, step=50))
plt.title(f'AQI values for: {date_title}', fontsize=20)
plt.legend()
plt.tight_layout()
plt.savefig(file_name, dpi=300)
plt.figure()
@staticmethod
def get_aqi(val):
""" helper function to get aqi category """
breakpoints = [
('Good', 0, 50),
('Moderate', 50, 100),
('Unhealthy for Sensitive Groups', 100, 150),
('Unhealthy', 150, 200),
('Very Unhealthy', 200, 300),
('Hazardous', 300, 500),
]
for break_point in breakpoints:
category, min_val, max_val = break_point
if min_val < val <= max_val:
# found it
break
return category
@staticmethod
def get_change(m_val, y_val):
""" helper function to get change on thresh """
# skip if nan
if y_val == 'nan':
return y_val
diff_avg = (m_val - y_val) / m_val
if diff_avg <= -0.15:
avg_change = 'down'
elif diff_avg >= 0.15:
avg_change = 'up'
else:
avg_change = 'same'
return avg_change
def write_table(self):
""" write json file with monthly details """
date_month = datetime.fromtimestamp(self.m_rows[-1][0]).date()
date_file = date_month.strftime('%Y-%m')
file_name = 'static/dyn/monthly/' + date_file + '.json'
# current
m_min = int(self.axis['y_1'].min())
m_max = int(self.axis['y_1'].max())
m_avg = int(self.axis['y_1'].mean())
m_cat = self.get_aqi(m_avg)
# last
try:
y_min = int(self.axis['y_2'].min())
y_max = int(self.axis['y_2'].max())
y_avg = int(self.axis['y_2'].mean())
y_cat = self.get_aqi(y_avg)
except ValueError:
y_min = 'nan'
y_max = 'nan'
y_avg = 'nan'
y_cat = 'nan'
# build dict
monthly_dict = {
'data': [
['min: ', m_min, y_min, self.get_change(m_min, y_min)],
['max: ', m_max, y_max, self.get_change(m_max, y_max)],
['avg: ', m_avg, y_avg, self.get_change(m_avg, y_avg)],
['avg aqi: ', m_cat, y_cat, self.get_change(m_avg, y_avg)]
]
}
# write to disk
with open(file_name, 'w') as f:
f.write(json.dumps(monthly_dict))
def main():
""" main to export monthly graph an table json """
# check if needed
month_status = MonthStatus()
if not month_status.missing_stamps:
print('all monthly already created, skipping...')
return
# create
print('creating monthly graph and json file')
for month in month_status.missing_stamps:
month_generator = MonthGenerator(month)
month_generator.write_plt()
month_generator.write_table()