Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions packages/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@
from modules.common.store._api import LoggingValueStore


def pytest_configure(config):
config.addinivalue_line("markers", "no_mock_full_hour: mark test to disable full_hour mocking.")
config.addinivalue_line("markers", "no_mock_quarter_hour: mark test to disable quarter_hour mocking.")


@pytest.fixture(autouse=True)
def mock_open_file(monkeypatch) -> None:
mock_config = Mock(return_value={"dc_charging": False, "openwb-version": 1, "max_c_socket": 32})
monkeypatch.setattr(hardware_configuration, "_read_configuration", mock_config)


@pytest.fixture(autouse=True)
def mock_today(monkeypatch) -> None:
def mock_today(monkeypatch, request) -> None:
datetime_mock = MagicMock(wraps=datetime.datetime)
# Montag 16.05.2022, 8:40:52 "05/16/2022, 08:40:52" Unix: 1652683252
datetime_mock.today.return_value = datetime.datetime(2022, 5, 16, 8, 40, 52)
monkeypatch.setattr(datetime, "datetime", datetime_mock)
mock_today_timestamp = Mock(return_value=1652683252)
monkeypatch.setattr(timecheck, "create_timestamp", mock_today_timestamp)
now_timestamp = Mock(return_value=1652683252)
monkeypatch.setattr(timecheck, "create_timestamp", now_timestamp)
full_hour_timestamp = Mock(return_value=int(datetime.datetime(2022, 5, 16, 8, 0, 0).timestamp()))
monkeypatch.setattr(timecheck, "create_unix_timestamp_current_full_hour", full_hour_timestamp)


@pytest.fixture(autouse=True)
Expand Down
68 changes: 53 additions & 15 deletions packages/control/optional.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from helpermodules import hardware_configuration
from helpermodules.constants import NO_ERROR
from helpermodules.pub import Pub
from helpermodules.timecheck import create_unix_timestamp_current_full_hour
from helpermodules import timecheck
from helpermodules.utils import thread_handler
from modules.common.configurable_tariff import ConfigurableElectricityTariff
from modules.common.configurable_monitoring import ConfigurableMonitoring
Expand Down Expand Up @@ -64,37 +64,75 @@ def et_charging_allowed(self, max_price: float):
log.exception("Fehler im Optional-Modul")
return False

def et_get_current_price(self):
def __get_first_entry(self, prices: dict[str, float]) -> tuple[str, float]:
if self.et_provider_available():
return self.data.et.get.prices[str(int(create_unix_timestamp_current_full_hour()))]
prices = self.data.et.get.prices
timestamp, first = next(iter(prices.items()))
price_timeslot_seconds = self.__calculate_price_timeslot_length(prices)
now = int(timecheck.create_timestamp())
prices = {
price[0]: price[1]
for price in prices.items()
if int(price[0]) > now - (price_timeslot_seconds - 1)
}
self.data.et.get.prices = prices
timestamp, first = next(iter(prices.items()))
return timestamp, first
else:
raise Exception("Kein Anbieter für strompreisbasiertes Laden konfiguriert.")

def __get_current_timeslot_start(self, prices: dict[str, float]) -> float:
timestamp, first = self.__get_first_entry(prices)
return timestamp

def et_get_current_price(self, prices: dict[str, float]) -> float:
timestamp, first = self.__get_first_entry(prices)
return first

def __calculate_price_timeslot_length(self, prices: dict) -> int:
first_timestamps = list(prices.keys())[:2]
return int(first_timestamps[1]) - int(first_timestamps[0])

def et_get_loading_hours(self, duration: float, remaining_time: float) -> List[int]:
"""
Parameter
---------
duration: float
benötigte Ladezeit

remaining_time: float
Restzeit bis Termin (von wo an gerechnet???)
Return
------
list: Key des Dictionary (Unix-Sekunden der günstigen Stunden)
list: Key des Dictionary (Unix-Sekunden der günstigen Zeit-Slots)
"""
if self.et_provider_available() is False:
raise Exception("Kein Anbieter für strompreisbasiertes Laden konfiguriert.")
try:
prices = self.data.et.get.prices
prices_in_scheduled_time = {}
i = 0
for timestamp, price in prices.items():
if i < ceil((duration+remaining_time)/3600):
prices_in_scheduled_time.update({timestamp: price})
i += 1
else:
break
ordered = sorted(prices_in_scheduled_time.items(), key=lambda x: x[1])
return [int(i[0]) for i in ordered][:ceil(duration/3600)]
price_timeslot_seconds = self.__calculate_price_timeslot_length(prices)
first_timeslot_start = self.__get_current_timeslot_start(prices)
price_candidates = {
timestamp: price
for timestamp, price in prices.items()
if (
# is current timeslot or futur
int(timestamp) >= int(first_timeslot_start) and
# ends before plan target time
int(timestamp) + price_timeslot_seconds <= int(first_timeslot_start) + remaining_time
)
}
now = int(timecheck.create_timestamp())
ordered_by_date_reverse = reversed(sorted(price_candidates.items(), key=lambda x: x[0]))
ordered_by_price = sorted(ordered_by_date_reverse, key=lambda x: x[1])
selected_time_slots = {int(i[0]): float(i[1])
for i in ordered_by_price[:1 + ceil(duration/price_timeslot_seconds)]}
selected_lenght = price_timeslot_seconds * (
len(selected_time_slots)-1) - (int(now) - min(selected_time_slots))
return sorted(selected_time_slots.keys()
if not (min(selected_time_slots) > now or duration <= selected_lenght)
else [timestamp[0] for timestamp in iter(selected_time_slots.items())][:-1]
)
# if sum() sorted([int(i[0]) for i in ordered_by_price][:ceil(duration/price_timeslot_seconds)])
except Exception:
log.exception("Fehler im Optional-Modul")
return []
Expand Down
211 changes: 192 additions & 19 deletions packages/control/optional_test.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,204 @@
from unittest.mock import Mock
from control.optional import Optional
from helpermodules import timecheck
import pytest

