Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public HAGroupStoreRecord.HAGroupState getDefaultHAGroupState() {
case OFFLINE:
return HAGroupStoreRecord.HAGroupState.OFFLINE;
case ACTIVE_TO_STANDBY:
return HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY;
return HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC_TO_STANDBY;
case STANDBY_TO_ACTIVE:
return HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE;
case UNKNOWN:
Expand Down Expand Up @@ -352,6 +352,4 @@ public String toPrettyString() {
return toString();
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,22 @@ public interface HAGroupStateListener {
* consider delegating to a separate thread.</p>
*
* @param haGroupName the name of the HA group that transitioned
* @param fromState the previous state before the transition;
*                  can be null for the initial state.
*                  It can also be inaccurate if the connection to ZK is lost
*                  and multiple state changes happen in the meantime.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this true? Aren't we using persistent watchers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in case of connection loss there can be events that are missed. Although this doesn't affect failover IMO, clients need to be aware of this limitation.

* @param toState the new state after the transition
* @param modifiedTime the time the state transition occurred
* @param clusterType whether this transition occurred on the local or peer cluster
* @param lastSyncStateTimeInMs the last time the group was in a sync state; can be null.
*
* @throws Exception implementations may throw exceptions, but they will be
* logged and will not prevent other listeners from being notified
*/
void onStateChange(String haGroupName,
HAGroupState fromState,
HAGroupState toState,
long modifiedTime,
ClusterType clusterType);
ClusterType clusterType,
Long lastSyncStateTimeInMs);
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public enum HAGroupState {
ACTIVE_IN_SYNC_TO_STANDBY,
ACTIVE_WITH_OFFLINE_PEER,
DEGRADED_STANDBY,
DEGRADED_STANDBY_FOR_READER,
DEGRADED_STANDBY_FOR_WRITER,
OFFLINE,
STANDBY,
STANDBY_TO_ACTIVE,
Expand All @@ -87,8 +85,6 @@ public ClusterRoleRecord.ClusterRole getClusterRole() {
return ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY;
case ABORT_TO_STANDBY:
case DEGRADED_STANDBY:
case DEGRADED_STANDBY_FOR_READER:
case DEGRADED_STANDBY_FOR_WRITER:
case STANDBY:
return ClusterRoleRecord.ClusterRole.STANDBY;
case STANDBY_TO_ACTIVE:
Expand All @@ -113,7 +109,7 @@ public ClusterRoleRecord.ClusterRole getClusterRole() {
);

STANDBY.allowedTransitions = ImmutableSet.of(STANDBY_TO_ACTIVE,
DEGRADED_STANDBY_FOR_READER, DEGRADED_STANDBY_FOR_WRITER);
DEGRADED_STANDBY);
// This needs to be manually recovered by operator
OFFLINE.allowedTransitions = ImmutableSet.of();
// This needs to be manually recovered by operator
Expand All @@ -126,11 +122,7 @@ public ClusterRoleRecord.ClusterRole getClusterRole() {
STANDBY_TO_ACTIVE.allowedTransitions = ImmutableSet.of(ABORT_TO_STANDBY,
ACTIVE_IN_SYNC);
DEGRADED_STANDBY.allowedTransitions
= ImmutableSet.of(DEGRADED_STANDBY_FOR_READER, DEGRADED_STANDBY_FOR_WRITER);
DEGRADED_STANDBY_FOR_WRITER.allowedTransitions = ImmutableSet.of(STANDBY,
DEGRADED_STANDBY);
DEGRADED_STANDBY_FOR_READER.allowedTransitions = ImmutableSet.of(STANDBY,
DEGRADED_STANDBY);
= ImmutableSet.of(STANDBY);
ACTIVE_WITH_OFFLINE_PEER.allowedTransitions = ImmutableSet.of(ACTIVE_NOT_IN_SYNC);
ABORT_TO_ACTIVE_IN_SYNC.allowedTransitions = ImmutableSet.of(ACTIVE_IN_SYNC);
ABORT_TO_ACTIVE_NOT_IN_SYNC.allowedTransitions = ImmutableSet.of(ACTIVE_NOT_IN_SYNC);
Expand Down Expand Up @@ -164,27 +156,35 @@ public static HAGroupState from(byte[] bytes) {
private final String haGroupName;
private final HAGroupState haGroupState;
private final Long lastSyncStateTimeInMs;
private final String policy;
private final String peerZKUrl;
private final String clusterUrl;
private final String peerClusterUrl;
private final long adminCRRVersion;

@JsonCreator
public HAGroupStoreRecord(@JsonProperty("protocolVersion") String protocolVersion,
@JsonProperty("haGroupName") String haGroupName,
@JsonProperty("haGroupState") HAGroupState haGroupState,
@JsonProperty("lastSyncStateTimeInMs") Long lastSyncStateTimeInMs) {
@JsonProperty("lastSyncStateTimeInMs") Long lastSyncStateTimeInMs,
@JsonProperty("policy") String policy,
@JsonProperty("peerZKUrl") String peerZKUrl,
@JsonProperty("clusterUrl") String clusterUrl,
@JsonProperty("peerClusterUrl") String peerClusterUrl,
@JsonProperty("adminCRRVersion")
long adminCRRVersion) {
Preconditions.checkNotNull(haGroupName, "HA group name cannot be null!");
Preconditions.checkNotNull(haGroupState, "HA group state cannot be null!");

this.protocolVersion = Objects.toString(protocolVersion, DEFAULT_PROTOCOL_VERSION);
this.haGroupName = haGroupName;
this.haGroupState = haGroupState;
this.policy = policy;
this.lastSyncStateTimeInMs = lastSyncStateTimeInMs;
}

/**
* Convenience constructor for backward compatibility without lastSyncStateTimeInMs.
*/
public HAGroupStoreRecord(String protocolVersion,
String haGroupName, HAGroupState haGroupState) {
this(protocolVersion, haGroupName, haGroupState, null);
this.peerZKUrl = peerZKUrl;
this.clusterUrl = clusterUrl;
this.peerClusterUrl = peerClusterUrl;
this.adminCRRVersion = adminCRRVersion;
}

public static Optional<HAGroupStoreRecord> fromJson(byte[] bytes) {
Expand All @@ -209,7 +209,12 @@ public boolean hasSameInfo(HAGroupStoreRecord other) {
return haGroupName.equals(other.haGroupName)
&& haGroupState.equals(other.haGroupState)
&& protocolVersion.equals(other.protocolVersion)
&& Objects.equals(lastSyncStateTimeInMs, other.lastSyncStateTimeInMs);
&& Objects.equals(lastSyncStateTimeInMs, other.lastSyncStateTimeInMs)
&& Objects.equals(policy, other.policy)
&& Objects.equals(peerZKUrl, other.peerZKUrl)
&& Objects.equals(clusterUrl, other.clusterUrl)
&& Objects.equals(peerClusterUrl, other.peerClusterUrl)
&& adminCRRVersion == other.adminCRRVersion;
}

public String getProtocolVersion() {
Expand All @@ -229,6 +234,26 @@ public Long getLastSyncStateTimeInMs() {
return lastSyncStateTimeInMs;
}

public String getPeerZKUrl() {
return peerZKUrl;
}

public String getPolicy() {
return policy;
}

public String getClusterUrl() {
return clusterUrl;
}

public String getPeerClusterUrl() {
return peerClusterUrl;
}

public long getAdminCRRVersion() {
return adminCRRVersion;
}

@JsonIgnore
public ClusterRoleRecord.ClusterRole getClusterRole() {
return haGroupState.getClusterRole();
Expand All @@ -241,6 +266,11 @@ public int hashCode() {
.append(haGroupName)
.append(haGroupState)
.append(lastSyncStateTimeInMs)
.append(policy)
.append(peerZKUrl)
.append(clusterUrl)
.append(peerClusterUrl)
.append(adminCRRVersion)
.hashCode();
}

Expand All @@ -259,6 +289,11 @@ public boolean equals(Object other) {
.append(haGroupName, otherRecord.haGroupName)
.append(haGroupState, otherRecord.haGroupState)
.append(lastSyncStateTimeInMs, otherRecord.lastSyncStateTimeInMs)
.append(policy, otherRecord.policy)
.append(peerZKUrl, otherRecord.peerZKUrl)
.append(clusterUrl, otherRecord.clusterUrl)
.append(peerClusterUrl, otherRecord.peerClusterUrl)
.append(adminCRRVersion, otherRecord.adminCRRVersion)
.isEquals();
}
}
Expand All @@ -270,6 +305,11 @@ public String toString() {
+ ", haGroupName='" + haGroupName + '\''
+ ", haGroupState=" + haGroupState
+ ", lastSyncStateTimeInMs=" + lastSyncStateTimeInMs
+ ", policy='" + policy + '\''
+ ", peerZKUrl='" + peerZKUrl + '\''
+ ", clusterUrl='" + clusterUrl + '\''
+ ", peerClusterUrl='" + peerClusterUrl + '\''
+ ", adminCRRVersion=" + adminCRRVersion
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ public interface QueryServices extends SQLCloseable {
public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes";
public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
public static final String MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCachePersistenceTimeToLiveMs";

@Deprecated // Use FORCE_ROW_KEY_ORDER instead.
public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB = "phoenix.query.rowKeyOrderSaltedTable";

public static final String USE_INDEXES_ATTRIB = "phoenix.query.useIndexes";
@Deprecated // use the IMMUTABLE keyword while creating the table
public static final String IMMUTABLE_ROWS_ATTRIB = "phoenix.mutate.immutableRows";
Expand Down Expand Up @@ -161,7 +161,7 @@ public interface QueryServices extends SQLCloseable {
"phoenix.index.failure.handling.rebuild.interval";
public static final String INDEX_REBUILD_TASK_INITIAL_DELAY = "phoenix.index.rebuild.task.initial.delay";
public static final String START_TRUNCATE_TASK_DELAY = "phoenix.start.truncate.task.delay";

public static final String INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = "phoenix.index.rebuild.batch.perTable";
// If index disable timestamp is older than this threshold, then index rebuild task won't attempt to rebuild it
public static final String INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD = "phoenix.index.rebuild.disabletimestamp.threshold";
Expand Down Expand Up @@ -215,7 +215,7 @@ public interface QueryServices extends SQLCloseable {
public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width";
public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region";
public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime";

public static final String RUN_UPDATE_STATS_ASYNC = "phoenix.update.stats.command.async";
public static final String STATS_SERVER_POOL_SIZE = "phoenix.stats.pool.size";
public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async";
Expand Down Expand Up @@ -244,7 +244,7 @@ public interface QueryServices extends SQLCloseable {

// Tag Name to determine the Phoenix Client Type
public static final String CLIENT_METRICS_TAG = "phoenix.client.metrics.tag";

// Transaction related configs
public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled";
// Controls whether or not uncommitted data is automatically sent to HBase
Expand All @@ -263,7 +263,7 @@ public interface QueryServices extends SQLCloseable {
public static final String ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE = "phoenix.view.allowNewColumnFamily";
public static final String RETURN_SEQUENCE_VALUES_ATTRIB = "phoenix.sequence.returnValues";
public static final String EXTRA_JDBC_ARGUMENTS_ATTRIB = "phoenix.jdbc.extra.arguments";

public static final String MAX_VERSIONS_TRANSACTIONAL_ATTRIB = "phoenix.transactions.maxVersions";

// metadata configs
Expand All @@ -281,7 +281,7 @@ public interface QueryServices extends SQLCloseable {
public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time";
public static final String LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB = "phoenix.client.localIndexUpgrade";
public static final String LIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.limited.query.serial.threshold";

//currently BASE64 and ASCII is supported
public static final String UPLOAD_BINARY_DATA_TYPE_ENCODING = "phoenix.upload.binaryDataType.encoding";
// Toggle for server-written updates to SYSTEM.CATALOG
Expand Down Expand Up @@ -404,7 +404,7 @@ public interface QueryServices extends SQLCloseable {
// Also, before 4.15 when we added a column to a base table we would have to propagate the
// column metadata to all its child views. After PHOENIX-3534 we no longer propagate metadata
// changes from a parent to its children (we just resolve its ancestors and include their columns)
//
//
// The following config is used to continue writing the parent table column metadata while
// creating a view and also prevent metadata changes to a parent table/view that needs to be
// propagated to its children. This is done to allow rollback of the splittable SYSTEM.CATALOG
Expand Down Expand Up @@ -558,6 +558,8 @@ public interface QueryServices extends SQLCloseable {

public static final String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled";

// HA Group Store sync job interval in seconds
String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds";

/**
* Get executor service used for parallel scans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import static org.apache.phoenix.query.QueryServices.PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED;
import static org.apache.phoenix.query.QueryServices.MAX_IN_LIST_SKIP_SCAN_SIZE;
import static org.apache.phoenix.query.QueryServices.WAL_EDIT_CODEC_ATTRIB;
import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS;

import java.util.Map.Entry;

Expand Down Expand Up @@ -225,7 +226,7 @@ public class QueryServicesOptions {
public static final int DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES = 1000;
public static final int DEFAULT_CLOCK_SKEW_INTERVAL = 2000;
public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on
public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false;
public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false;
public static final boolean DEFAULT_INDEX_FAILURE_DISABLE_INDEX = true;
public static final boolean DEFAULT_INDEX_FAILURE_THROW_EXCEPTION = true;
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 60000; // 60 secs
Expand Down Expand Up @@ -375,7 +376,7 @@ public class QueryServicesOptions {
public static final int DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS = 15;
public static final boolean DEFAULT_STATS_COLLECTION_ENABLED = true;
public static final boolean DEFAULT_USE_STATS_FOR_PARALLELIZATION = true;

//Security defaults
public static final boolean DEFAULT_PHOENIX_ACLS_ENABLED = false;

Expand Down Expand Up @@ -472,6 +473,9 @@ public class QueryServicesOptions {

public static final Boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false;

// Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds)
public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900;

private final Configuration config;

private QueryServicesOptions(Configuration config) {
Expand Down Expand Up @@ -586,7 +590,9 @@ public static QueryServicesOptions withDefaults() {
.setIfUnset(CQSI_THREAD_POOL_MAX_QUEUE, DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE)
.setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)
.setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED);
.setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED)
.setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS,
DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS);

// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ message InvalidateServerMetadataCacheRequest {

message InvalidateHAGroupStoreClientRequest {
required bytes haGroupName = 1;
required bool broadcastUpdate = 2;
}

message InvalidateHAGroupStoreClientResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ public void invalidateHAGroupStoreClient(RpcController controller,
= HAGroupStoreManager.getInstance(conf);
if (haGroupStoreManager != null) {
haGroupStoreManager
.invalidateHAGroupStoreClient(request.getHaGroupName().toStringUtf8(),
request.getBroadcastUpdate());
.invalidateHAGroupStoreClient(request.getHaGroupName().toStringUtf8());
} else {
throw new IOException("HAGroupStoreManager is null for "
+ "current cluster, check configuration");
Expand Down
Loading