diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index dd74f73..4fe06aa 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -18,5 +18,7 @@ This release introduces a more flexible and powerful mechanism for managing disp
 
 ## New Features
 
+* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starting at 10:30 and ending at 11:00: with the feature enabled, this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
+
 * The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
 
diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py
index fe6a504..f239803 100644
--- a/src/frequenz/dispatch/__init__.py
+++ b/src/frequenz/dispatch/__init__.py
@@ -16,9 +16,11 @@
 """
 
 from ._actor_dispatcher import ActorDispatcher, DispatchInfo
+from ._bg_service import MergeStrategy
 from ._dispatch import Dispatch
 from ._dispatcher import Dispatcher
 from ._event import Created, Deleted, DispatchEvent, Updated
+from ._merge_strategies import MergeByIdentity, MergeByType, MergeByTypeTarget
 
 __all__ = [
     "Created",
@@ -29,4 +31,8 @@
     "Dispatch",
     "ActorDispatcher",
     "DispatchInfo",
+    "MergeStrategy",
+    "MergeByIdentity",
+    "MergeByType",
+    "MergeByTypeTarget",
 ]
diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py
index 9e466a6..c54c204 100644
--- a/src/frequenz/dispatch/_bg_service.py
+++ b/src/frequenz/dispatch/_bg_service.py
@@ -3,8 +3,13 @@
 
 """The dispatch background service."""
 
+from __future__ import annotations
+
 import asyncio
+import functools
 import logging
+from abc import ABC, abstractmethod
+from collections.abc import Mapping
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta, timezone
 from heapq import heappop, heappush
@@ -23,6 +28,22 @@
 """The logger for this module."""
 
 
+class MergeStrategy(ABC):
+    """Base class for strategies to merge running intervals."""
+
+    @abstractmethod
+    def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
+        """Filter dispatches based on the strategy.
+
+        Args:
+            dispatches: All dispatches, available as context.
+            dispatch: The dispatch to filter.
+
+        Returns:
+            True if the dispatch should be included, False otherwise.
+        """
+
+
 # pylint: disable=too-many-instance-attributes
 class DispatchScheduler(BackgroundService):
     """Dispatch background service.
@@ -119,19 +140,36 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
         )
 
     async def new_running_state_event_receiver(
-        self, type: str, *, unify_running_intervals: bool = True
+        self, type: str, *, merge_strategy: MergeStrategy | None = None
     ) -> Receiver[Dispatch]:
         """Create a new receiver for running state events of the specified type.
 
-        If `unify_running_intervals` is True, running intervals from multiple
-        dispatches of the same type are considered as one continuous running
-        period. In this mode, any stop events are ignored as long as at least
-        one dispatch remains active.
+        `merge_strategy` is an instance of a class derived from
+        [`MergeStrategy`][frequenz.dispatch.MergeStrategy]. Available strategies
+        are:
+
+        * [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
+          of the same type
+        * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
+          dispatches of the same type and target
+        * `None` — no merging, just send all events
+
+        You can make your own strategy by subclassing:
+
+        * [`MergeByIdentity`][frequenz.dispatch.MergeByIdentity] — Merges
+          dispatches based on a user defined identity function
+        * [`MergeStrategy`][frequenz.dispatch.MergeStrategy] — Merges based
+          on a user defined filter function
+
+        Running intervals from multiple dispatches will be merged, according to
+        the chosen strategy.
+
+        While merging, stop events are ignored as long as at least one
+        merge-criteria-matching dispatch remains active.
 
         Args:
             type: The type of events to receive.
-            unify_running_intervals: Whether to unify running intervals.
-
+            merge_strategy: The merge strategy to use.
         Returns:
             A new receiver for running state status.
         """
@@ -140,33 +178,21 @@ async def new_running_state_event_receiver(
             dispatch for dispatch in self._dispatches.values() if dispatch.type == type
         ]
 
-        # Create receiver with enough capacity to hold all matching dispatches
+        # Create a new receiver with at least 30 slots, but more if there are
+        # more dispatches.
+        # That way we can send all dispatches initially and don't have to worry
+        # about the receiver being full.
+        # If there are no initial dispatches, we still want to have some slots
+        # available for future dispatches, so we set the limit to 30.
         receiver = self._running_state_status_channel.new_receiver(
-            limit=max(1, len(dispatches))
+            limit=max(30, len(dispatches))
         ).filter(lambda dispatch: dispatch.type == type)
 
