Skip to content
Open
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
23e0cd3
Add AST nodes for the new tree walking Formula implementation
shsms Nov 20, 2025
a67e0b3
Introduce a `Peekable` wrapper around `Iterator`
shsms Nov 20, 2025
25dfc0b
Implement a lexer for the new component graph formulas
shsms Nov 20, 2025
9366cfc
Implement a `ResampledStreamFetcher`
shsms Nov 20, 2025
c32c852
Implement a `FormulaEvaluatingActor`
shsms Nov 20, 2025
7e2ad89
Implement the `Formula` type
shsms Nov 20, 2025
56a4377
Implement a parser for string formulas
shsms Nov 20, 2025
194b742
Add tests for formulas
shsms Nov 20, 2025
0454451
Add a 3-phase formula type that wraps 3 1-phase formulas
shsms Nov 20, 2025
e23af0a
Add a formula pool for storing and reusing formulas
shsms Nov 20, 2025
465f914
Add `frequenz-microgrid-component-graph` as a dependency
shsms Nov 20, 2025
7b0fedf
Remove test for island-mode
shsms Nov 20, 2025
b37b5f5
Switch to use the external component graph
shsms Nov 20, 2025
edd8617
Delete the old component graph
shsms Nov 28, 2025
678c78a
Replace FormulaEngine with the new Formula
shsms Nov 20, 2025
a69ba9e
Remove tests for the old fallback mechanism
shsms Nov 20, 2025
0a843ea
Send test data from secondary components
shsms Nov 20, 2025
d70a647
Test priority of component powers in formulas over meter powers
shsms Nov 20, 2025
898c976
Increase number of active namespaces for formula test
shsms Nov 28, 2025
c9719e5
Drop old formula engine
shsms Nov 20, 2025
b183251
Document the new Formula implementation
shsms Nov 20, 2025
2470c10
Remove all remaining references to FormulaEngines
shsms Nov 20, 2025
5f644c8
Update release notes
shsms Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
383 changes: 383 additions & 0 deletions src/frequenz/sdk/timeseries/formulas/_formula.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,383 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""A composable formula represented as an AST."""

from __future__ import annotations

import logging
from collections.abc import Callable
from typing import Generic

from frequenz.channels import Broadcast, Receiver
from typing_extensions import override

from frequenz.sdk.timeseries.formulas._resampled_stream_fetcher import (
ResampledStreamFetcher,
)

from ...actor import BackgroundService
from .. import ReceiverFetcher, Sample
from .._base_types import QuantityT
from . import _ast
from ._formula_evaluator import FormulaEvaluatingActor
from ._functions import Coalesce, Max, Min

_logger = logging.getLogger(__name__)


class Formula(BackgroundService, ReceiverFetcher[Sample[QuantityT]]):
"""A formula represented as an AST."""

def __init__( # pylint: disable=too-many-arguments
self,
*,
name: str,
root: _ast.Node,
create_method: Callable[[float], QuantityT],
streams: list[_ast.TelemetryStream[QuantityT]],
sub_formulas: list[Formula[QuantityT]] | None = None,
metric_fetcher: ResampledStreamFetcher | None = None,
) -> None:
"""Create a `Formula` instance.

