diff --git a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java index f31f0e397a..09c82b2fa2 100644 --- a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java +++ b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java @@ -37,6 +37,13 @@ public interface ClusterMessagingService { /** * Send message matching the specifications mentioned in recipientCriteria. + * + *

PERFORMANCE WARNING: When recipientCriteria uses {@link DataSource#EXTERNALVIEW} + * with wildcard or unspecified resource names, this scans ALL ExternalView znodes in the cluster, + * regardless of other criteria like instanceName. At scale, this causes + * severe performance degradation. Use {@link DataSource#LIVEINSTANCES} when you don't need + * resource/partition filtering, or specify exact resource names when using EXTERNALVIEW. + * * @param recipientCriteria criteria to be met, defined as {@link Criteria} * @See Criteria * @param message @@ -54,6 +61,7 @@ public interface ClusterMessagingService { * This method will return after sending the messages.
* This is useful when message need to be sent and current thread need not * wait for response since processing will be done in another thread. + * * @see #send(Criteria, Message) * @param recipientCriteria * @param message @@ -85,7 +93,8 @@ int send(Criteria recipientCriteria, Message message, AsyncCallback callbackOnRe * for response.
* The current thread can use callbackOnReply instance to store application * specific data. - * @see #send(Criteria, Message, AsyncCallback, int) + * + * @see #send(Criteria, Message) * @param recipientCriteria * @param message * @param callbackOnReply @@ -96,7 +105,7 @@ int sendAndWait(Criteria recipientCriteria, Message message, AsyncCallback callb int timeOut); /** - * @see #send(Criteria, Message, AsyncCallback, int, int) + * @see #send(Criteria, Message) * @param receipientCriteria * @param message * @param callbackOnReply @@ -143,6 +152,8 @@ int sendAndWait(Criteria receipientCriteria, Message message, AsyncCallback call /** * This will generate all messages to be sent given the recipientCriteria and MessageTemplate, * the messages are not sent. + * + * @see #send(Criteria, Message) * @param recipientCriteria criteria to be met, defined as {@link Criteria} * @param messageTemplate the Message on which to base the messages to send * @return messages to be sent, grouped by the type of instance to send the message to diff --git a/helix-core/src/main/java/org/apache/helix/Criteria.java b/helix-core/src/main/java/org/apache/helix/Criteria.java index d01228db83..1ddb429e1b 100644 --- a/helix-core/src/main/java/org/apache/helix/Criteria.java +++ b/helix-core/src/main/java/org/apache/helix/Criteria.java @@ -20,9 +20,43 @@ */ /** - * Describes various properties that operations involving {@link Message} delivery will follow. + * Specifies recipient criteria for message delivery in a Helix cluster. + * + *

PERFORMANCE WARNING: Using {@link DataSource#EXTERNALVIEW} with wildcard or unspecified + * resource names causes Helix to scan ALL ExternalView znodes in the cluster, regardless of other + * criteria fields. At scale, this causes severe performance degradation. + * + *

Example - Efficient Pattern: + *

+ * // GOOD: Target specific live instance
+ * Criteria criteria = new Criteria();
+ * criteria.setInstanceName("host_1234");
+ * criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ * criteria.setDataSource(DataSource.LIVEINSTANCES);  // Fast
+ * criteria.setSessionSpecific(true);
+ * 
+ * // BAD: Wildcard resource with ExternalView
+ * Criteria criteria = new Criteria();
+ * criteria.setInstanceName("host_1234");
+ * criteria.setDataSource(DataSource.EXTERNALVIEW);
+ * criteria.setResource("%");  // Scans ALL ExternalViews!
+ * 
+ * + *

DataSource Selection: + *

+ * + * @see ClusterMessagingService#send(Criteria, org.apache.helix.model.Message) */ public class Criteria { + /** + * Source of cluster state data for resolving message recipients. + */ public enum DataSource { IDEALSTATES, EXTERNALVIEW, @@ -80,8 +114,12 @@ public DataSource getDataSource() { } /** - * Set the current source of truth - * @param source ideal state or external view + * Set the current source of truth for resolving message recipients. + * + *

Prefer {@link DataSource#LIVEINSTANCES} when not filtering by resource/partition. + * If using {@link DataSource#EXTERNALVIEW}, specify exact resource names to avoid full scans. + * + * @param source ideal state, external view, live instances, or instances */ public void setDataSource(DataSource source) { _dataSource = source; @@ -161,8 +199,12 @@ public String getResource() { } /** - * Set the destination resource name - * @param resourceName the resource name or % for all resources + * Set the destination resource name. + * + *

Only meaningful for {@link DataSource#EXTERNALVIEW} or {@link DataSource#IDEALSTATES}. + * Using wildcard "%" with EXTERNALVIEW reads ALL ExternalView znodes - use exact names instead. + * + * @param resourceName the exact resource name, or "%" for all resources */ public void setResource(String resourceName) { this.resourceName = resourceName; diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java index c1d2ad18c5..cf3378314c 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java @@ -409,6 +409,8 @@ void addExternalViewChangeListener(org.apache.helix.ExternalViewChangeListener l /** * Messaging service which can be used to send cluster wide messages. + * See {@link ClusterMessagingService#send(Criteria, org.apache.helix.model.Message)} for usage. + * * @return messaging service */ ClusterMessagingService getMessagingService(); diff --git a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java index f0e9ef58ff..5ef64f4c8f 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java @@ -39,12 +39,51 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Evaluates {@link Criteria} against persisted Helix data to determine message recipients. + * + *

PERFORMANCE WARNING: When using {@link DataSource#EXTERNALVIEW}, this evaluator + * will scan all ExternalView znodes in the cluster if the resource name is unspecified or uses wildcards + * (e.g., "%" or "*"). This scanning happens even when targeting specific instances, and is + * NOT automatically optimized based on other criteria fields (like instanceName). + * + *

At high ExternalView cardinality, this can cause severe performance degradation. + * + *

Safer Patterns: + *

+ * + *

Example - Targeting a specific instance: + *

+ * // BAD: Scans all ExternalViews even though instance is specified
+ * Criteria criteria = new Criteria();
+ * criteria.setInstanceName("instance123");
+ * criteria.setDataSource(DataSource.EXTERNALVIEW);
+ * criteria.setResource("%"); // wildcard triggers full scan
+ * 
+ * // GOOD: Uses LIVEINSTANCES, avoids ExternalView scan
+ * Criteria criteria = new Criteria();
+ * criteria.setInstanceName("instance123");
+ * criteria.setDataSource(DataSource.LIVEINSTANCES);
+ * 
+ */ public class CriteriaEvaluator { private static Logger logger = LoggerFactory.getLogger(CriteriaEvaluator.class); public static final String MATCH_ALL_SYM = "%"; /** * Examine persisted data to match wildcards in {@link Criteria} + * + *

PERFORMANCE WARNING: Using {@link DataSource#EXTERNALVIEW} with wildcard resource + * names (or unspecified resource) will scan ALL ExternalView znodes, even when targeting specific + * instances. At high cardinality, this can cause severe performance degradation. Prefer + * {@link DataSource#LIVEINSTANCES} when resource/partition filtering is not needed. + * * @param recipientCriteria Criteria specifying the message destinations * @param manager connection to the persisted data * @return map of evaluated criteria @@ -56,6 +95,12 @@ public List> evaluateCriteria(Criteria recipientCriteria, /** * Examine persisted data to match wildcards in {@link Criteria} + * + *

PERFORMANCE WARNING: Using {@link DataSource#EXTERNALVIEW} with wildcard resource + * names (or unspecified resource) will scan ALL ExternalView znodes, even when targeting specific + * instances. At high cardinality, this can cause severe performance degradation. Prefer + * {@link DataSource#LIVEINSTANCES} when resource/partition filtering is not needed. + * * @param recipientCriteria Criteria specifying the message destinations * @param accessor connection to the persisted data * @return map of evaluated criteria diff --git a/helix-core/src/main/java/org/apache/helix/messaging/package-info.java b/helix-core/src/main/java/org/apache/helix/messaging/package-info.java index dcd44b350b..6de13e88f0 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/package-info.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/package-info.java @@ -18,7 +18,9 @@ */ /** - * Helix message handling classes - * + * Helix message handling classes. + * + *

When using the messaging API, configure {@link org.apache.helix.Criteria} carefully + * to avoid performance issues. See {@link org.apache.helix.ClusterMessagingService}. */ package org.apache.helix.messaging; \ No newline at end of file diff --git a/website/0.9.9/src/site/markdown/Features.md b/website/0.9.9/src/site/markdown/Features.md index 476956a637..4cba119539 100644 --- a/website/0.9.9/src/site/markdown/Features.md +++ b/website/0.9.9/src/site/markdown/Features.md @@ -220,32 +220,134 @@ Since Helix is aware of the global state of the system, it can send the message This is a very generic api and can also be used to schedule various periodic tasks in the cluster like data backups etc. System Admins can also perform adhoc tasks like on demand backup or execute a system command(like rm -rf ;-)) across all nodes. +#### Understanding Criteria and DataSource + +The `Criteria` object allows you to specify message recipients using various attributes. A critical configuration is the `DataSource`, which determines where Helix looks up the cluster state to resolve your criteria. + +**Available DataSource Options:** + +Helix provides four DataSource types, each reading from different znodes in ZooKeeper: + +| DataSource | Description | When to Use | +|------------|-------------|-------------| +| **LIVEINSTANCES** | Reads from `/LIVEINSTANCES` znodes | Targeting live instances without needing resource/partition/state filtering | +| **INSTANCES** | Reads from `/INSTANCES/[instance]` znodes | Targeting specific configured instances (live or not) based on instance configuration | +| **EXTERNALVIEW** | Reads from `/EXTERNALVIEWS/[resource]` znodes | Targeting based on actual replica placement, partition ownership, or replica state (MASTER/SLAVE) | +| **IDEALSTATES** | Reads from `/IDEALSTATES/[resource]` znodes | Targeting based on ideal state configuration (intended placement) | + +**Key Differences:** + +- **LIVEINSTANCES**: Contains only instance names of currently connected participants. No resource/partition information. Smallest dataset. +- **INSTANCES**: Contains instance configuration (host, port, enabled/disabled status). No resource/partition information. +- **EXTERNALVIEW**: Contains actual current state - which instances own which partitions and their states (MASTER/SLAVE/OFFLINE). Large dataset at scale. +- **IDEALSTATES**: Contains desired state - which instances should own which partitions. Similar size to ExternalView. + +**Choosing the Right DataSource:** + +| Your Goal | Correct DataSource | Example Use Case | +|-----------|-------------------|------------------| +| Send to specific live instance(s) | `LIVEINSTANCES` | Health check, admin command to specific node | +| Send to all live instances | `LIVEINSTANCES` | Broadcast announcement, cluster-wide operation | +| Send to replicas of a specific partition | `EXTERNALVIEW` (with exact resource name) | Bootstrap replica from peers | +| Send to all MASTER replicas of a resource | `EXTERNALVIEW` (with exact resource name) | Trigger operation on masters only | +| Send based on partition state | `EXTERNALVIEW` (with exact resource name) | Target only ONLINE/MASTER/SLAVE replicas | + +#### CRITICAL: Performance Considerations + +**⚠️ WARNING:** Using `EXTERNALVIEW` as the DataSource can cause severe performance issues at scale. + +**The Problem:** +When using `DataSource.EXTERNALVIEW`, Helix will scan **ALL** ExternalView znodes in the cluster if: +- You use wildcards (`%` or `*`) in the resource name, OR +- You leave the resource name unspecified + +**This happens even when targeting specific instances!** The scan is NOT automatically optimized based on other criteria fields like `instanceName`. + +At high ExternalView cardinality, this can cause severe performance degradation. + +#### How to Set Criteria Correctly + +**Pattern 1: Targeting Specific Instances (Most Common)** + +When you only need to send messages to specific instances and don't need resource/partition-level filtering: + +```java +// GOOD: Efficient - Uses LIVEINSTANCES, avoids ExternalView scan +Criteria criteria = new Criteria(); +criteria.setInstanceName("instance123"); // or "%" for all live instances +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.LIVEINSTANCES); // Key: Use LIVEINSTANCES +criteria.setSessionSpecific(true); +``` + +```java +// BAD: Inefficient - Scans ALL ExternalViews even though targeting specific instance +Criteria criteria = new Criteria(); +criteria.setInstanceName("instance123"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.EXTERNALVIEW); // Will scan ALL resources! +criteria.setResource("%"); // Wildcard triggers full scan +``` + +**Pattern 2: Targeting Specific Resource and Partition** + +When you need to send messages based on resource ownership (e.g., all replicas of a partition): + +```java +// GOOD: Efficient - Specifies exact resource name, scans only 1 ExternalView +Criteria criteria = new Criteria(); +criteria.setInstanceName("%"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.EXTERNALVIEW); +criteria.setResource("MyDB"); // Exact resource name - scans only this EV +criteria.setPartition("MyDB_0"); // Specific partition +criteria.setPartitionState("MASTER"); // Only send to masters +criteria.setSessionSpecific(true); +``` + +```java +// BAD: Inefficient - Wildcard resource scans ALL ExternalViews +Criteria criteria = new Criteria(); +criteria.setInstanceName("%"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.EXTERNALVIEW); +criteria.setResource("%"); // Wildcard scans ALL ExternalViews in cluster! +criteria.setPartition("MyDB_0"); +criteria.setSessionSpecific(true); ``` - ClusterMessagingService messagingService = manager.getMessagingService(); - //CONSTRUCT THE MESSAGE - Message requestBackupUriRequest = new Message( - MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); - requestBackupUriRequest - .setMsgSubType(BootstrapProcess.REQUEST_BOOTSTRAP_URL); - requestBackupUriRequest.setMsgState(MessageState.NEW); - //SET THE RECIPIENT CRITERIA, All nodes that satisfy the criteria will receive the message - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setResource("MyDB"); - recipientCriteria.setPartition(""); - //Should be processed only the process that is active at the time of sending the message. - //This means if the recipient is restarted after message is sent, it will not be processed. - recipientCriteria.setSessionSpecific(true); - // wait for 30 seconds - int timeout = 30000; - //The handler that will be invoked when any recipient responds to the message. - BootstrapReplyHandler responseHandler = new BootstrapReplyHandler(); - //This will return only after all recipients respond or after timeout. - int sentMessageCount = messagingService.sendAndWait(recipientCriteria, - requestBackupUriRequest, responseHandler, timeout); + +**Pattern 3: Broadcasting to All Live Instances** + +```java +// GOOD: Efficient broadcast to all live participants +Criteria criteria = new Criteria(); +criteria.setInstanceName("%"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.LIVEINSTANCES); // Fast broadcast +criteria.setSessionSpecific(true); ``` +#### Criteria Configuration Reference + +The `Criteria` class provides the following configuration methods: + +| Method | Parameter | Description | Wildcard Support | Example | +|--------|-----------|-------------|------------------|---------| +| `setDataSource(DataSource)` | LIVEINSTANCES, INSTANCES, EXTERNALVIEW, IDEALSTATES | **MOST IMPORTANT:** Determines which znodes to read | N/A | `DataSource.LIVEINSTANCES` | +| `setInstanceName(String)` | Instance name | Target specific instance(s) by name | Yes (`%` = all) | `"localhost_12918"` or `"%"` | +| `setResource(String)` | Resource name | Filter by resource name (only meaningful for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MyDatabase"` or `"%"` | +| `setPartition(String)` | Partition name | Filter by specific partition (only meaningful for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MyDatabase_0"` or `"%"` | +| `setPartitionState(String)` | State name | Filter by replica state like MASTER, SLAVE, ONLINE, OFFLINE (only for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MASTER"` or `"%"` | +| `setRecipientInstanceType(InstanceType)` | PARTICIPANT, CONTROLLER, SPECTATOR | Type of Helix process to target | No | `InstanceType.PARTICIPANT` | +| `setSessionSpecific(boolean)` | true/false | If true, message is only delivered to currently active sessions (not redelivered after restart) | No | `true` (recommended) | + +**Important Notes:** + +- **Wildcards:** Use `%` (SQL-style) or `*` to match all. Single underscore `_` matches any single character. +- **DataSource Compatibility:** Setting `resource`, `partition`, or `partitionState` only makes sense with `EXTERNALVIEW` or `IDEALSTATES` DataSource. They are ignored for `LIVEINSTANCES` and `INSTANCES`. +- **Session-Specific:** Set to `true` for most use cases to avoid redelivering messages after a participant restarts. +- **Empty vs Wildcard:** Empty string `""` and wildcard `"%"` are treated the same - both match all. + See HelixManager.getMessagingService for more info. diff --git a/website/0.9.9/src/site/markdown/tutorial_messaging.md b/website/0.9.9/src/site/markdown/tutorial_messaging.md index 0b32bdac17..47cf0f02ef 100644 --- a/website/0.9.9/src/site/markdown/tutorial_messaging.md +++ b/website/0.9.9/src/site/markdown/tutorial_messaging.md @@ -25,6 +25,26 @@ under the License. In this chapter, we\'ll learn about messaging, a convenient feature in Helix for sending messages between nodes of a cluster. This is an interesting feature that is quite useful in practice. It is common that nodes in a distributed system require a mechanism to interact with each other. +### Performance Considerations + +**IMPORTANT:** When using the messaging API with `Criteria`, be aware of the following performance characteristics: + +- **ExternalView Scanning:** By default, the messaging service uses `DataSource.EXTERNALVIEW` to resolve criteria. This can scan **all** ExternalView znodes in the cluster, even when targeting specific instances. At high resource cardinality, this can cause severe performance degradation. + +**Recommended Patterns:** + +- **Use `DataSource.LIVEINSTANCES`** when you only need to target live instances and do not require resource/partition-level filtering. This is much faster and more efficient. +- **Specify exact resource names** instead of wildcards if you must use ExternalView scanning. + +Example of efficient messaging: +```java +Criteria recipientCriteria = new Criteria(); +recipientCriteria.setInstanceName("instance123"); +recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +recipientCriteria.setDataSource(DataSource.LIVEINSTANCES); // Efficient: avoids EV scan +recipientCriteria.setSessionSpecific(true); +``` + ### Example: Bootstrapping a Replica Consider a search system where the index replica starts up and it does not have an index. A typical solution is to get the index from a common location, or to copy the index from another replica. diff --git a/website/1.3.2/src/site/markdown/Features.md b/website/1.3.2/src/site/markdown/Features.md index 476956a637..4cba119539 100644 --- a/website/1.3.2/src/site/markdown/Features.md +++ b/website/1.3.2/src/site/markdown/Features.md @@ -220,32 +220,134 @@ Since Helix is aware of the global state of the system, it can send the message This is a very generic api and can also be used to schedule various periodic tasks in the cluster like data backups etc. System Admins can also perform adhoc tasks like on demand backup or execute a system command(like rm -rf ;-)) across all nodes. +#### Understanding Criteria and DataSource + +The `Criteria` object allows you to specify message recipients using various attributes. A critical configuration is the `DataSource`, which determines where Helix looks up the cluster state to resolve your criteria. + +**Available DataSource Options:** + +Helix provides four DataSource types, each reading from different znodes in ZooKeeper: + +| DataSource | Description | When to Use | +|------------|-------------|-------------| +| **LIVEINSTANCES** | Reads from `/LIVEINSTANCES` znodes | Targeting live instances without needing resource/partition/state filtering | +| **INSTANCES** | Reads from `/INSTANCES/[instance]` znodes | Targeting specific configured instances (live or not) based on instance configuration | +| **EXTERNALVIEW** | Reads from `/EXTERNALVIEWS/[resource]` znodes | Targeting based on actual replica placement, partition ownership, or replica state (MASTER/SLAVE) | +| **IDEALSTATES** | Reads from `/IDEALSTATES/[resource]` znodes | Targeting based on ideal state configuration (intended placement) | + +**Key Differences:** + +- **LIVEINSTANCES**: Contains only instance names of currently connected participants. No resource/partition information. Smallest dataset. +- **INSTANCES**: Contains instance configuration (host, port, enabled/disabled status). No resource/partition information. +- **EXTERNALVIEW**: Contains actual current state - which instances own which partitions and their states (MASTER/SLAVE/OFFLINE). Large dataset at scale. +- **IDEALSTATES**: Contains desired state - which instances should own which partitions. Similar size to ExternalView. + +**Choosing the Right DataSource:** + +| Your Goal | Correct DataSource | Example Use Case | +|-----------|-------------------|------------------| +| Send to specific live instance(s) | `LIVEINSTANCES` | Health check, admin command to specific node | +| Send to all live instances | `LIVEINSTANCES` | Broadcast announcement, cluster-wide operation | +| Send to replicas of a specific partition | `EXTERNALVIEW` (with exact resource name) | Bootstrap replica from peers | +| Send to all MASTER replicas of a resource | `EXTERNALVIEW` (with exact resource name) | Trigger operation on masters only | +| Send based on partition state | `EXTERNALVIEW` (with exact resource name) | Target only ONLINE/MASTER/SLAVE replicas | + +#### CRITICAL: Performance Considerations + +**⚠️ WARNING:** Using `EXTERNALVIEW` as the DataSource can cause severe performance issues at scale. + +**The Problem:** +When using `DataSource.EXTERNALVIEW`, Helix will scan **ALL** ExternalView znodes in the cluster if: +- You use wildcards (`%` or `*`) in the resource name, OR +- You leave the resource name unspecified + +**This happens even when targeting specific instances!** The scan is NOT automatically optimized based on other criteria fields like `instanceName`. + +At high ExternalView cardinality, this can cause severe performance degradation. + +#### How to Set Criteria Correctly + +**Pattern 1: Targeting Specific Instances (Most Common)** + +When you only need to send messages to specific instances and don't need resource/partition-level filtering: + +```java +// GOOD: Efficient - Uses LIVEINSTANCES, avoids ExternalView scan +Criteria criteria = new Criteria(); +criteria.setInstanceName("instance123"); // or "%" for all live instances +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.LIVEINSTANCES); // Key: Use LIVEINSTANCES +criteria.setSessionSpecific(true); +``` + +```java +// BAD: Inefficient - Scans ALL ExternalViews even though targeting specific instance +Criteria criteria = new Criteria(); +criteria.setInstanceName("instance123"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.EXTERNALVIEW); // Will scan ALL resources! +criteria.setResource("%"); // Wildcard triggers full scan +``` + +**Pattern 2: Targeting Specific Resource and Partition** + +When you need to send messages based on resource ownership (e.g., all replicas of a partition): + +```java +// GOOD: Efficient - Specifies exact resource name, scans only 1 ExternalView +Criteria criteria = new Criteria(); +criteria.setInstanceName("%"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.EXTERNALVIEW); +criteria.setResource("MyDB"); // Exact resource name - scans only this EV +criteria.setPartition("MyDB_0"); // Specific partition +criteria.setPartitionState("MASTER"); // Only send to masters +criteria.setSessionSpecific(true); +``` + +```java +// BAD: Inefficient - Wildcard resource scans ALL ExternalViews +Criteria criteria = new Criteria(); +criteria.setInstanceName("%"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.EXTERNALVIEW); +criteria.setResource("%"); // Wildcard scans ALL ExternalViews in cluster! +criteria.setPartition("MyDB_0"); +criteria.setSessionSpecific(true); ``` - ClusterMessagingService messagingService = manager.getMessagingService(); - //CONSTRUCT THE MESSAGE - Message requestBackupUriRequest = new Message( - MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); - requestBackupUriRequest - .setMsgSubType(BootstrapProcess.REQUEST_BOOTSTRAP_URL); - requestBackupUriRequest.setMsgState(MessageState.NEW); - //SET THE RECIPIENT CRITERIA, All nodes that satisfy the criteria will receive the message - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setResource("MyDB"); - recipientCriteria.setPartition(""); - //Should be processed only the process that is active at the time of sending the message. - //This means if the recipient is restarted after message is sent, it will not be processed. - recipientCriteria.setSessionSpecific(true); - // wait for 30 seconds - int timeout = 30000; - //The handler that will be invoked when any recipient responds to the message. - BootstrapReplyHandler responseHandler = new BootstrapReplyHandler(); - //This will return only after all recipients respond or after timeout. - int sentMessageCount = messagingService.sendAndWait(recipientCriteria, - requestBackupUriRequest, responseHandler, timeout); + +**Pattern 3: Broadcasting to All Live Instances** + +```java +// GOOD: Efficient broadcast to all live participants +Criteria criteria = new Criteria(); +criteria.setInstanceName("%"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.LIVEINSTANCES); // Fast broadcast +criteria.setSessionSpecific(true); ``` +#### Criteria Configuration Reference + +The `Criteria` class provides the following configuration methods: + +| Method | Parameter | Description | Wildcard Support | Example | +|--------|-----------|-------------|------------------|---------| +| `setDataSource(DataSource)` | LIVEINSTANCES, INSTANCES, EXTERNALVIEW, IDEALSTATES | **MOST IMPORTANT:** Determines which znodes to read | N/A | `DataSource.LIVEINSTANCES` | +| `setInstanceName(String)` | Instance name | Target specific instance(s) by name | Yes (`%` = all) | `"localhost_12918"` or `"%"` | +| `setResource(String)` | Resource name | Filter by resource name (only meaningful for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MyDatabase"` or `"%"` | +| `setPartition(String)` | Partition name | Filter by specific partition (only meaningful for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MyDatabase_0"` or `"%"` | +| `setPartitionState(String)` | State name | Filter by replica state like MASTER, SLAVE, ONLINE, OFFLINE (only for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MASTER"` or `"%"` | +| `setRecipientInstanceType(InstanceType)` | PARTICIPANT, CONTROLLER, SPECTATOR | Type of Helix process to target | No | `InstanceType.PARTICIPANT` | +| `setSessionSpecific(boolean)` | true/false | If true, message is only delivered to currently active sessions (not redelivered after restart) | No | `true` (recommended) | + +**Important Notes:** + +- **Wildcards:** Use `%` (SQL-style) or `*` to match all. Single underscore `_` matches any single character. +- **DataSource Compatibility:** Setting `resource`, `partition`, or `partitionState` only makes sense with `EXTERNALVIEW` or `IDEALSTATES` DataSource. They are ignored for `LIVEINSTANCES` and `INSTANCES`. +- **Session-Specific:** Set to `true` for most use cases to avoid redelivering messages after a participant restarts. +- **Empty vs Wildcard:** Empty string `""` and wildcard `"%"` are treated the same - both match all. + See HelixManager.getMessagingService for more info. diff --git a/website/1.3.2/src/site/markdown/tutorial_messaging.md b/website/1.3.2/src/site/markdown/tutorial_messaging.md index bdbd936730..aeaf1cbf22 100644 --- a/website/1.3.2/src/site/markdown/tutorial_messaging.md +++ b/website/1.3.2/src/site/markdown/tutorial_messaging.md @@ -25,6 +25,26 @@ under the License. In this chapter, we\'ll learn about messaging, a convenient feature in Helix for sending messages between nodes of a cluster. This is an interesting feature that is quite useful in practice. It is common that nodes in a distributed system require a mechanism to interact with each other. +### Performance Considerations + +**IMPORTANT:** When using the messaging API with `Criteria`, be aware of the following performance characteristics: + +- **ExternalView Scanning:** By default, the messaging service uses `DataSource.EXTERNALVIEW` to resolve criteria. This can scan **all** ExternalView znodes in the cluster, even when targeting specific instances. At high resource cardinality, this can cause severe performance degradation. + +**Recommended Patterns:** + +- **Use `DataSource.LIVEINSTANCES`** when you only need to target live instances and do not require resource/partition-level filtering. This is much faster and more efficient. +- **Specify exact resource names** instead of wildcards if you must use ExternalView scanning. + +Example of efficient messaging: +```java +Criteria recipientCriteria = new Criteria(); +recipientCriteria.setInstanceName("instance123"); +recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +recipientCriteria.setDataSource(DataSource.LIVEINSTANCES); // Efficient: avoids EV scan +recipientCriteria.setSessionSpecific(true); +``` + ### Example: Bootstrapping a Replica Consider a search system where the index replica starts up and it does not have an index. A typical solution is to get the index from a common location, or to copy the index from another replica. diff --git a/website/1.4.3/src/site/markdown/Features.md b/website/1.4.3/src/site/markdown/Features.md index 476956a637..4cba119539 100644 --- a/website/1.4.3/src/site/markdown/Features.md +++ b/website/1.4.3/src/site/markdown/Features.md @@ -220,32 +220,134 @@ Since Helix is aware of the global state of the system, it can send the message This is a very generic api and can also be used to schedule various periodic tasks in the cluster like data backups etc. System Admins can also perform adhoc tasks like on demand backup or execute a system command(like rm -rf ;-)) across all nodes. +#### Understanding Criteria and DataSource + +The `Criteria` object allows you to specify message recipients using various attributes. A critical configuration is the `DataSource`, which determines where Helix looks up the cluster state to resolve your criteria. + +**Available DataSource Options:** + +Helix provides four DataSource types, each reading from different znodes in ZooKeeper: + +| DataSource | Description | When to Use | +|------------|-------------|-------------| +| **LIVEINSTANCES** | Reads from `/LIVEINSTANCES` znodes | Targeting live instances without needing resource/partition/state filtering | +| **INSTANCES** | Reads from `/INSTANCES/[instance]` znodes | Targeting specific configured instances (live or not) based on instance configuration | +| **EXTERNALVIEW** | Reads from `/EXTERNALVIEWS/[resource]` znodes | Targeting based on actual replica placement, partition ownership, or replica state (MASTER/SLAVE) | +| **IDEALSTATES** | Reads from `/IDEALSTATES/[resource]` znodes | Targeting based on ideal state configuration (intended placement) | + +**Key Differences:** + +- **LIVEINSTANCES**: Contains only instance names of currently connected participants. No resource/partition information. Smallest dataset. +- **INSTANCES**: Contains instance configuration (host, port, enabled/disabled status). No resource/partition information. +- **EXTERNALVIEW**: Contains actual current state - which instances own which partitions and their states (MASTER/SLAVE/OFFLINE). Large dataset at scale. +- **IDEALSTATES**: Contains desired state - which instances should own which partitions. Similar size to ExternalView. + +**Choosing the Right DataSource:** + +| Your Goal | Correct DataSource | Example Use Case | +|-----------|-------------------|------------------| +| Send to specific live instance(s) | `LIVEINSTANCES` | Health check, admin command to specific node | +| Send to all live instances | `LIVEINSTANCES` | Broadcast announcement, cluster-wide operation | +| Send to replicas of a specific partition | `EXTERNALVIEW` (with exact resource name) | Bootstrap replica from peers | +| Send to all MASTER replicas of a resource | `EXTERNALVIEW` (with exact resource name) | Trigger operation on masters only | +| Send based on partition state | `EXTERNALVIEW` (with exact resource name) | Target only ONLINE/MASTER/SLAVE replicas | + +#### CRITICAL: Performance Considerations + +**⚠️ WARNING:** Using `EXTERNALVIEW` as the DataSource can cause severe performance issues at scale. + +**The Problem:** +When using `DataSource.EXTERNALVIEW`, Helix will scan **ALL** ExternalView znodes in the cluster if: +- You use wildcards (`%` or `*`) in the resource name, OR +- You leave the resource name unspecified + +**This happens even when targeting specific instances!** The scan is NOT automatically optimized based on other criteria fields like `instanceName`. + +At high ExternalView cardinality, this can cause severe performance degradation. + +#### How to Set Criteria Correctly + +**Pattern 1: Targeting Specific Instances (Most Common)** + +When you only need to send messages to specific instances and don't need resource/partition-level filtering: + +```java +// GOOD: Efficient - Uses LIVEINSTANCES, avoids ExternalView scan +Criteria criteria = new Criteria(); +criteria.setInstanceName("instance123"); // or "%" for all live instances +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.LIVEINSTANCES); // Key: Use LIVEINSTANCES +criteria.setSessionSpecific(true); +``` + +```java +// BAD: Inefficient - Scans ALL ExternalViews even though targeting specific instance +Criteria criteria = new Criteria(); +criteria.setInstanceName("instance123"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.EXTERNALVIEW); // Will scan ALL resources! +criteria.setResource("%"); // Wildcard triggers full scan +``` + +**Pattern 2: Targeting Specific Resource and Partition** + +When you need to send messages based on resource ownership (e.g., all replicas of a partition): + +```java +// GOOD: Efficient - Specifies exact resource name, scans only 1 ExternalView +Criteria criteria = new Criteria(); +criteria.setInstanceName("%"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.EXTERNALVIEW); +criteria.setResource("MyDB"); // Exact resource name - scans only this EV +criteria.setPartition("MyDB_0"); // Specific partition +criteria.setPartitionState("MASTER"); // Only send to masters +criteria.setSessionSpecific(true); +``` + +```java +// BAD: Inefficient - Wildcard resource scans ALL ExternalViews +Criteria criteria = new Criteria(); +criteria.setInstanceName("%"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.EXTERNALVIEW); +criteria.setResource("%"); // Wildcard scans ALL ExternalViews in cluster! +criteria.setPartition("MyDB_0"); +criteria.setSessionSpecific(true); ``` - ClusterMessagingService messagingService = manager.getMessagingService(); - //CONSTRUCT THE MESSAGE - Message requestBackupUriRequest = new Message( - MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); - requestBackupUriRequest - .setMsgSubType(BootstrapProcess.REQUEST_BOOTSTRAP_URL); - requestBackupUriRequest.setMsgState(MessageState.NEW); - //SET THE RECIPIENT CRITERIA, All nodes that satisfy the criteria will receive the message - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setResource("MyDB"); - recipientCriteria.setPartition(""); - //Should be processed only the process that is active at the time of sending the message. - //This means if the recipient is restarted after message is sent, it will not be processed. - recipientCriteria.setSessionSpecific(true); - // wait for 30 seconds - int timeout = 30000; - //The handler that will be invoked when any recipient responds to the message. - BootstrapReplyHandler responseHandler = new BootstrapReplyHandler(); - //This will return only after all recipients respond or after timeout. - int sentMessageCount = messagingService.sendAndWait(recipientCriteria, - requestBackupUriRequest, responseHandler, timeout); + +**Pattern 3: Broadcasting to All Live Instances** + +```java +// GOOD: Efficient broadcast to all live participants +Criteria criteria = new Criteria(); +criteria.setInstanceName("%"); +criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +criteria.setDataSource(DataSource.LIVEINSTANCES); // Fast broadcast +criteria.setSessionSpecific(true); ``` +#### Criteria Configuration Reference + +The `Criteria` class provides the following configuration methods: + +| Method | Parameter | Description | Wildcard Support | Example | +|--------|-----------|-------------|------------------|---------| +| `setDataSource(DataSource)` | LIVEINSTANCES, INSTANCES, EXTERNALVIEW, IDEALSTATES | **MOST IMPORTANT:** Determines which znodes to read | N/A | `DataSource.LIVEINSTANCES` | +| `setInstanceName(String)` | Instance name | Target specific instance(s) by name | Yes (`%` = all) | `"localhost_12918"` or `"%"` | +| `setResource(String)` | Resource name | Filter by resource name (only meaningful for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MyDatabase"` or `"%"` | +| `setPartition(String)` | Partition name | Filter by specific partition (only meaningful for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MyDatabase_0"` or `"%"` | +| `setPartitionState(String)` | State name | Filter by replica state like MASTER, SLAVE, ONLINE, OFFLINE (only for EXTERNALVIEW/IDEALSTATES) | Yes (`%` = all) | `"MASTER"` or `"%"` | +| `setRecipientInstanceType(InstanceType)` | PARTICIPANT, CONTROLLER, SPECTATOR | Type of Helix process to target | No | `InstanceType.PARTICIPANT` | +| `setSessionSpecific(boolean)` | true/false | If true, message is only delivered to currently active sessions (not redelivered after restart) | No | `true` (recommended) | + +**Important Notes:** + +- **Wildcards:** Use `%` (SQL-style) or `*` to match all. Single underscore `_` matches any single character. +- **DataSource Compatibility:** Setting `resource`, `partition`, or `partitionState` only makes sense with `EXTERNALVIEW` or `IDEALSTATES` DataSource. They are ignored for `LIVEINSTANCES` and `INSTANCES`. +- **Session-Specific:** Set to `true` for most use cases to avoid redelivering messages after a participant restarts. +- **Empty vs Wildcard:** Empty string `""` and wildcard `"%"` are treated the same - both match all. + See HelixManager.getMessagingService for more info. diff --git a/website/1.4.3/src/site/markdown/tutorial_messaging.md b/website/1.4.3/src/site/markdown/tutorial_messaging.md index 68135762bc..1bc2a20a9d 100644 --- a/website/1.4.3/src/site/markdown/tutorial_messaging.md +++ b/website/1.4.3/src/site/markdown/tutorial_messaging.md @@ -25,6 +25,26 @@ under the License. In this chapter, we\'ll learn about messaging, a convenient feature in Helix for sending messages between nodes of a cluster. This is an interesting feature that is quite useful in practice. It is common that nodes in a distributed system require a mechanism to interact with each other. +### Performance Considerations + +**IMPORTANT:** When using the messaging API with `Criteria`, be aware of the following performance characteristics: + +- **ExternalView Scanning:** By default, the messaging service uses `DataSource.EXTERNALVIEW` to resolve criteria. This can scan **all** ExternalView znodes in the cluster, even when targeting specific instances. At high resource cardinality, this can cause severe performance degradation. + +**Recommended Patterns:** + +- **Use `DataSource.LIVEINSTANCES`** when you only need to target live instances and do not require resource/partition-level filtering. This is much faster and more efficient. +- **Specify exact resource names** instead of wildcards if you must use ExternalView scanning. + +Example of efficient messaging: +```java +Criteria recipientCriteria = new Criteria(); +recipientCriteria.setInstanceName("instance123"); +recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); +recipientCriteria.setDataSource(DataSource.LIVEINSTANCES); +recipientCriteria.setSessionSpecific(true); +``` + ### Example: Bootstrapping a Replica Consider a search system where the index replica starts up and it does not have an index. A typical solution is to get the index from a common location, or to copy the index from another replica.