-        if unify_running_intervals:
-
-            def _is_type_still_running(new_dispatch: Dispatch) -> bool:
-                """Merge time windows of running dispatches.
-
-                Any event that would cause a stop is filtered if at least one
-                dispatch of the same type is running.
-                """
-                if new_dispatch.started:
-                    return True
-
-                other_dispatches_running = any(
-                    dispatch.started
-                    for dispatch in self._dispatches.values()
-                    if dispatch.type == type
-                )
-                # If no other dispatches are running, we can allow the stop event
-                return not other_dispatches_running
-
-            receiver = receiver.filter(_is_type_still_running)
+        if merge_strategy:
+            receiver = receiver.filter(
+                functools.partial(merge_strategy.filter, self._dispatches)
+            )
 
-        # Send all matching dispatches to the receiver
         for dispatch in dispatches:
             await self._send_running_state_change(dispatch)
 
@@ -195,9 +221,6 @@ async def _run(self) -> None:
             if selected_from(selected, self._next_event_timer):
                 if not self._scheduled_events:
                     continue
-                _logger.debug(
-                    "Executing scheduled event: %s", self._scheduled_events[0].dispatch
-                )
                 await self._execute_scheduled_event(
                     heappop(self._scheduled_events).dispatch
                 )
@@ -227,6 +250,7 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
         Args:
             dispatch: The dispatch to execute.
         """
+        _logger.debug("Executing scheduled event: %s (%s)", dispatch, dispatch.started)
         await self._send_running_state_change(dispatch)
 
         # The timer is always a tiny bit delayed, so we need to check if the
@@ -256,7 +280,7 @@ async def _fetch(self) -> None:
                 for client_dispatch in page:
                     dispatch = Dispatch(client_dispatch)
 
-                    self._dispatches[dispatch.id] = Dispatch(client_dispatch)
+                    self._dispatches[dispatch.id] = dispatch
                     old_dispatch = old_dispatches.pop(dispatch.id, None)
                     if not old_dispatch:
                         _logger.debug("New dispatch: %s", dispatch)
@@ -310,7 +334,7 @@ async def _update_dispatch_schedule_and_notify(
             self._remove_scheduled(old_dispatch)
 
             was_running = old_dispatch.started
-            old_dispatch._set_deleted()  # pylint: disable=protected-access)
+            old_dispatch._set_deleted()  # pylint: disable=protected-access
 
             # If the dispatch was running, we need to notify
             if was_running:
diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py
index 512318d..f2eca19 100644
--- a/src/frequenz/dispatch/_dispatcher.py
+++ b/src/frequenz/dispatch/_dispatcher.py
@@ -7,7 +7,7 @@
 from frequenz.channels import Receiver
 from frequenz.client.dispatch import Client
 
-from ._bg_service import DispatchScheduler
+from ._bg_service import DispatchScheduler, MergeStrategy
 from ._dispatch import Dispatch
 from ._event import DispatchEvent
 
@@ -16,17 +16,19 @@ class Dispatcher:
     """A highlevel interface for the dispatch API.
 
     This class provides a highlevel interface to the dispatch API.
-    It provides two channels:
+    It provides two receiver functions:
 
-    Lifecycle events:
-        A channel that sends a dispatch event message whenever a dispatch
-        is created, updated or deleted.
+    * [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]:
+        Receives an event whenever a dispatch is created, updated or deleted.
+    * [Running status change
+        receiver][frequenz.dispatch.Dispatcher.new_running_state_event_receiver]:
+        Receives an event whenever the running status of a dispatch changes.
+        The running status of a dispatch can change due to a variety of reasons,
+        such as but not limited to the dispatch being started, stopped, modified
+        or deleted or reaching its scheduled start or end time.
 
