Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
23e0cd3
Add AST nodes for the new tree walking Formula implementation
shsms Nov 20, 2025
a67e0b3
Introduce a `Peekable` wrapper around `Iterator`
shsms Nov 20, 2025
25dfc0b
Implement a lexer for the new component graph formulas
shsms Nov 20, 2025
9366cfc
Implement a `ResampledStreamFetcher`
shsms Nov 20, 2025
c32c852
Implement a `FormulaEvaluatingActor`
shsms Nov 20, 2025
7e2ad89
Implement the `Formula` type
shsms Nov 20, 2025
56a4377
Implement a parser for string formulas
shsms Nov 20, 2025
194b742
Add tests for formulas
shsms Nov 20, 2025
0454451
Add a 3-phase formula type that wraps 3 1-phase formulas
shsms Nov 20, 2025
e23af0a
Add a formula pool for storing and reusing formulas
shsms Nov 20, 2025
465f914
Add `frequenz-microgrid-component-graph` as a dependency
shsms Nov 20, 2025
7b0fedf
Remove test for island-mode
shsms Nov 20, 2025
b37b5f5
Switch to use the external component graph
shsms Nov 20, 2025
edd8617
Delete the old component graph
shsms Nov 28, 2025
678c78a
Replace FormulaEngine with the new Formula
shsms Nov 20, 2025
a69ba9e
Remove tests for the old fallback mechanism
shsms Nov 20, 2025
0a843ea
Send test data from secondary components
shsms Nov 20, 2025
d70a647
Test priority of component powers in formulas over meter powers
shsms Nov 20, 2025
898c976
Increase number of active namespaces for formula test
shsms Nov 28, 2025
c9719e5
Drop old formula engine
shsms Nov 20, 2025
b183251
Document the new Formula implementation
shsms Nov 20, 2025
2470c10
Remove all remaining references to FormulaEngines
shsms Nov 20, 2025
5f644c8
Update release notes
shsms Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/frequenz/sdk/timeseries/formulas/__init__.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If FormulaEngine and FormulaEngine3Phase were part of the public interface, and you say the interface didn't really changed (so the class were only renamed), can we keep a deprecated alias with the old names so we can make this a non-breaking change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any ideas how?

With newer python, we could do type FormulaEngine[W] = Formula[W].

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or I can just rename the thing back to FormulaEngine. Just thought it was weird because technically we can call everything an engine, resampling engine, etc.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Formulas on telemetry streams."""
205 changes: 205 additions & 0 deletions src/frequenz/sdk/timeseries/formulas/_ast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Formula AST nodes and evaluation logic."""

from __future__ import annotations

import abc
import logging
import math
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Generic

from typing_extensions import override

from ..._internal._math import is_close_to_zero
from .._base_types import QuantityT, Sample
from ._functions import Function

_logger = logging.getLogger(__name__)


@dataclass
class Node(abc.ABC):
"""An abstract syntax tree node representing a formula expression."""

span: tuple[int, int] | None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docstring. What is span?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do docs in next PR, because some of these params have moved a bit. span is only partially implemented. It is the position of the node in the source string formula.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will link here when done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done here: d86187f


@abc.abstractmethod
def evaluate(self) -> float | None:
"""Evaluate the expression and return its numerical value."""

@abc.abstractmethod
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the node."""

@override
def __str__(self) -> str:
"""Return the string representation of the node."""
return self.format()


@dataclass
class TelemetryStream(Node, Generic[QuantityT]):
"""A AST node that retrieves values from a component's telemetry stream."""

source: str
stream: AsyncIterator[Sample[QuantityT]]
_latest_sample: Sample[QuantityT] | None = None

@property
def latest_sample(self) -> Sample[QuantityT] | None:
"""Return the latest fetched sample for this component."""
return self._latest_sample

@override
def evaluate(self) -> float | None:
"""Return the base value of the latest sample for this component."""
if self._latest_sample is None:
raise ValueError("Next value has not been fetched yet.")
if self._latest_sample.value is None:
return None
return self._latest_sample.value.base_value

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the telemetry stream node."""
return f"{self.source}"

async def fetch_next(self) -> None:
"""Fetch the next value for this component and store it internally."""
self._latest_sample = await anext(self.stream)


@dataclass
class FunCall(Node):
"""A function call in the formula."""

function: Function
args: list[Node]

@override
def evaluate(self) -> float | None:
"""Evaluate the function call with its arguments."""
return self.function(arg.evaluate() for arg in self.args)

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the function call node."""
args_str = ", ".join(str(arg) for arg in self.args)
return f"{self.function.name}({args_str})"