ONE_HOUR_SECONDS = 3600
IGNORED = 0.0001
CHEEP = 0.0002
EXPENSIVE = 0.0003

def test_et_get_loading_hours(monkeypatch):

@pytest.mark.no_mock_full_hour
@pytest.mark.parametrize(
"granularity, now_ts, duration, remaining_time, price_list, expected_loading_hours",
[
pytest.param(
"full_hour",
1698228000,
ONE_HOUR_SECONDS,
3 * ONE_HOUR_SECONDS,
{
"1698224400": 0.00012499,
"1698228000": 0.00011737999999999999, # matching now
"1698231600": 0.00011562000000000001,
"1698235200": 0.00012447, # last before plan target
"1698238800": 0.00013813,
"1698242400": 0.00014751,
"1698246000": 0.00015372999999999998,
"1698249600": 0.00015462,
"1698253200": 0.00015771,
"1698256800": 0.00013708,
"1698260400": 0.00012355,
"1698264000": 0.00012006,
"1698267600": 0.00011279999999999999,
},
[1698231600],
id="select single time slot of one hour length"
),
pytest.param(
"quarter_hour",
1698226200,
2 * ONE_HOUR_SECONDS,
4 * ONE_HOUR_SECONDS,
{
# first hour
"1698224400": IGNORED,
"1698225300": IGNORED,
"1698226200": EXPENSIVE, # current quarert hour
"1698227100": CHEEP,
# second hour
"1698228000": EXPENSIVE,
"1698228900": EXPENSIVE,
"1698229800": CHEEP,
"1698230700": EXPENSIVE,
# third hour
"1698231600": CHEEP,
"1698232500": CHEEP,
"1698233400": CHEEP,
"1698234300": EXPENSIVE,
# fourth hour
"1698235200": CHEEP,
"1698236100": EXPENSIVE,
"1698237000": EXPENSIVE,
"1698237900": EXPENSIVE,
# fifth hour
"1698238800": CHEEP,
"1698239700": CHEEP, # last before plan target
"1698240600": IGNORED,
"1698241500": IGNORED,
},
[1698227100, 1698229800, 1698231600, 1698232500, 1698233400, 1698235200, 1698238800, 1698239700],
id="select 8 time slots of 15 minutes lenght, include last before plan target"
),
pytest.param(
"quarter_hour",
1698227100,
2 * ONE_HOUR_SECONDS,
4 * ONE_HOUR_SECONDS,
{
# first hour
"1698224400": IGNORED,
"1698225300": IGNORED,
"1698226200": EXPENSIVE,
"1698227100": CHEEP, # current quarert hour
# second hour
"1698228000": EXPENSIVE,
"1698228900": EXPENSIVE,
"1698229800": CHEEP,
"1698230700": EXPENSIVE,
# third hour
"1698231600": CHEEP,
"1698232500": CHEEP,
"1698233400": CHEEP,
"1698234300": EXPENSIVE,
# fourth hour
"1698235200": CHEEP,
"1698236100": EXPENSIVE,
"1698237000": EXPENSIVE,
"1698237900": EXPENSIVE,
# fifth hour
"1698238800": CHEEP,
"1698239700": CHEEP,
"1698240600": EXPENSIVE, # last before plan target
"1698241500": IGNORED,
},
[1698227100, 1698229800, 1698231600, 1698232500, 1698233400, 1698235200, 1698238800, 1698239700],
id="select 8 time slots of 15 minutes lenght, include current quarter hour"
),
pytest.param(
"quarter_hour",
1698227900,
2 * ONE_HOUR_SECONDS,
4 * ONE_HOUR_SECONDS,
{
# first hour
"1698224400": IGNORED,
"1698225300": IGNORED,
"1698226200": EXPENSIVE,
"1698227100": CHEEP, # current quarert hour
# second hour
"1698228000": EXPENSIVE,
"1698228900": EXPENSIVE,
"1698229800": CHEEP,
"1698230700": EXPENSIVE,
# third hour
"1698231600": CHEEP,
"1698232500": CHEEP,
"1698233400": CHEEP,
"1698234300": EXPENSIVE,
# fourth hour
"1698235200": CHEEP,
"1698236100": EXPENSIVE,
"1698237000": EXPENSIVE,
"1698237900": EXPENSIVE,
# fifth hour
"1698238800": CHEEP,
"1698239700": CHEEP,
"1698240600": EXPENSIVE, # last before plan target
"1698241500": IGNORED,
},
[1698227100, 1698229800, 1698231600, 1698232500,
1698233400, 1698235200, 1698238800, 1698239700, 1698240600],
id="select additional if time elapsed in current slot makes selection too short"
),
pytest.param(
"quarter_hour",
1698226600,
2 * ONE_HOUR_SECONDS,
4 * ONE_HOUR_SECONDS,
{
# first hour
"1698224400": IGNORED,
"1698225300": IGNORED,
"1698226200": EXPENSIVE,
"1698227100": CHEEP, # current quarert hour
# second hour
"1698228000": EXPENSIVE,
"1698228900": EXPENSIVE,
"1698229800": CHEEP,
"1698230700": EXPENSIVE,
# third hour
"1698231600": CHEEP,
"1698232500": CHEEP,
"1698233400": CHEEP,
"1698234300": EXPENSIVE,
# fourth hour
"1698235200": CHEEP,
"1698236100": EXPENSIVE,
"1698237000": EXPENSIVE,
"1698237900": EXPENSIVE,
# fifth hour
"1698238800": EXPENSIVE,
"1698239700": EXPENSIVE,
"1698240600": EXPENSIVE, # last before plan target
"1698241500": IGNORED,
},
[1698227100, 1698229800, 1698231600, 1698232500, 1698233400, 1698235200, 1698238800, 1698239700],
id="select latest if most expensive candidates have same price"
),
],
)
def test_et_get_loading_hours(granularity,
now_ts,
duration,
remaining_time,
price_list,
expected_loading_hours,
monkeypatch):
# setup
opt = Optional()
opt.data.et.get.prices = PRICE_LIST
opt.data.et.get.prices = price_list
mock_et_provider_available = Mock(return_value=True)
monkeypatch.setattr(opt, "et_provider_available", mock_et_provider_available)
monkeypatch.setattr(
timecheck,
"create_timestamp",
Mock(return_value=now_ts)
)

