Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
23e0cd3
Add AST nodes for the new tree walking Formula implementation
shsms Nov 20, 2025
a67e0b3
Introduce a `Peekable` wrapper around `Iterator`
shsms Nov 20, 2025
25dfc0b
Implement a lexer for the new component graph formulas
shsms Nov 20, 2025
9366cfc
Implement a `ResampledStreamFetcher`
shsms Nov 20, 2025
c32c852
Implement a `FormulaEvaluatingActor`
shsms Nov 20, 2025
7e2ad89
Implement the `Formula` type
shsms Nov 20, 2025
56a4377
Implement a parser for string formulas
shsms Nov 20, 2025
194b742
Add tests for formulas
shsms Nov 20, 2025
0454451
Add a 3-phase formula type that wraps 3 1-phase formulas
shsms Nov 20, 2025
e23af0a
Add a formula pool for storing and reusing formulas
shsms Nov 20, 2025
465f914
Add `frequenz-microgrid-component-graph` as a dependency
shsms Nov 20, 2025
7b0fedf
Remove test for island-mode
shsms Nov 20, 2025
b37b5f5
Switch to use the external component graph
shsms Nov 20, 2025
edd8617
Delete the old component graph
shsms Nov 28, 2025
678c78a
Replace FormulaEngine with the new Formula
shsms Nov 20, 2025
a69ba9e
Remove tests for the old fallback mechanism
shsms Nov 20, 2025
0a843ea
Send test data from secondary components
shsms Nov 20, 2025
d70a647
Test priority of component powers in formulas over meter powers
shsms Nov 20, 2025
898c976
Increase number of active namespaces for formula test
shsms Nov 28, 2025
c9719e5
Drop old formula engine
shsms Nov 20, 2025
b183251
Document the new Formula implementation
shsms Nov 20, 2025
2470c10
Remove all remaining references to FormulaEngines
shsms Nov 20, 2025
5f644c8
Update release notes
shsms Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/frequenz/sdk/timeseries/formulas/__init__.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If FormulaEngine and FormulaEngine3Phase were part of the public interface, and you say the interface didn't really changed (so the class were only renamed), can we keep a deprecated alias with the old names so we can make this a non-breaking change?

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Formulas on telemetry streams."""
205 changes: 205 additions & 0 deletions src/frequenz/sdk/timeseries/formulas/_ast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Formula AST nodes and evaluation logic."""

from __future__ import annotations

import abc
import logging
import math
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Generic

from typing_extensions import override

from ..._internal._math import is_close_to_zero
from .._base_types import QuantityT, Sample
from ._functions import Function

_logger = logging.getLogger(__name__)


@dataclass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest making all these dataclasses kw_only for extra clarity. If not all, at least the ones taking 2 arguments of the same type, like Add (left, right).

class Node(abc.ABC):
"""An abstract syntax tree node representing a formula expression."""

span: tuple[int, int] | None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docstring. What is span?


@abc.abstractmethod
def evaluate(self) -> float | None:
"""Evaluate the expression and return its numerical value."""

@abc.abstractmethod
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the node."""

@override
def __str__(self) -> str:
"""Return the string representation of the node."""
return self.format()
Comment on lines +34 to +41
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idea for the future: add __format__() with some format modifier so we can do something like f{node:w} to call format(wrap=True).



@dataclass
class TelemetryStream(Node, Generic[QuantityT]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use float here instead to make it simpler? Or do you need quantities because these streams will be somehow operated on and you already need, at this level, to know the quantity to disallow invalid operations between different quantities?

"""A AST node that retrieves values from a component's telemetry stream."""

source: str
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The properties are missing docs, I think mkdocs will not show them if they are undocumented (also it is not clear to me what source is).

stream: AsyncIterator[Sample[QuantityT]]
_latest_sample: Sample[QuantityT] | None = None

@property
def latest_sample(self) -> Sample[QuantityT] | None:
"""Return the latest fetched sample for this component."""
return self._latest_sample