Args:
name: The name of the formula.
root: The root node of the formula AST.
create_method: A method to generate the output values with. If the
formula is for generating power values, this would be
`Power.from_watts`, for example.
streams: The telemetry streams that the formula depends on.
sub_formulas: Any sub-formulas that this formula depends on.
metric_fetcher: An optional metric fetcher that needs to be started
before the formula can be evaluated.
"""
BackgroundService.__init__(self)
self._name: str = name
self._root: _ast.Node = root
self._components: list[_ast.TelemetryStream[QuantityT]] = streams
self._create_method: Callable[[float], QuantityT] = create_method
self._sub_formulas: list[Formula[QuantityT]] = sub_formulas or []

self._channel: Broadcast[Sample[QuantityT]] = Broadcast(
name=f"{self}",
resend_latest=True,
)
self._evaluator: FormulaEvaluatingActor[QuantityT] = FormulaEvaluatingActor(
root=self._root,
components=self._components,
create_method=self._create_method,
output_channel=self._channel,
metric_fetcher=metric_fetcher,
)

@override
def __str__(self) -> str:
"""Return a string representation of the formula."""
return f"[{self._name}]({self._root})"

@override
def new_receiver(self, *, limit: int = 50) -> Receiver[Sample[QuantityT]]:
"""Subscribe to the formula evaluator to get evaluated samples."""
if not self._evaluator.is_running:
# raise RuntimeError(
# f"Formula evaluator for '{self._root}' is not running. Please "
# + "call `start()` on the formula before using it.",
# )
# _logger.warning(
# "Formula evaluator for '%s' is not running. Starting it. "
# + "Please call `start()` on the formula before using it."
# self._root,
# )
self.start()
return self._channel.new_receiver(limit=limit)

@override
def start(self) -> None:
"""Start the formula evaluator."""
for sub_formula in self._sub_formulas:
sub_formula.start()
self._evaluator.start()

@override
async def stop(self, msg: str | None = None) -> None:
"""Stop the formula evaluator."""
await BackgroundService.stop(self, msg)
for sub_formula in self._sub_formulas:
await sub_formula.stop(msg)
await self._evaluator.stop(msg)
Comment on lines +102 to +108
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably override cancel() + await() instead of stop().


def __add__(
self, other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]
) -> FormulaBuilder[QuantityT]:
"""Create an addition operation node."""
return FormulaBuilder(self, self._create_method) + other

def __sub__(
self, other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]
) -> FormulaBuilder[QuantityT]:
"""Create a subtraction operation node."""
return FormulaBuilder(self, self._create_method) - other

def __mul__(self, other: float) -> FormulaBuilder[QuantityT]:
"""Create a multiplication operation node."""
return FormulaBuilder(self, self._create_method) * other

def __truediv__(self, other: float) -> FormulaBuilder[QuantityT]:
"""Create a division operation node."""
return FormulaBuilder(self, self._create_method) / other

def coalesce(
self,
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
) -> FormulaBuilder[QuantityT]:
"""Create a coalesce operation node."""
return FormulaBuilder(self, self._create_method).coalesce(other)

def min(
self,
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
) -> FormulaBuilder[QuantityT]:
"""Create a min operation node."""
return FormulaBuilder(self, self._create_method).min(other)

def max(
self,
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
) -> FormulaBuilder[QuantityT]:
"""Create a max operation node."""
return FormulaBuilder(self, self._create_method).max(other)


class FormulaBuilder(Generic[QuantityT]):
"""A builder for higher-order formulas represented as ASTs."""

def __init__(
self,
formula: Formula[QuantityT] | _ast.Node,
create_method: Callable[[float], QuantityT],
streams: list[_ast.TelemetryStream[QuantityT]] | None = None,
sub_formulas: list[Formula[QuantityT]] | None = None,
) -> None:
"""Create a `FormulaBuilder` instance.

