Skip to content

Commit

Permalink
Merge pull request #1 from lucianavlop/price-service
Browse files Browse the repository at this point in the history
Price service
  • Loading branch information
daithihearn authored Apr 16, 2023
2 parents 5448a85 + e9317a0 commit 1595ba7
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 119 deletions.
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
.DS_Store
.env
.env*
__pycache__
actions-runner
logs
126 changes: 12 additions & 114 deletions Electricity.py
Original file line number Diff line number Diff line change
@@ -1,126 +1,24 @@
import os
import streamlit as st
import requests
import pandas as pd
import json
from babel.numbers import format_decimal
from datetime import date, datetime, timedelta
from dateutil.parser import parse

from dotenv import load_dotenv

load_dotenv()

PRICES_API = os.getenv('PRICES_API')

if PRICES_API is None or PRICES_API == '':
raise Exception('Please provide the PRICES_API environment variable')
from datetime import date, timedelta
from services.PriceService import *

st.set_page_config(layout="wide")


def format_euro(amount) -> str:
return f'{format_decimal(amount, locale="en_GB", format="#,##0.000")}'


class Price:
def __init__(self, price: float, dt: date):
self.value = price
self.value_euro = format_euro(price)
self.datetime = dt
self.hour = dt.strftime('%H')
self.formatted = f'{self.value_euro}@{self.hour}:00'


def get_prices(start: date, end: date) -> list[Price]:
start_day = start.strftime('%Y-%m-%d')
end_day = end.strftime('%Y-%m-%d')
response = requests.get(
f'{PRICES_API}?start={start_day}&end={end_day}')
content = response.content.decode('utf-8')
price_data = json.loads(content)

return [Price(x.get('price'), parse(x.get('dateTime'))) for x in price_data]


def get_current_price(prices: list[Price]) -> Price:
now = datetime.now()
hour = now.strftime('%H')
return next((x for x in prices if x.datetime.strftime('%H') == hour), None)


def get_tomorrow():
today = date.today()
tomorrow = today + timedelta(days=1)
return get_prices(tomorrow, tomorrow)


def get_today():
today = date.today()
return get_prices(today, today)


def get_date_days_ago(days: int):
ago = date.today() - timedelta(days=days)
return get_prices(ago, ago)


def get_min_price(prices: list[Price]):
return min(prices, key=lambda x: x.value)


def get_max_price(prices: list[Price]):
return max(prices, key=lambda x: x.value)


def get_cheapest_period(prices: list[Price], n: int):

prices_sorted = sorted(prices, key=lambda x: x.hour)
min_sum = float('inf')
min_window = None

for i in range(len(prices_sorted) - n + 1):
window_sum = sum(x.value for x in prices_sorted[i:i+n])
if window_sum < min_sum:
min_sum = window_sum
min_window = prices_sorted[i:i+n]

return min_window


def get_cheap_period_recent_average(days: int) -> float:
today = date.today()

total = float(0)
for i in range(days):
day = today - timedelta(days=i)
prices = get_prices(day, day)
if prices:
cheapest_period = get_cheapest_period(prices, 3)
if cheapest_period:
total += calculate_average(cheapest_period)

return total / days


def calculate_average(prices: list[Price]) -> float:
return sum(x.value for x in prices) / len(prices)


# get data
# price_data_v1=get_prices()
st.title('Electricity prices')
st.title('Precios de la electricidad')

start_date_average = date.today() - timedelta(days=30)
elecAverage = round(calculate_average(
get_prices(start_date_average, date.today())), 2)
currentPrice = round(get_current_price(get_today()).value, 2)
difElec = round(currentPrice - elecAverage, 2)
col1, col2, col3, col4 = st.columns(4)
col1.metric("Average Price (30 days)", elecAverage, delta_color="inverse")
col2.metric("Current Price", currentPrice, difElec, delta_color="inverse")
labelminhour = "Min Price at " + get_min_price(get_today()).hour + ":00"
labelmaxhour = "Max Price at " + get_max_price(get_today()).hour + ":00"
col1.metric("Precio Promedio (30 días)", elecAverage, delta_color="inverse")
col2.metric("Precio actual", currentPrice, difElec, delta_color="inverse")
labelminhour = "Precio min a las " + get_min_price(get_today()).hour + ":00"
labelmaxhour = "Precio max a las " + get_max_price(get_today()).hour + ":00"