-    Running status change:
-        Sends a dispatch message whenever a dispatch is ready
-        to be executed according to the schedule or the running status of the
-        dispatch changed in a way that could potentially require the consumer to start,
-        stop or reconfigure itself.
+        Any change that could potentially require the consumer to start, stop or
+        reconfigure itself will cause a message to be sent.
 
     Example: Processing running state change dispatches
         ```python
@@ -200,7 +202,10 @@ def new_lifecycle_events_receiver(
         return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
 
     async def new_running_state_event_receiver(
-        self, dispatch_type: str, *, unify_running_intervals: bool = True
+        self,
+        dispatch_type: str,
+        *,
+        merge_strategy: MergeStrategy | None = None,
     ) -> Receiver[Dispatch]:
         """Return running state event receiver.
 
@@ -228,18 +233,29 @@ async def new_running_state_event_receiver(
          - The payload changed
          - The dispatch was deleted
 
-        If `unify_running_intervals` is True, running intervals from multiple
-        dispatches of the same type are considered as one continuous running
-        period. In this mode, any stop events are ignored as long as at least
-        one dispatch remains active.
+        `merge_strategy` is an instance of a class derived from
+        [`MergeStrategy`][frequenz.dispatch.MergeStrategy]. Available strategies
+        are:
+
+        * [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
+          of the same type
+        * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
+          dispatches of the same type and target
+        * `None` — no merging, just send all events (default)
+
+        Running intervals from multiple dispatches will be merged, according to
+        the chosen strategy.
+
+        While merging, stop events are ignored as long as at least one
+        merge-criteria-matching dispatch remains active.
 
         Args:
             dispatch_type: The type of the dispatch to listen for.
-            unify_running_intervals: Whether to unify running intervals.
+            merge_strategy: The strategy to use for merging running intervals.
 
         Returns:
             A new receiver for dispatches whose running status changed.
         """
         return await self._bg_service.new_running_state_event_receiver(
-            dispatch_type, unify_running_intervals=unify_running_intervals
+            dispatch_type, merge_strategy=merge_strategy
         )
diff --git a/src/frequenz/dispatch/_merge_strategies.py b/src/frequenz/dispatch/_merge_strategies.py
new file mode 100644
index 0000000..3753546
--- /dev/null
+++ b/src/frequenz/dispatch/_merge_strategies.py
@@ -0,0 +1,67 @@
+# License: MIT
+# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
+
+"""Different merge strategies for dispatch running state events."""
+
+import logging
+from abc import abstractmethod
+from collections.abc import Mapping
+
+from typing_extensions import override
+
+from ._bg_service import MergeStrategy
+from ._dispatch import Dispatch
+
+
+class MergeByIdentity(MergeStrategy):
+    """Merge running intervals based on a dispatch configuration."""
+
+    @abstractmethod
+    def identity(self, dispatch: Dispatch) -> int:
+        """Identity function for the merge criteria."""
+
+    @override
+    def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
+        """Filter dispatches based on the merge strategy.
+
+        Keeps start events.
+        Keeps stop events only if no other dispatches matching the
+        strategy's criteria are running.
+        """
+        if dispatch.started:
+            logging.debug("Keeping start event %s", dispatch.id)
+            return True
+
+        other_dispatches_running = any(
+            existing_dispatch.started
+            for existing_dispatch in dispatches.values()
+            if (
+                self.identity(existing_dispatch) == self.identity(dispatch)
+                and existing_dispatch.id != dispatch.id
+            )
+        )
+
+        logging.debug(
+            "stop event %s because other_dispatches_running=%s",
+            dispatch.id,
+            other_dispatches_running,
+        )
+        return not other_dispatches_running
+
+
+class MergeByType(MergeByIdentity):
+    """Merge running intervals based on the dispatch type."""
+
+    @override
+    def identity(self, dispatch: Dispatch) -> int:
+        """Identity function for the merge criteria."""
+        return hash(dispatch.type)
+
+
+class MergeByTypeTarget(MergeByType):
+    """Merge running intervals based on the dispatch type and target."""
+
+    @override
+    def identity(self, dispatch: Dispatch) -> int:
+        """Identity function for the merge criteria."""
+        return hash((dispatch.type, dispatch.target))
diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py
index 82802c1..0145bee 100644
--- a/tests/test_frequenz_dispatch.py
+++ b/tests/test_frequenz_dispatch.py
@@ -10,6 +10,7 @@
 from typing import AsyncIterator, Iterator
 
 import async_solipsism
+import pytest
 import time_machine
 from frequenz.channels import Receiver
 from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule
@@ -18,7 +19,16 @@
 from frequenz.client.dispatch.types import Dispatch as BaseDispatch
 from pytest import fixture
 
-from frequenz.dispatch import Created, Deleted, Dispatch, DispatchEvent, Updated
+from frequenz.dispatch import (
+    Created,
+    Deleted,
+    Dispatch,
+    DispatchEvent,
+    MergeByType,
+    MergeByTypeTarget,
+    MergeStrategy,
+    Updated,
+)
 from frequenz.dispatch._bg_service import DispatchScheduler
 
 
@@ -77,7 +87,7 @@ async def test_env() -> AsyncIterator[TestEnv]:
             service=service,
             lifecycle_events=service.new_lifecycle_events_receiver("TEST_TYPE"),
             running_state_change=await service.new_running_state_event_receiver(
-                "TEST_TYPE", unify_running_intervals=False
+                "TEST_TYPE", merge_strategy=MergeByType()
             ),
             client=client,
             microgrid_id=microgrid_id,
@@ -446,7 +456,7 @@ async def test_dispatch_new_but_finished(
         lifecycle_events=test_env.service.new_lifecycle_events_receiver("TEST_TYPE"),
         running_state_change=(
             await test_env.service.new_running_state_event_receiver(
-                "TEST_TYPE", unify_running_intervals=False
+                "TEST_TYPE", merge_strategy=MergeByType()
             )
         ),
     )
@@ -520,9 +530,11 @@ async def test_notification_on_actor_start(
     assert ready_dispatch.started
 
 
-async def test_multiple_dispatches_unify_running_intervals(
+@pytest.mark.parametrize("merge_strategy", [MergeByType(), MergeByTypeTarget()])
+async def test_multiple_dispatches_merge_running_intervals(
     fake_time: time_machine.Coordinates,
     generator: DispatchGenerator,
+    merge_strategy: MergeStrategy,
 ) -> None:
     """Test that multiple dispatches are merged into a single running interval."""
     microgrid_id = randint(1, 100)
@@ -534,7 +546,7 @@ async def test_multiple_dispatches_unify_running_intervals(
     service.start()
 
     receiver = await service.new_running_state_event_receiver(
-        "TEST_TYPE", unify_running_intervals=True
+        "TEST_TYPE", merge_strategy=merge_strategy
     )
 
     # Create two overlapping dispatches
@@ -542,6 +554,7 @@ async def test_multiple_dispatches_unify_running_intervals(
         generator.generate_dispatch(),
         active=True,
         duration=timedelta(seconds=30),
+        target=[1, 2] if type(merge_strategy) is MergeByType else [3, 4],
         start_time=_now() + timedelta(seconds=5),
         recurrence=RecurrenceRule(),
         type="TEST_TYPE",
@@ -550,6 +563,7 @@ async def test_multiple_dispatches_unify_running_intervals(
         generator.generate_dispatch(),
         active=True,
         duration=timedelta(seconds=10),
+        target=[3, 4],
         start_time=_now() + timedelta(seconds=10),  # starts after dispatch1
         recurrence=RecurrenceRule(),
         type="TEST_TYPE",
@@ -573,7 +587,7 @@ async def test_multiple_dispatches_unify_running_intervals(
     assert started1.started
     assert started2.started
 
-    # Stop dispatch2 first, but unify_running_intervals=True means as long as dispatch1 runs,
+    # Stop dispatch2 first, but with a merge strategy set, as long as dispatch1 runs,
     # we do not send a stop event
     await client.update(
         microgrid_id=microgrid_id, dispatch_id=started2.id, new_fields={"active": False}
@@ -592,14 +606,16 @@ async def test_multiple_dispatches_unify_running_intervals(
     await service.stop()
 
 
-async def test_multiple_dispatches_sequential_intervals_unify(
+@pytest.mark.parametrize("merge_strategy", [MergeByType(), MergeByTypeTarget()])
+async def test_multiple_dispatches_sequential_intervals_merge(
     fake_time: time_machine.Coordinates,
     generator: DispatchGenerator,
+    merge_strategy: MergeStrategy,
 ) -> None:
     """Test that multiple dispatches are merged into a single running interval.
 
     Even if dispatches don't overlap but are consecutive,
-    unify_running_intervals=True should treat them as continuous if any event tries to stop.
+    a merge strategy should treat them as continuous if any event tries to stop.
     """
     microgrid_id = randint(1, 100)
     client = FakeClient()
@@ -607,13 +623,15 @@ async def test_multiple_dispatches_sequential_intervals_unify(
     service.start()
 
     receiver = await service.new_running_state_event_receiver(
-        "TEST_TYPE", unify_running_intervals=True
+        "TEST_TYPE", merge_strategy=merge_strategy
     )
 
     dispatch1 = replace(
         generator.generate_dispatch(),
         active=True,
         duration=timedelta(seconds=5),
+        # If merging by type, we want to test having different targets in dispatch 1 and 2
+        target=[3, 4] if type(merge_strategy) is MergeByType else [1, 2],
         start_time=_now() + timedelta(seconds=5),
         recurrence=RecurrenceRule(),
         type="TEST_TYPE",
@@ -623,6 +641,7 @@ async def test_multiple_dispatches_sequential_intervals_unify(
         generator.generate_dispatch(),
         active=True,
         duration=timedelta(seconds=5),
+        target=[1, 2],
         start_time=dispatch1.start_time + dispatch1.duration,
         recurrence=RecurrenceRule(),
         type="TEST_TYPE",
@@ -636,30 +655,30 @@ async def test_multiple_dispatches_sequential_intervals_unify(
     await lifecycle.receive()
     await lifecycle.receive()
 
-    fake_time.shift(timedelta(seconds=11))
+    fake_time.move_to(dispatch1.start_time + timedelta(seconds=1))
     await asyncio.sleep(1)
     started1 = await receiver.receive()
     assert started1.started
 
     # Wait for the second dispatch to start
-    fake_time.shift(timedelta(seconds=3))
+    fake_time.move_to(dispatch2.start_time + timedelta(seconds=1))
     await asyncio.sleep(1)
     started2 = await receiver.receive()
     assert started2.started
+    assert started2.target == dispatch2.target
 
     # Now stop the second dispatch
-    fake_time.shift(timedelta(seconds=5))
-    await asyncio.sleep(1)
+    assert dispatch2.duration is not None
+    fake_time.move_to(dispatch2.start_time + dispatch2.duration + timedelta(seconds=1))
     stopped = await receiver.receive()
     assert not stopped.started
 
-    await service.stop()
-    await asyncio.sleep(1)
-
 
+@pytest.mark.parametrize("merge_strategy", [MergeByType(), MergeByTypeTarget()])
 async def test_at_least_one_running_filter(
     fake_time: time_machine.Coordinates,
     generator: DispatchGenerator,
+    merge_strategy: MergeStrategy,
 ) -> None:
     """Test scenarios directly tied to the _at_least_one_running logic."""
     microgrid_id = randint(1, 100)
@@ -667,9 +686,9 @@ async def test_at_least_one_running_filter(
     service = DispatchScheduler(microgrid_id=microgrid_id, client=client)
     service.start()
 
-    # unify_running_intervals is True, so we use merged intervals
+    # A merge strategy is set, so we use merged intervals
     receiver = await service.new_running_state_event_receiver(
-        "TEST_TYPE", unify_running_intervals=True
+        "TEST_TYPE", merge_strategy=merge_strategy
     )
 
     # Single dispatch that starts and stops normally
@@ -677,6 +696,7 @@ async def test_at_least_one_running_filter(
         generator.generate_dispatch(),
         active=True,
         duration=timedelta(seconds=10),
+        target=[1, 2] if type(merge_strategy) is MergeByType else [3, 4],
         start_time=_now() + timedelta(seconds=5),
         recurrence=RecurrenceRule(),
         type="TEST_TYPE",
@@ -705,6 +725,7 @@ async def test_at_least_one_running_filter(
         generator.generate_dispatch(),
         active=False,
         duration=timedelta(seconds=10),
+        target=[3, 4],
         start_time=_now() + timedelta(seconds=50),
         recurrence=RecurrenceRule(),
         type="TEST_TYPE",
@@ -738,6 +759,3 @@ async def test_at_least_one_running_filter(
     await asyncio.sleep(1)
     stopped_b = await receiver.receive()
     assert not stopped_b.started
-
-    # Since dispatch_a never started, no merging logic needed here.
-    await service.stop()