# execution
loading_hours = opt.et_get_loading_hours(3600, 7200)
loading_hours = opt.et_get_loading_hours(duration=duration, remaining_time=remaining_time)

# evaluation
assert loading_hours == [1698231600]


PRICE_LIST = {"1698224400": 0.00012499,
"1698228000": 0.00011737999999999999,
"1698231600": 0.00011562000000000001,
"1698235200": 0.00012447,
"1698238800": 0.00013813,
"1698242400": 0.00014751,
"1698246000": 0.00015372999999999998,
"1698249600": 0.00015462,
"1698253200": 0.00015771,
"1698256800": 0.00013708,
"1698260400": 0.00012355,
"1698264000": 0.00012006,
"1698267600": 0.00011279999999999999}
assert loading_hours == expected_loading_hours
12 changes: 8 additions & 4 deletions packages/helpermodules/timecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,14 @@ def duration_sum(first: str, second: str) -> str:
return "00:00"


def __get_timedelta_obj(time: str) -> datetime.timedelta:
def __get_timedelta_obj(time_str: str) -> datetime.timedelta:
""" erstellt aus einem String ein timedelta-Objekt.
Parameter
---------
time: str
time_str: str
Zeitstrings HH:MM ggf DD:HH:MM
"""
time_charged = time.split(":")
time_charged = time_str.split(":")
if len(time_charged) == 2:
delta = datetime.timedelta(hours=int(time_charged[0]),
minutes=int(time_charged[1]))
Expand All @@ -316,7 +316,7 @@ def __get_timedelta_obj(time: str) -> datetime.timedelta:
hours=int(time_charged[1]),
minutes=int(time_charged[2]))
else:
raise Exception("Unknown charge duration: "+time)
raise Exception(f"Unknown charge duration: {time_str}")
return delta


Expand All @@ -336,3 +336,7 @@ def convert_timestamp_delta_to_time_string(timestamp: int, delta: int) -> str:
return f"{minute_diff} Min."
elif seconds_diff > 0:
return f"{seconds_diff} Sek."


def convert_to_timestamp(timestring: str) -> int:
return int(datetime.datetime.fromisoformat(timestring).timestamp())
Loading