@@ -37,6 +37,13 @@
public interface ClusterMessagingService {
/**
* Send message matching the specifications mentioned in recipientCriteria.
*
* <p><b>PERFORMANCE WARNING:</b> When recipientCriteria uses {@link DataSource#EXTERNALVIEW}
* with wildcard or unspecified resource names, this scans <b>ALL</b> ExternalView znodes in the cluster,
* regardless of other criteria like instanceName. At scale, this causes
* severe performance degradation. Use {@link DataSource#LIVEINSTANCES} when you don't need
* resource/partition filtering, or specify exact resource names when using EXTERNALVIEW.
*
* @param recipientCriteria criteria to be met, defined as {@link Criteria}
* @see Criteria
* @param message
@@ -54,6 +61,7 @@ public interface ClusterMessagingService {
* This method will return after sending the messages. <br>
* This is useful when a message needs to be sent and the current thread need not
* wait for a response, since processing will be done in another thread.
*
* @see #send(Criteria, Message)
* @param recipientCriteria
* @param message
@@ -85,7 +93,8 @@ int send(Criteria recipientCriteria, Message message, AsyncCallback callbackOnRe
* for response. <br>
* The current thread can use callbackOnReply instance to store application
* specific data.
* @see #send(Criteria, Message, AsyncCallback, int)
*
* @see #send(Criteria, Message)
* @param recipientCriteria
* @param message
* @param callbackOnReply
@@ -96,7 +105,7 @@ int sendAndWait(Criteria recipientCriteria, Message message, AsyncCallback callb
int timeOut);

/**
* @see #send(Criteria, Message, AsyncCallback, int, int)
* @see #send(Criteria, Message)
* @param receipientCriteria
* @param message
* @param callbackOnReply
@@ -143,6 +152,8 @@ int sendAndWait(Criteria receipientCriteria, Message message, AsyncCallback call
/**
* This will generate all messages to be sent given the recipientCriteria and MessageTemplate,
* the messages are not sent.
*
* @see #send(Criteria, Message)
* @param recipientCriteria criteria to be met, defined as {@link Criteria}
* @param messageTemplate the Message on which to base the messages to send
* @return messages to be sent, grouped by the type of instance to send the message to
52 changes: 47 additions & 5 deletions helix-core/src/main/java/org/apache/helix/Criteria.java
@@ -20,9 +20,43 @@
*/

/**
* Describes various properties that operations involving {@link Message} delivery will follow.
* Specifies recipient criteria for message delivery in a Helix cluster.
*
* <p><b>PERFORMANCE WARNING:</b> Using {@link DataSource#EXTERNALVIEW} with wildcard or unspecified
* resource names causes Helix to scan ALL ExternalView znodes in the cluster, regardless of other
* criteria fields. At scale, this causes severe performance degradation.
*
* <p><b>Example - Efficient Pattern:</b>
* <pre>
* // GOOD: Target specific live instance
* Criteria criteria = new Criteria();
* criteria.setInstanceName("host_1234");
* criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
* criteria.setDataSource(DataSource.LIVEINSTANCES); // Fast
* criteria.setSessionSpecific(true);
*
* // BAD: Wildcard resource with ExternalView
* Criteria wildcardCriteria = new Criteria();
* wildcardCriteria.setInstanceName("host_1234");
* wildcardCriteria.setDataSource(DataSource.EXTERNALVIEW);
* wildcardCriteria.setResource("%"); // Scans ALL ExternalViews!
* </pre>
*
* <p><b>DataSource Selection:</b>
* <ul>
* <li><b>LIVEINSTANCES:</b> Use when targeting live instances without resource/partition filtering. Fastest.</li>
* <li><b>EXTERNALVIEW:</b> Use when filtering by resource, partition, or replica state.
* ALWAYS specify exact resource names.</li>
* <li><b>INSTANCES:</b> Use for targeting all configured instances based on instance config.</li>
* <li><b>IDEALSTATES:</b> Use for targeting based on ideal state. Less common.</li>
* </ul>
*
* @see ClusterMessagingService#send(Criteria, org.apache.helix.model.Message)
*/
public class Criteria {
/**
* Source of cluster state data for resolving message recipients.
*/
public enum DataSource {
IDEALSTATES,
EXTERNALVIEW,
@@ -80,8 +114,12 @@ public DataSource getDataSource() {
}

/**
* Set the current source of truth
* @param source ideal state or external view
* Set the current source of truth for resolving message recipients.
*
* <p>Prefer {@link DataSource#LIVEINSTANCES} when not filtering by resource/partition.
* If using {@link DataSource#EXTERNALVIEW}, specify exact resource names to avoid full scans.
*
* @param source ideal state, external view, live instances, or instances
*/
public void setDataSource(DataSource source) {
_dataSource = source;
@@ -161,8 +199,12 @@ public String getResource() {
}

/**
* Set the destination resource name
* @param resourceName the resource name or % for all resources
* Set the destination resource name.
*
* <p>Only meaningful for {@link DataSource#EXTERNALVIEW} or {@link DataSource#IDEALSTATES}.
* Using wildcard "%" with EXTERNALVIEW reads ALL ExternalView znodes - use exact names instead.
*
* @param resourceName the exact resource name, or "%" for all resources
*/
public void setResource(String resourceName) {
this.resourceName = resourceName;
2 changes: 2 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -409,6 +409,8 @@ void addExternalViewChangeListener(org.apache.helix.ExternalViewChangeListener l

/**
* Messaging service which can be used to send cluster wide messages.
* See {@link ClusterMessagingService#send(Criteria, org.apache.helix.model.Message)} for usage.
*
* @return messaging service
*/
ClusterMessagingService getMessagingService();
@@ -39,12 +39,51 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Evaluates {@link Criteria} against persisted Helix data to determine message recipients.
*
* <p><b>PERFORMANCE WARNING:</b> When using {@link DataSource#EXTERNALVIEW}, this evaluator
* will scan <b>all</b> ExternalView znodes in the cluster if the resource name is unspecified or uses wildcards
* (e.g., "%" or "*"). This scanning happens <b>even when targeting specific instances</b>, and is
* NOT automatically optimized based on other criteria fields (like instanceName).
*
* <p>At high ExternalView cardinality, this can cause severe performance degradation.
*
* <p><b>Safer Patterns:</b>
* <ul>
* <li><b>Use {@link DataSource#LIVEINSTANCES}:</b> When you only need to target live instances
* and do not require resource/partition-level filtering. This reads only the LIVEINSTANCES
* znodes, which is typically much smaller and faster.</li>
* <li><b>Specify exact resource names:</b> If ExternalView is required, provide specific resource
* names in {@link Criteria#setResource(String)} instead of wildcards to limit the scan scope.</li>
* </ul>
*
* <p><b>Example - Targeting a specific instance:</b>
* <pre>
* // BAD: Scans all ExternalViews even though instance is specified
* Criteria criteria = new Criteria();
* criteria.setInstanceName("instance123");
* criteria.setDataSource(DataSource.EXTERNALVIEW);
* criteria.setResource("%"); // wildcard triggers full scan
*
* // GOOD: Uses LIVEINSTANCES, avoids ExternalView scan
* Criteria liveCriteria = new Criteria();
* liveCriteria.setInstanceName("instance123");
* liveCriteria.setDataSource(DataSource.LIVEINSTANCES);
* </pre>
*/
public class CriteriaEvaluator {
private static Logger logger = LoggerFactory.getLogger(CriteriaEvaluator.class);
public static final String MATCH_ALL_SYM = "%";

/**
* Examine persisted data to match wildcards in {@link Criteria}.
*
* <p><b>PERFORMANCE WARNING:</b> Using {@link DataSource#EXTERNALVIEW} with wildcard resource
* names (or unspecified resource) will scan ALL ExternalView znodes, even when targeting specific
* instances. At high cardinality, this can cause severe performance degradation. Prefer
* {@link DataSource#LIVEINSTANCES} when resource/partition filtering is not needed.
*
* @param recipientCriteria Criteria specifying the message destinations
* @param manager connection to the persisted data
* @return map of evaluated criteria
@@ -56,6 +95,12 @@ public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria,

/**
* Examine persisted data to match wildcards in {@link Criteria}.
*
* <p><b>PERFORMANCE WARNING:</b> Using {@link DataSource#EXTERNALVIEW} with wildcard resource
* names (or unspecified resource) will scan ALL ExternalView znodes, even when targeting specific
* instances. At high cardinality, this can cause severe performance degradation. Prefer
* {@link DataSource#LIVEINSTANCES} when resource/partition filtering is not needed.
*
* @param recipientCriteria Criteria specifying the message destinations
* @param accessor connection to the persisted data
* @return map of evaluated criteria
@@ -18,7 +18,9 @@
*/

/**
* Helix message handling classes
*
* Helix message handling classes.
*
* <p>When using the messaging API, configure {@link org.apache.helix.Criteria} carefully
* to avoid performance issues. See {@link org.apache.helix.ClusterMessagingService}.
*/
package org.apache.helix.messaging;
148 changes: 125 additions & 23 deletions website/0.9.9/src/site/markdown/Features.md
@@ -220,32 +220,134 @@ Since Helix is aware of the global state of the system, it can send the message
This is a very generic API and can also be used to schedule periodic tasks in the cluster, such as data backups.
System admins can also perform ad hoc tasks, such as an on-demand backup, or execute a system command (like rm -rf ;-)) across all nodes.

#### Understanding Criteria and DataSource

The `Criteria` object allows you to specify message recipients using various attributes. A critical configuration is the `DataSource`, which determines where Helix looks up the cluster state to resolve your criteria.

**Available DataSource Options:**

Helix provides four DataSource types, each reading from different znodes in ZooKeeper:

| DataSource | Description | When to Use |
|------------|-------------|-------------|
| **LIVEINSTANCES** | Reads from `/LIVEINSTANCES` znodes | Targeting live instances without needing resource/partition/state filtering |
| **INSTANCES** | Reads from `/INSTANCES/[instance]` znodes | Targeting specific configured instances (live or not) based on instance configuration |
| **EXTERNALVIEW** | Reads from `/EXTERNALVIEWS/[resource]` znodes | Targeting based on actual replica placement, partition ownership, or replica state (MASTER/SLAVE) |
| **IDEALSTATES** | Reads from `/IDEALSTATES/[resource]` znodes | Targeting based on ideal state configuration (intended placement) |

**Key Differences:**

- **LIVEINSTANCES**: Contains only instance names of currently connected participants. No resource/partition information. Smallest dataset.
- **INSTANCES**: Contains instance configuration (host, port, enabled/disabled status). No resource/partition information.
- **EXTERNALVIEW**: Contains actual current state - which instances own which partitions and their states (MASTER/SLAVE/OFFLINE). Large dataset at scale.
- **IDEALSTATES**: Contains desired state - which instances should own which partitions. Similar size to ExternalView.

**Choosing the Right DataSource:**

| Your Goal | Correct DataSource | Example Use Case |
|-----------|-------------------|------------------|
| Send to specific live instance(s) | `LIVEINSTANCES` | Health check, admin command to specific node |
| Send to all live instances | `LIVEINSTANCES` | Broadcast announcement, cluster-wide operation |
| Send to replicas of a specific partition | `EXTERNALVIEW` (with exact resource name) | Bootstrap replica from peers |
| Send to all MASTER replicas of a resource | `EXTERNALVIEW` (with exact resource name) | Trigger operation on masters only |
| Send based on partition state | `EXTERNALVIEW` (with exact resource name) | Target only ONLINE/MASTER/SLAVE replicas |
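
The tables above can be read as a small decision procedure. The sketch below is illustrative only: `DataSourceChooserSketch`, its local `DataSource` enum, and the three boolean parameters are hypothetical names introduced here, not part of the Helix API. The enum simply mirrors the four option names so the example compiles on its own.

```java
/** Illustrative only: encodes the selection tables above as a function. Not Helix code. */
final class DataSourceChooserSketch {
  /** Local mirror of the four DataSource option names (assumption: kept local so the sketch is self-contained). */
  enum DataSource { LIVEINSTANCES, INSTANCES, EXTERNALVIEW, IDEALSTATES }

  private DataSourceChooserSketch() {}

  /**
   * @param filtersByResourceOrPartition true if recipients depend on resource, partition, or replica state
   * @param wantsIntendedPlacement true to resolve against the desired placement rather than the actual one
   * @param includeOfflineInstances true if configured instances that are not live should also match
   */
  static DataSource choose(boolean filtersByResourceOrPartition,
                           boolean wantsIntendedPlacement,
                           boolean includeOfflineInstances) {
    if (filtersByResourceOrPartition) {
      // Resource-level data lives only in the ideal state and the external view.
      return wantsIntendedPlacement ? DataSource.IDEALSTATES : DataSource.EXTERNALVIEW;
    }
    // No resource-level filtering: the instance-level sources are enough.
    return includeOfflineInstances ? DataSource.INSTANCES : DataSource.LIVEINSTANCES;
  }

  public static void main(String[] args) {
    System.out.println(choose(false, false, false)); // LIVEINSTANCES
    System.out.println(choose(true, false, false));  // EXTERNALVIEW
  }
}
```

When the result is `EXTERNALVIEW`, the advice in this section still applies: pair it with an exact resource name.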

#### CRITICAL: Performance Considerations

**⚠️ WARNING:** Using `EXTERNALVIEW` as the DataSource can cause severe performance issues at scale.

**The Problem:**
When using `DataSource.EXTERNALVIEW`, Helix will scan **ALL** ExternalView znodes in the cluster if:
- You use wildcards (`%` or `*`) in the resource name, OR
- You leave the resource name unspecified

**This happens even when targeting specific instances!** The scan is NOT automatically optimized based on other criteria fields like `instanceName`.

In clusters with many resources, and therefore many ExternalView znodes, every such send reads all of them, which can cause severe performance degradation.
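
To make the cost difference concrete, here is a toy model of the read pattern described above. It is a sketch under stated assumptions, not Helix code: `ExternalViewScanSketch`, its in-memory map, and the `znodeReads` counter are hypothetical stand-ins for ExternalView znodes and ZooKeeper reads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: counts how many "znodes" a lookup touches. Not Helix code. */
final class ExternalViewScanSketch {
  /** Resource name -> instances hosting it (hypothetical stand-in for ExternalView znodes). */
  private final Map<String, List<String>> externalViews = new LinkedHashMap<>();
  private int znodeReads = 0;

  void addExternalView(String resource, List<String> instances) {
    externalViews.put(resource, instances);
  }

  /** Returns the resources on which the instance appears, counting one read per znode touched. */
  List<String> resolve(String resourcePattern, String instanceName) {
    List<String> hits = new ArrayList<>();
    boolean wildcard = resourcePattern == null || resourcePattern.isEmpty()
        || resourcePattern.equals("%") || resourcePattern.equals("*");
    if (!wildcard) {
      znodeReads++; // exact name: only this one znode is read
      List<String> instances = externalViews.get(resourcePattern);
      if (instances != null && instances.contains(instanceName)) {
        hits.add(resourcePattern);
      }
      return hits;
    }
    for (Map.Entry<String, List<String>> view : externalViews.entrySet()) {
      znodeReads++; // wildcard: every znode is read before the instance filter applies
      if (view.getValue().contains(instanceName)) {
        hits.add(view.getKey());
      }
    }
    return hits;
  }

  int znodeReads() {
    return znodeReads;
  }

  /** Builds a model cluster with {@code count} resources spread over ten hosts. */
  static ExternalViewScanSketch withResources(int count) {
    ExternalViewScanSketch sketch = new ExternalViewScanSketch();
    for (int i = 0; i < count; i++) {
      sketch.addExternalView("DB_" + i, Collections.singletonList("host_" + (i % 10)));
    }
    return sketch;
  }

  public static void main(String[] args) {
    ExternalViewScanSketch wildcard = withResources(1000);
    wildcard.resolve("%", "host_3");
    System.out.println("wildcard reads: " + wildcard.znodeReads()); // wildcard reads: 1000

    ExternalViewScanSketch exact = withResources(1000);
    exact.resolve("DB_3", "host_3");
    System.out.println("exact reads: " + exact.znodeReads()); // exact reads: 1
  }
}
```

In this model the instance name never reduces the number of reads; only the resource pattern does, which is the behavior the warning describes.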

#### How to Set Criteria Correctly

**Pattern 1: Targeting Specific Instances (Most Common)**

When you only need to send messages to specific instances and don't need resource/partition-level filtering:

```java
// GOOD: Efficient - Uses LIVEINSTANCES, avoids ExternalView scan
Criteria criteria = new Criteria();
criteria.setInstanceName("instance123"); // or "%" for all live instances
criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
criteria.setDataSource(DataSource.LIVEINSTANCES); // Key: Use LIVEINSTANCES
criteria.setSessionSpecific(true);
```

```java
// BAD: Inefficient - Scans ALL ExternalViews even though targeting specific instance
Criteria criteria = new Criteria();
criteria.setInstanceName("instance123");
criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
criteria.setDataSource(DataSource.EXTERNALVIEW); // Will scan ALL resources!
criteria.setResource("%"); // Wildcard triggers full scan
```

**Pattern 2: Targeting Specific Resource and Partition**

When you need to send messages based on resource ownership (e.g., all replicas of a partition):

```java
// GOOD: Efficient - Specifies exact resource name, scans only 1 ExternalView
Criteria criteria = new Criteria();
criteria.setInstanceName("%");
criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
criteria.setDataSource(DataSource.EXTERNALVIEW);
criteria.setResource("MyDB"); // Exact resource name - scans only this EV
criteria.setPartition("MyDB_0"); // Specific partition
criteria.setPartitionState("MASTER"); // Only send to masters
criteria.setSessionSpecific(true);
```

```java
// BAD: Inefficient - Wildcard resource scans ALL ExternalViews
Criteria criteria = new Criteria();
criteria.setInstanceName("%");
criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
criteria.setDataSource(DataSource.EXTERNALVIEW);
criteria.setResource("%"); // Wildcard scans ALL ExternalViews in cluster!
criteria.setPartition("MyDB_0");
criteria.setSessionSpecific(true);
```
ClusterMessagingService messagingService = manager.getMessagingService();
//CONSTRUCT THE MESSAGE
Message requestBackupUriRequest = new Message(
MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
requestBackupUriRequest
.setMsgSubType(BootstrapProcess.REQUEST_BOOTSTRAP_URL);
requestBackupUriRequest.setMsgState(MessageState.NEW);
//SET THE RECIPIENT CRITERIA, All nodes that satisfy the criteria will receive the message
Criteria recipientCriteria = new Criteria();
recipientCriteria.setInstanceName("%");
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setResource("MyDB");
recipientCriteria.setPartition("");
//Should be processed only by the process that is active at the time of sending the message.
//This means if the recipient is restarted after the message is sent, it will not be processed.
recipientCriteria.setSessionSpecific(true);
// wait for 30 seconds
int timeout = 30000;
//The handler that will be invoked when any recipient responds to the message.
BootstrapReplyHandler responseHandler = new BootstrapReplyHandler();
//This will return only after all recipients respond or after timeout.
int sentMessageCount = messagingService.sendAndWait(recipientCriteria,
requestBackupUriRequest, responseHandler, timeout);

**Pattern 3: Broadcasting to All Live Instances**

```java
// GOOD: Efficient broadcast to all live participants
Criteria criteria = new Criteria();
criteria.setInstanceName("%");
criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
criteria.setDataSource(DataSource.LIVEINSTANCES); // Fast broadcast
criteria.setSessionSpecific(true);
```

#### Criteria Configuration Reference

The `Criteria` class provides the following configuration methods:

| Method | Parameter | Description | Wildcard Support | Example |
|--------|-----------|-------------|------------------|---------|
| `setDataSource(DataSource)` | LIVEINSTANCES, INSTANCES, EXTERNALVIEW, IDEALSTATES | **MOST IMPORTANT:** Determines which znodes to read | N/A | `DataSource.LIVEINSTANCES` |
| `setInstanceName(String)` | Instance name | Target specific instance(s) by name | Yes (`%` = all) | `"localhost_12918"` or `"%"` |
| `setResource(String)` | Resource name | Filter by resource name (only meaningful for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MyDatabase"` or `"%"` |
| `setPartition(String)` | Partition name | Filter by specific partition (only meaningful for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MyDatabase_0"` or `"%"` |
| `setPartitionState(String)` | State name | Filter by replica state like MASTER, SLAVE, ONLINE, OFFLINE (only for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MASTER"` or `"%"` |
| `setRecipientInstanceType(InstanceType)` | PARTICIPANT, CONTROLLER, SPECTATOR | Type of Helix process to target | No | `InstanceType.PARTICIPANT` |
| `setSessionSpecific(boolean)` | true/false | If true, message is only delivered to currently active sessions (not redelivered after restart) | No | `true` (recommended) |

**Important Notes:**

- **Wildcards:** Use `%` (SQL-style) or `*` to match all. Single underscore `_` matches any single character.
- **DataSource Compatibility:** Setting `resource`, `partition`, or `partitionState` only makes sense with `EXTERNALVIEW` or `IDEALSTATES` DataSource. They are ignored for `LIVEINSTANCES` and `INSTANCES`.
- **Session-Specific:** Set to `true` for most use cases to avoid redelivering messages after a participant restarts.
- **Empty vs Wildcard:** Empty string `""` and wildcard `"%"` are treated the same - both match all.
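
The wildcard rules in these notes can be sketched as a translation to a regular expression. This is a minimal illustration of the stated rules, not the library's implementation: `WildcardSketch` and its methods are hypothetical names, case handling is left out, and treating `*` as a wildcard only when it is the whole pattern is an assumption made for this sketch.

```java
import java.util.regex.Pattern;

/** Illustrative only: models the matching rules listed above. Not Helix's own code. */
final class WildcardSketch {
  private WildcardSketch() {}

  /** Converts a criteria pattern into a regular expression. */
  static String toRegex(String pattern) {
    // Null, empty, and "*" are all treated as "match everything".
    if (pattern == null || pattern.isEmpty() || pattern.equals("*")) {
      pattern = "%";
    }
    StringBuilder regex = new StringBuilder();
    for (char ch : pattern.toCharArray()) {
      if (ch == '%') {
        regex.append(".*"); // SQL-style: any run of characters
      } else if (ch == '_') {
        regex.append('.');  // SQL-style: exactly one character
      } else {
        regex.append(Pattern.quote(String.valueOf(ch))); // everything else is literal
      }
    }
    return regex.toString();
  }

  /** True if the whole value matches the pattern. */
  static boolean matches(String pattern, String value) {
    return Pattern.matches(toRegex(pattern), value);
  }

  public static void main(String[] args) {
    System.out.println(matches("%", "MyDatabase"));                   // true
    System.out.println(matches("", "MyDatabase"));                    // true
    System.out.println(matches("MyDatabase__", "MyDatabase_0"));      // true
    System.out.println(matches("MyDatabase", "MyDatabase_0"));        // false
  }
}
```

One consequence of the underscore rule, at least in this sketch: a pattern such as `"MyDatabase_0"` also matches `"MyDatabaseX0"`, because `_` stands for any single character.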

See HelixManager.getMessagingService for more info.


20 changes: 20 additions & 0 deletions website/0.9.9/src/site/markdown/tutorial_messaging.md
@@ -25,6 +25,26 @@ under the License.

In this chapter, we'll learn about messaging, a convenient feature in Helix for sending messages between nodes of a cluster. This is an interesting feature that is quite useful in practice. It is common that nodes in a distributed system require a mechanism to interact with each other.

### Performance Considerations

**IMPORTANT:** When using the messaging API with `Criteria`, be aware of the following performance characteristics:

- **ExternalView Scanning:** By default, the messaging service uses `DataSource.EXTERNALVIEW` to resolve criteria. With a wildcard or unspecified resource name, this scans **all** ExternalView znodes in the cluster, even when targeting specific instances. In clusters with many resources, this can cause severe performance degradation.

**Recommended Patterns:**

- **Use `DataSource.LIVEINSTANCES`** when you only need to target live instances and do not require resource/partition-level filtering. This is much faster and more efficient.
- **Specify exact resource names** instead of wildcards if you must use ExternalView scanning.

Example of efficient messaging:
```java
Criteria recipientCriteria = new Criteria();
recipientCriteria.setInstanceName("instance123");
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setDataSource(DataSource.LIVEINSTANCES); // Efficient: avoids EV scan
recipientCriteria.setSessionSpecific(true);
```

### Example: Bootstrapping a Replica

Consider a search system where the index replica starts up and it does not have an index. A typical solution is to get the index from a common location, or to copy the index from another replica.