minToday = round(get_min_price(get_today()).value, 2)
difElecMin = round(minToday - elecAverage, 2)
Expand All @@ -134,9 +32,9 @@ def calculate_average(prices: list[Price]) -> float:

if get_tomorrow():
st.write('TOMORROW')
labelminhourT = "Min Price at " + \
labelminhourT = "Precio min a las " + \
get_min_price(get_tomorrow()).hour + ":00"
labelmaxhourT = "Max Price at " + \
labelmaxhourT = "Precio max a las " + \
get_max_price(get_tomorrow()).hour + ":00"

col1, col2, col3 = st.columns(3)
Expand All @@ -145,10 +43,10 @@ def calculate_average(prices: list[Price]) -> float:
col3.metric(labelmaxhourT, round(get_max_price(
get_tomorrow()).value, 2), delta_color="inverse")
else:
st.subheader("Tomorrow's prices are not available yet")
st.subheader("Los precios de mañana aún no están disponibles")


st.header("Last 30 days evolutions")
st.header("Últimos 30 días")
# Plot timeline
ago_days = 30
i = 0
Expand All @@ -167,7 +65,7 @@ def calculate_average(prices: list[Price]) -> float:

df = pd.DataFrame({
'date': date_arr,
'Average electricity prices': avg_arr
'Precios medios de electricidad': avg_arr
})

df = df.rename(columns={'date': 'index'}).set_index('index')
Expand Down
3 changes: 0 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
babel
streamlit
python-dotenv
asyncio
meross_iot==0.4.5.9
python-i18n[YAML]
python-dateutil
102 changes: 102 additions & 0 deletions services/PriceService.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import os
import requests
import json
from babel.numbers import format_decimal
from datetime import date, datetime, timedelta
from dateutil.parser import parse
from dotenv import load_dotenv

load_dotenv()

PRICES_API = os.getenv('PRICES_API')

if PRICES_API is None or PRICES_API == '':
raise Exception('Please provide the PRICES_API environment variable')


class Price:
def __init__(self, price: float, dt: date):
self.value = price
self.value_euro = format_euro(price)
self.datetime = dt
self.hour = dt.strftime('%H')
self.formatted = f'{self.value_euro}@{self.hour}:00'


def get_prices(start: date, end: date) -> list[Price]:
start_day = start.strftime('%Y-%m-%d')
end_day = end.strftime('%Y-%m-%d')
response = requests.get(
f'{PRICES_API}?start={start_day}&end={end_day}')
content = response.content.decode('utf-8')
price_data = json.loads(content)

return [Price(x.get('price'), parse(x.get('dateTime'))) for x in price_data]


def get_current_price(prices: list[Price]) -> Price:
now = datetime.now()
hour = now.strftime('%H')
return next((x for x in prices if x.datetime.strftime('%H') == hour), None)


def get_tomorrow():
today = date.today()
tomorrow = today + timedelta(days=1)
return get_prices(tomorrow, tomorrow)


def get_today():
today = date.today()
return get_prices(today, today)


def get_date_days_ago(days: int):
ago = date.today() - timedelta(days=days)
return get_prices(ago, ago)


def get_min_price(prices: list[Price]):
return min(prices, key=lambda x: x.value)


def get_max_price(prices: list[Price]):
return max(prices, key=lambda x: x.value)


def get_cheapest_period(prices: list[Price], n: int):

prices_sorted = sorted(prices, key=lambda x: x.hour)
min_sum = float('inf')
min_window = None

for i in range(len(prices_sorted) - n + 1):
window_sum = sum(x.value for x in prices_sorted[i:i+n])
if window_sum < min_sum:
min_sum = window_sum
min_window = prices_sorted[i:i+n]

return min_window


def get_cheap_period_recent_average(days: int) -> float:
today = date.today()

total = float(0)
for i in range(days):
day = today - timedelta(days=i)
prices = get_prices(day, day)
if prices:
cheapest_period = get_cheapest_period(prices, 3)
if cheapest_period:
total += calculate_average(cheapest_period)

return total / days


def calculate_average(prices: list[Price]) -> float:
return sum(x.value for x in prices) / len(prices)


def format_euro(amount) -> str:
return f'{format_decimal(amount, locale="en_GB", format="#,##0.000")}'

0 comments on commit 1595ba7

Please sign in to comment.