@override
def evaluate(self) -> float | None:
"""Return the base value of the latest sample for this component."""
if self._latest_sample is None:
raise ValueError("Next value has not been fetched yet.")
if self._latest_sample.value is None:
return None
return self._latest_sample.value.base_value

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the telemetry stream node."""
return f"{self.source}"

async def fetch_next(self) -> None:
"""Fetch the next value for this component and store it internally."""
self._latest_sample = await anext(self.stream)


@dataclass
class FunCall(Node):
"""A function call in the formula."""

function: Function
args: list[Node]
Comment on lines +80 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same about properties docs for the rest of the file.


@override
def evaluate(self) -> float | None:
"""Evaluate the function call with its arguments."""
return self.function(arg.evaluate() for arg in self.args)

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the function call node."""
args_str = ", ".join(str(arg) for arg in self.args)
return f"{self.function.name}({args_str})"


@dataclass
class Constant(Node):
"""A constant numerical value in the formula."""

value: float

@override
def evaluate(self) -> float | None:
"""Return the constant value."""
return self.value

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the constant node."""
return str(self.value)


@dataclass
class Add(Node):
"""Addition operation node."""

left: Node
right: Node

@override
def evaluate(self) -> float | None:
"""Evaluate the addition of the left and right nodes."""
left = self.left.evaluate()
right = self.right.evaluate()
if left is None or right is None:
return None
return left + right

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the addition node."""
expr = f"{self.left} + {self.right}"
if wrap:
expr = f"({expr})"
return expr


@dataclass
class Sub(Node):
"""Subtraction operation node."""

left: Node
right: Node

Comment on lines +137 to +143
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why Add, Sub, Mul are a Node , not a Function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess Function is the actual function, it would be like value in Constant, while FunCall is the calling of the function and the node in the AST.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because functions are one more layer removed, there's a FunCall node, then the Function. Nice to group actual function calls like that, but don't have to do that for the fundamental operations.

@override
def evaluate(self) -> float | None:
"""Evaluate the subtraction of the right node from the left node."""
left = self.left.evaluate()
right = self.right.evaluate()
if left is None or right is None:
return None
return left - right

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the subtraction node."""
expr = f"{self.left} - {self.right.format(True)}"
if wrap:
expr = f"({expr})"
return expr


@dataclass
class Mul(Node):
"""Multiplication operation node."""

left: Node
right: Node

@override
def evaluate(self) -> float | None:
"""Evaluate the multiplication of the left and right nodes."""
left = self.left.evaluate()
right = self.right.evaluate()
if left is None or right is None:
return None
return left * right

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the multiplication node."""
return f"{self.left.format(True)} * {self.right.format(True)}"


@dataclass
class Div(Node):
"""Division operation node."""

left: Node
right: Node

@override
def evaluate(self) -> float | None:
"""Evaluate the division of the left node by the right node."""
left = self.left.evaluate()
right = self.right.evaluate()
if left is None or right is None:
return None
if is_close_to_zero(right):
return math.nan
return left / right

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the division node."""
return f"{self.left.format(True)} / {self.right.format(True)}"
129 changes: 129 additions & 0 deletions src/frequenz/sdk/timeseries/formulas/_formula_evaluator.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I generally wonder if adding a timeout wouldn't be a good idea while fetching, as an extra safety net, but I guess if we don't receive it means the resampler is not working and everything will break anyway, right?

Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""An evaluator for a formula represented as an AST."""


import asyncio
import logging
from collections.abc import Callable
from datetime import datetime
from typing import Generic

from frequenz.channels import Broadcast, ReceiverStoppedError, Sender
from typing_extensions import override

from ...actor import Actor
from .._base_types import QuantityT, Sample
from . import _ast
from ._resampled_stream_fetcher import ResampledStreamFetcher

_logger = logging.getLogger(__name__)


class FormulaEvaluatingActor(Generic[QuantityT], Actor):
"""An evaluator for a formula represented as an AST."""