Args:
formula: The initial formula to build upon.
create_method: A method to generate the output values with. If the
formula is for generating power values, this would be
`Power.from_watts`, for example.
streams: The telemetry streams that the formula depends on.
sub_formulas: Any sub-formulas that this formula depends on.
"""
self._create_method: Callable[[float], QuantityT] = create_method
self._streams: list[_ast.TelemetryStream[QuantityT]] = streams or []
"""Input streams that need to be synchronized before evaluation."""
self._sub_formulas: list[Formula[QuantityT]] = sub_formulas or []
"""Sub-formulas whose lifetimes are managed by this formula."""

if isinstance(formula, Formula):
self.root: _ast.Node = _ast.TelemetryStream(
None,
str(formula),
formula.new_receiver(),
)
self._streams.append(self.root)
self._sub_formulas.append(formula)
else:
self.root = formula

def __add__(
self,
other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT],
) -> FormulaBuilder[QuantityT]:
"""Create an addition operation node."""
if isinstance(other, FormulaBuilder):
right_node = other.root
self._streams.extend(other._streams)
elif isinstance(other, Formula):
right_node = _ast.TelemetryStream(None, str(other), other.new_receiver())
self._streams.append(right_node)
self._sub_formulas.append(other)
else:
right_node = _ast.Constant(None, other.base_value)

new_root = _ast.Add(None, self.root, right_node)
return FormulaBuilder(
new_root,
self._create_method,
self._streams,
self._sub_formulas,
)

def __sub__(
self,
other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT],
) -> FormulaBuilder[QuantityT]:
"""Create a subtraction operation node."""
if isinstance(other, FormulaBuilder):
right_node = other.root
self._streams.extend(other._streams)
elif isinstance(other, Formula):
right_node = _ast.TelemetryStream(None, str(other), other.new_receiver())
self._streams.append(right_node)
self._sub_formulas.append(other)
else:
right_node = _ast.Constant(None, other.base_value)

new_root = _ast.Sub(None, self.root, right_node)
return FormulaBuilder(
new_root,
self._create_method,
self._streams,
self._sub_formulas,
)

def __mul__(self, other: float) -> FormulaBuilder[QuantityT]:
"""Create a multiplication operation node."""
right_node = _ast.Constant(None, other)
new_root = _ast.Mul(None, self.root, right_node)
return FormulaBuilder(
new_root,
self._create_method,
self._streams,
self._sub_formulas,
)

def __truediv__(
self,
other: float,
) -> FormulaBuilder[QuantityT]:
"""Create a division operation node."""
right_node = _ast.Constant(None, other)
new_root = _ast.Div(None, self.root, right_node)
return FormulaBuilder(
new_root,
self._create_method,
self._streams,
self._sub_formulas,
)

def coalesce(
self,
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
) -> FormulaBuilder[QuantityT]:
"""Create a coalesce operation node."""
right_nodes: list[_ast.Node] = []
for item in other:
if isinstance(item, FormulaBuilder):
right_nodes.append(item.root)
self._streams.extend(item._streams) # pylint: disable=protected-access
elif isinstance(item, Formula):
right_node = _ast.TelemetryStream(
None,
str(item),
item.new_receiver(),
)
right_nodes.append(right_node)
self._streams.append(right_node)
self._sub_formulas.append(item)
else:
right_nodes.append(_ast.Constant(None, item.base_value))

new_root = _ast.FunCall(
None,
Coalesce(),
[self.root] + right_nodes,
)

return FormulaBuilder(
new_root,
self._create_method,
self._streams,
self._sub_formulas,
)

def min(
self,
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
) -> FormulaBuilder[QuantityT]:
"""Create a min operation node."""
right_nodes: list[_ast.Node] = []
for item in other:
if isinstance(item, FormulaBuilder):
right_nodes.append(item.root)
self._streams.extend(item._streams) # pylint: disable=protected-access
elif isinstance(item, Formula):
right_node = _ast.TelemetryStream(
None,
str(item),
item.new_receiver(),
)
right_nodes.append(right_node)
self._streams.append(right_node)
self._sub_formulas.append(item)
else:
right_nodes.append(_ast.Constant(None, item.base_value))

new_root = _ast.FunCall(
None,
Min(),
[self.root] + right_nodes,
)

return FormulaBuilder(
new_root,
self._create_method,
self._streams,
self._sub_formulas,
)

def max(
self,
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
) -> FormulaBuilder[QuantityT]:
"""Create a max operation node."""
right_nodes: list[_ast.Node] = []
for item in other:
if isinstance(item, FormulaBuilder):
right_nodes.append(item.root)
self._streams.extend(item._streams) # pylint: disable=protected-access
elif isinstance(item, Formula):
right_node = _ast.TelemetryStream(
None,
str(item),
item.new_receiver(),
)
right_nodes.append(right_node)
self._streams.append(right_node)
self._sub_formulas.append(item)
else:
right_nodes.append(_ast.Constant(None, item.base_value))

new_root = _ast.FunCall(
None,
Max(),
[self.root] + right_nodes,
)

return FormulaBuilder(
new_root,
self._create_method,
self._streams,
self._sub_formulas,
)

def build(
self,
name: str,
) -> Formula[QuantityT]:
"""Build a `Formula` instance.

Args:
name: The name of the formula.

Returns:
A `Formula` instance.
"""
return Formula(
name=name,
root=self.root,
create_method=self._create_method,
streams=self._streams,
sub_formulas=self._sub_formulas,
)