@dataclass
class Constant(Node):
"""A constant numerical value in the formula."""

value: float

@override
def evaluate(self) -> float | None:
"""Return the constant value."""
return self.value

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the constant node."""
return str(self.value)


@dataclass
class Add(Node):
"""Addition operation node."""

left: Node
right: Node

@override
def evaluate(self) -> float | None:
"""Evaluate the addition of the left and right nodes."""
left = self.left.evaluate()
right = self.right.evaluate()
if left is None or right is None:
return None
return left + right

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the addition node."""
expr = f"{self.left} + {self.right}"
if wrap:
expr = f"({expr})"
return expr


@dataclass
class Sub(Node):
"""Subtraction operation node."""

left: Node
right: Node

@override
def evaluate(self) -> float | None:
"""Evaluate the subtraction of the right node from the left node."""
left = self.left.evaluate()
right = self.right.evaluate()
if left is None or right is None:
return None
return left - right

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the subtraction node."""
expr = f"{self.left} - {self.right.format(True)}"
if wrap:
expr = f"({expr})"
return expr


@dataclass
class Mul(Node):
"""Multiplication operation node."""

left: Node
right: Node

@override
def evaluate(self) -> float | None:
"""Evaluate the multiplication of the left and right nodes."""
left = self.left.evaluate()
right = self.right.evaluate()
if left is None or right is None:
return None
return left * right

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the multiplication node."""
return f"{self.left.format(True)} * {self.right.format(True)}"


@dataclass
class Div(Node):
"""Division operation node."""

left: Node
right: Node

@override
def evaluate(self) -> float | None:
"""Evaluate the division of the left node by the right node."""
left = self.left.evaluate()
right = self.right.evaluate()
if left is None or right is None:
return None
if is_close_to_zero(right):
return math.nan
return left / right

@override
def format(self, wrap: bool = False) -> str:
"""Return a string representation of the division node."""
return f"{self.left.format(True)} / {self.right.format(True)}"
97 changes: 97 additions & 0 deletions src/frequenz/sdk/timeseries/formulas/_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Function implementations for evaluating formulas."""

from __future__ import annotations

import abc
from collections.abc import Iterable

from typing_extensions import override


class Function(abc.ABC):
"""A function that can be called in a formula expression."""

@property
@abc.abstractmethod
def name(self) -> str:
"""Return the name of the function."""

@abc.abstractmethod
def __call__(self, args: Iterable[float | None]) -> float | None:
"""Call the function with the given arguments."""

@classmethod
def from_string(cls, name: str) -> Function:
"""Create a function instance from its name."""
match name.upper():
case "COALESCE":
return Coalesce()
case "MAX":
return Max()
case "MIN":
return Min()
case _:
raise ValueError(f"Unknown function name: {name}")


class Coalesce(Function):
"""A function that returns the first non-None argument."""

@property
@override
def name(self) -> str:
"""Return the name of the function."""
return "COALESCE"

@override
def __call__(self, args: Iterable[float | None]) -> float | None:
"""Return the first non-None argument."""
for arg in args:
if arg is not None:
return arg
return None


class Max(Function):
"""A function that returns the maximum of the arguments."""

@property
@override
def name(self) -> str:
"""Return the name of the function."""
return "MAX"

@override
def __call__(self, args: Iterable[float | None]) -> float | None:
"""Return the maximum of the arguments."""
max_value: float | None = None
for arg in args:
if arg is None:
return None
if max_value is None or arg > max_value:
max_value = arg
return max_value


class Min(Function):
"""A function that returns the minimum of the arguments."""

@property
@override
def name(self) -> str:
"""Return the name of the function."""
return "MIN"

@override
def __call__(self, args: Iterable[float | None]) -> float | None:
"""Return the minimum of the arguments."""
min_value: float | None = None
for arg in args:
if arg is None:
return None
if min_value is None or arg < min_value:
min_value = arg
return min_value