def __init__( # pylint: disable=too-many-arguments
self,
*,
root: _ast.Node,
components: list[_ast.TelemetryStream[QuantityT]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got a bit confused about this name, maybe streams?

create_method: Callable[[float], QuantityT],
output_channel: Broadcast[Sample[QuantityT]],
metric_fetcher: ResampledStreamFetcher | None = None,
) -> None:
"""Create a `FormulaEvaluatingActor` instance.

Args:
root: The root node of the formula AST.
components: The telemetry streams that the formula depends on.
create_method: A method to generate the output values with. If the
formula is for generating power values, this would be
`Power.from_watts`, for example.
output_channel: The channel to send evaluated samples to.
metric_fetcher: An optional metric fetcher that needs to be started
before the formula can be evaluated.
"""
super().__init__()

self._root: _ast.Node = root
self._components: list[_ast.TelemetryStream[QuantityT]] = components
self._create_method: Callable[[float], QuantityT] = create_method
self._metric_fetcher: ResampledStreamFetcher | None = metric_fetcher
self._output_channel: Broadcast[Sample[QuantityT]] = output_channel

self._output_sender: Sender[Sample[QuantityT]] = output_channel.new_sender()

@override
async def _run(self) -> None:
"""Run the formula evaluator actor."""
if self._metric_fetcher is not None:
await self._metric_fetcher.subscribe()
await synchronize_receivers(self._components)

while True:
try:
timestamp = next(
comp.latest_sample.timestamp
for comp in self._components
if comp.latest_sample is not None
)

res = self._root.evaluate()
next_sample = Sample(
timestamp, None if res is None else self._create_method(res)
)
await self._output_sender.send(next_sample)
except (StopAsyncIteration, StopIteration):
_logger.debug(
"No more input samples available; stopping formula evaluator. (%s)",
self._root,
)
await self._output_channel.close()
return
except Exception as e: # pylint: disable=broad-except
_logger.error(
"Error evaluating formula %s: %s", self._root, e, exc_info=True
)
await self._output_channel.close()
return

fetch_results = await asyncio.gather(
*(comp.fetch_next() for comp in self._components),
return_exceptions=True,
)
if ex := next((e for e in fetch_results if isinstance(e, Exception)), None):
if isinstance(ex, (StopAsyncIteration, ReceiverStoppedError)):
_logger.debug(
"input streams closed; stopping formula evaluator. (%s)",
self._root,
)
await self._output_channel.close()
return
raise ex
Comment on lines +92 to +104
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can potentially simplify this exception handling if you use a TaskGroup, something like:

Suggested change
fetch_results = await asyncio.gather(
*(comp.fetch_next() for comp in self._components),
return_exceptions=True,
)
if ex := next((e for e in fetch_results if isinstance(e, Exception)), None):
if isinstance(ex, (StopAsyncIteration, ReceiverStoppedError)):
_logger.debug(
"input streams closed; stopping formula evaluator. (%s)",
self._root,
)
await self._output_channel.close()
return
raise ex
try:
async with TaskGroup() as tg:
for comp in self._components:
tg.create_task(comp.fetch_next())
except* (StopAsyncIteration, ReceiverStoppedError) as exc:
_logger.debug(
"input streams closed; stopping formula evaluator. (%s): %s",
self._root,
exc,
)
await self._output_channel.close()
# no return here, otherwise it will interrupt the handling of other exceptions
if self._output_channel.is_closed:
return

This should create one task per component to fetch and wait until all finished if there are no errors.

If there are errors, all other pending tasks will be cancelled immediately (gather will wait until all awaitables finish then return_exceptions=True).

If all errors are stop errors, it will log and exit. If there are any other errors, the exception group will bubble up (be re-raised).



async def synchronize_receivers(
components: list[_ast.TelemetryStream[QuantityT]],
) -> None:
"""Synchronize the given telemetry stream receivers."""
_ = await asyncio.gather(
*(comp.fetch_next() for comp in components),
)
Comment on lines +111 to +113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here since you don't need to catch exceptions, gather should work well, but maybe using a TaskGroup for consistency could be a good idea.

Thinking more about it, shouldn't the output channel be closed too if this happens? Maybe you need to do the exception handling in the whole _run() to cover for this.

latest_ts: datetime | None = None
for comp in components:
if comp.latest_sample is not None and (
latest_ts is None or comp.latest_sample.timestamp > latest_ts
):
latest_ts = comp.latest_sample.timestamp
if latest_ts is None:
_logger.debug("No samples available to synchronize receivers.")
return
for comp in components:
if comp.latest_sample is None:
raise RuntimeError("Can't synchronize receivers.")
ctr = 0
while ctr < 10 and comp.latest_sample.timestamp < latest_ts:
await comp.fetch_next()
ctr += 1
Loading