Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public HAGroupStoreRecord.HAGroupState getDefaultHAGroupState() {
case OFFLINE:
return HAGroupStoreRecord.HAGroupState.OFFLINE;
case ACTIVE_TO_STANDBY:
return HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY;
return HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC_TO_STANDBY;
case STANDBY_TO_ACTIVE:
return HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE;
case UNKNOWN:
Expand Down Expand Up @@ -337,6 +337,4 @@ public String toPrettyString() {
return toString();
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,22 @@ public interface HAGroupStateListener {
* consider delegating to a separate thread.</p>
*
* @param haGroupName the name of the HA group that transitioned
* @param fromState the previous state before the transition;
*                  may be null for the initial state. It can also be
*                  inaccurate if the ZK connection is lost and multiple
*                  state changes happen in between.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this true? Are not we using persistent watchers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in case of connection loss there can be events which are missed. Although this doesn't affect failover IMO, clients need to be aware of this limitation.

* @param toState the new state after the transition
* @param modifiedTime the time the state transition occurred
* @param clusterType whether this transition occurred on the local or peer cluster
* @param lastSyncStateTimeInMs the last time the group was in a sync state; may be null.
*
* @throws Exception implementations may throw exceptions, but they will be
* logged and will not prevent other listeners from being notified
*/
void onStateChange(String haGroupName,
HAGroupState fromState,
HAGroupState toState,
long modifiedTime,
ClusterType clusterType);
ClusterType clusterType,
Long lastSyncStateTimeInMs);
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class HAGroupStoreRecord {

private static final Logger LOG = LoggerFactory.getLogger(HAGroupStoreRecord.class);
public static final String DEFAULT_PROTOCOL_VERSION = "1.0";
public static final long DEFAULT_RECORD_VERSION = 1L;

/**
* Enum representing the HA group state with each state having a corresponding ClusterRole.
Expand All @@ -61,8 +60,6 @@ public enum HAGroupState {
ACTIVE_IN_SYNC_TO_STANDBY,
ACTIVE_WITH_OFFLINE_PEER,
DEGRADED_STANDBY,
DEGRADED_STANDBY_FOR_READER,
DEGRADED_STANDBY_FOR_WRITER,
OFFLINE,
STANDBY,
STANDBY_TO_ACTIVE,
Expand All @@ -88,8 +85,6 @@ public ClusterRoleRecord.ClusterRole getClusterRole() {
return ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY;
case ABORT_TO_STANDBY:
case DEGRADED_STANDBY:
case DEGRADED_STANDBY_FOR_READER:
case DEGRADED_STANDBY_FOR_WRITER:
case STANDBY:
return ClusterRoleRecord.ClusterRole.STANDBY;
case STANDBY_TO_ACTIVE:
Expand All @@ -114,7 +109,7 @@ public ClusterRoleRecord.ClusterRole getClusterRole() {
);

STANDBY.allowedTransitions = ImmutableSet.of(STANDBY_TO_ACTIVE,
DEGRADED_STANDBY_FOR_READER, DEGRADED_STANDBY_FOR_WRITER);
DEGRADED_STANDBY);
// This needs to be manually recovered by operator
OFFLINE.allowedTransitions = ImmutableSet.of();
// This needs to be manually recovered by operator
Expand All @@ -127,11 +122,7 @@ public ClusterRoleRecord.ClusterRole getClusterRole() {
STANDBY_TO_ACTIVE.allowedTransitions = ImmutableSet.of(ABORT_TO_STANDBY,
ACTIVE_IN_SYNC);
DEGRADED_STANDBY.allowedTransitions
= ImmutableSet.of(DEGRADED_STANDBY_FOR_READER, DEGRADED_STANDBY_FOR_WRITER);
DEGRADED_STANDBY_FOR_WRITER.allowedTransitions = ImmutableSet.of(STANDBY,
DEGRADED_STANDBY);
DEGRADED_STANDBY_FOR_READER.allowedTransitions = ImmutableSet.of(STANDBY,
DEGRADED_STANDBY);
= ImmutableSet.of(STANDBY);
ACTIVE_WITH_OFFLINE_PEER.allowedTransitions = ImmutableSet.of(ACTIVE_NOT_IN_SYNC);
ABORT_TO_ACTIVE_IN_SYNC.allowedTransitions = ImmutableSet.of(ACTIVE_IN_SYNC);
ABORT_TO_ACTIVE_NOT_IN_SYNC.allowedTransitions = ImmutableSet.of(ACTIVE_NOT_IN_SYNC);
Expand Down Expand Up @@ -164,40 +155,36 @@ public static HAGroupState from(byte[] bytes) {
private final String protocolVersion;
private final String haGroupName;
private final HAGroupState haGroupState;
private final long recordVersion;
private final Long lastSyncStateTimeInMs;
private final String policy;
private final String peerZKUrl;
private final String clusterUrl;
private final String peerClusterUrl;
private final long adminCRRVersion;

@JsonCreator
public HAGroupStoreRecord(@JsonProperty("protocolVersion") String protocolVersion,
@JsonProperty("haGroupName") String haGroupName,
@JsonProperty("haGroupState") HAGroupState haGroupState,
@JsonProperty("recordVersion") Long recordVersion,
@JsonProperty("lastSyncStateTimeInMs") Long lastSyncStateTimeInMs) {
@JsonProperty("lastSyncStateTimeInMs") Long lastSyncStateTimeInMs,
@JsonProperty("policy") String policy,
@JsonProperty("peerZKUrl") String peerZKUrl,
@JsonProperty("clusterUrl") String clusterUrl,
@JsonProperty("peerClusterUrl") String peerClusterUrl,
@JsonProperty("adminCRRVersion")
long adminCRRVersion) {
Preconditions.checkNotNull(haGroupName, "HA group name cannot be null!");
Preconditions.checkNotNull(haGroupState, "HA group state cannot be null!");
Preconditions.checkNotNull(recordVersion, "Record version cannot be null!");

this.protocolVersion = Objects.toString(protocolVersion, DEFAULT_PROTOCOL_VERSION);
this.haGroupName = haGroupName;
this.haGroupState = haGroupState;
this.recordVersion = recordVersion;
this.policy = policy;
this.lastSyncStateTimeInMs = lastSyncStateTimeInMs;
}

/**
* Convenience constructor for backward compatibility without lastSyncStateTimeInMs.
*/
public HAGroupStoreRecord(String protocolVersion,
String haGroupName, HAGroupState haGroupState, Long recordVersion) {
this(protocolVersion, haGroupName, haGroupState, recordVersion, null);
}

/**
* Convenience constructor for backward compatibility without recordVersion and lastSyncStateTimeInMs.
*/
public HAGroupStoreRecord(String protocolVersion,
String haGroupName, HAGroupState haGroupState) {
this(protocolVersion, haGroupName, haGroupState, DEFAULT_RECORD_VERSION, null);
this.peerZKUrl = peerZKUrl;
this.clusterUrl = clusterUrl;
this.peerClusterUrl = peerClusterUrl;
this.adminCRRVersion = adminCRRVersion;
}

public static Optional<HAGroupStoreRecord> fromJson(byte[] bytes) {
Expand All @@ -222,8 +209,12 @@ public boolean hasSameInfo(HAGroupStoreRecord other) {
return haGroupName.equals(other.haGroupName)
&& haGroupState.equals(other.haGroupState)
&& protocolVersion.equals(other.protocolVersion)
&& recordVersion == other.recordVersion
&& Objects.equals(lastSyncStateTimeInMs, other.lastSyncStateTimeInMs);
&& Objects.equals(lastSyncStateTimeInMs, other.lastSyncStateTimeInMs)
&& Objects.equals(policy, other.policy)
&& Objects.equals(peerZKUrl, other.peerZKUrl)
&& Objects.equals(clusterUrl, other.clusterUrl)
&& Objects.equals(peerClusterUrl, other.peerClusterUrl)
&& adminCRRVersion == other.adminCRRVersion;
}

public String getProtocolVersion() {
Expand All @@ -234,10 +225,6 @@ public String getHaGroupName() {
return haGroupName;
}

public long getRecordVersion() {
return recordVersion;
}

@JsonProperty("haGroupState")
public HAGroupState getHAGroupState() {
return haGroupState;
Expand All @@ -247,6 +234,26 @@ public Long getLastSyncStateTimeInMs() {
return lastSyncStateTimeInMs;
}

/** Returns the ZooKeeper URL of the peer cluster for this HA group record. */
public String getPeerZKUrl() {
return peerZKUrl;
}

/** Returns the HA policy name stored in this record. */
public String getPolicy() {
return policy;
}

/** Returns the URL of the local cluster for this HA group record. */
public String getClusterUrl() {
return clusterUrl;
}

/** Returns the URL of the peer cluster for this HA group record. */
public String getPeerClusterUrl() {
return peerClusterUrl;
}

/** Returns the admin-managed ClusterRoleRecord (CRR) version for this record. */
public long getAdminCRRVersion() {
return adminCRRVersion;
}

@JsonIgnore
public ClusterRoleRecord.ClusterRole getClusterRole() {
return haGroupState.getClusterRole();
Expand All @@ -258,8 +265,12 @@ public int hashCode() {
.append(protocolVersion)
.append(haGroupName)
.append(haGroupState)
.append(recordVersion)
.append(lastSyncStateTimeInMs)
.append(policy)
.append(peerZKUrl)
.append(clusterUrl)
.append(peerClusterUrl)
.append(adminCRRVersion)
.hashCode();
}

Expand All @@ -277,8 +288,12 @@ public boolean equals(Object other) {
.append(protocolVersion, otherRecord.protocolVersion)
.append(haGroupName, otherRecord.haGroupName)
.append(haGroupState, otherRecord.haGroupState)
.append(recordVersion, otherRecord.recordVersion)
.append(lastSyncStateTimeInMs, otherRecord.lastSyncStateTimeInMs)
.append(policy, otherRecord.policy)
.append(peerZKUrl, otherRecord.peerZKUrl)
.append(clusterUrl, otherRecord.clusterUrl)
.append(peerClusterUrl, otherRecord.peerClusterUrl)
.append(adminCRRVersion, otherRecord.adminCRRVersion)
.isEquals();
}
}
Expand All @@ -289,8 +304,12 @@ public String toString() {
+ "protocolVersion='" + protocolVersion + '\''
+ ", haGroupName='" + haGroupName + '\''
+ ", haGroupState=" + haGroupState
+ ", recordVersion=" + recordVersion
+ ", lastSyncStateTimeInMs=" + lastSyncStateTimeInMs
+ ", policy='" + policy + '\''
+ ", peerZKUrl='" + peerZKUrl + '\''
+ ", clusterUrl='" + clusterUrl + '\''
+ ", peerClusterUrl='" + peerClusterUrl + '\''
+ ", adminCRRVersion=" + adminCRRVersion
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final String PARENT_PARTITION_START_TIME = "PARENT_PARTITION_START_TIME";
public static final String HA_GROUP_NAME = "HA_GROUP_NAME";
public static final String POLICY = "POLICY";
public static final String VERSION_CLUSTER_1 = "VERSION_CLUSTER_1";
public static final String VERSION_CLUSTER_2 = "VERSION_CLUSTER_2";
public static final String VERSION = "VERSION";
public static final String ZK_URL_1 = "ZK_URL_1";
public static final String ZK_URL_2 = "ZK_URL_2";
public static final String CLUSTER_URL_1 = "CLUSTER_URL_1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VERSION_CLUSTER_1;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VERSION_CLUSTER_2;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VERSION;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
Expand Down Expand Up @@ -709,8 +708,7 @@ enum JoinType {INNER, LEFT_OUTER}
CLUSTER_ROLE_1 + " VARCHAR," + // Role for peer cluster might not be accurate, we will use only local role for recovery if needed
CLUSTER_ROLE_2 + " VARCHAR," + // Role for peer cluster might not be accurate, we will use only local role for recovery if needed
POLICY + " VARCHAR," + // There should be only one policy for an HA group
VERSION_CLUSTER_1 + " BIGINT," + // Version should be incremented for Admin Updates, only for CLUSTER_URLs and REGISTRY_TYPE
VERSION_CLUSTER_2 + " BIGINT," + // Version should be incremented for Admin Updates, only for CLUSTER_URLs and REGISTRY_TYPE
VERSION + " BIGINT," + // Version should be incremented for Admin Updates, only for CLUSTER_URLs and REGISTRY_TYPE
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME +
" PRIMARY KEY (" + HA_GROUP_NAME + "))\n" +
HConstants.VERSIONS + "=%s,\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ public interface QueryServices extends SQLCloseable {
public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes";
public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
public static final String MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCachePersistenceTimeToLiveMs";

@Deprecated // Use FORCE_ROW_KEY_ORDER instead.
public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB = "phoenix.query.rowKeyOrderSaltedTable";

public static final String USE_INDEXES_ATTRIB = "phoenix.query.useIndexes";
@Deprecated // use the IMMUTABLE keyword while creating the table
public static final String IMMUTABLE_ROWS_ATTRIB = "phoenix.mutate.immutableRows";
Expand Down Expand Up @@ -161,7 +161,7 @@ public interface QueryServices extends SQLCloseable {
"phoenix.index.failure.handling.rebuild.interval";
public static final String INDEX_REBUILD_TASK_INITIAL_DELAY = "phoenix.index.rebuild.task.initial.delay";
public static final String START_TRUNCATE_TASK_DELAY = "phoenix.start.truncate.task.delay";

public static final String INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = "phoenix.index.rebuild.batch.perTable";
// If index disable timestamp is older than this threshold, then index rebuild task won't attempt to rebuild it
public static final String INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD = "phoenix.index.rebuild.disabletimestamp.threshold";
Expand Down Expand Up @@ -215,7 +215,7 @@ public interface QueryServices extends SQLCloseable {
public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width";
public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region";
public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime";

public static final String RUN_UPDATE_STATS_ASYNC = "phoenix.update.stats.command.async";
public static final String STATS_SERVER_POOL_SIZE = "phoenix.stats.pool.size";
public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async";
Expand Down Expand Up @@ -244,7 +244,7 @@ public interface QueryServices extends SQLCloseable {

// Tag Name to determine the Phoenix Client Type
public static final String CLIENT_METRICS_TAG = "phoenix.client.metrics.tag";

// Transaction related configs
public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled";
// Controls whether or not uncommitted data is automatically sent to HBase
Expand All @@ -263,7 +263,7 @@ public interface QueryServices extends SQLCloseable {
public static final String ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE = "phoenix.view.allowNewColumnFamily";
public static final String RETURN_SEQUENCE_VALUES_ATTRIB = "phoenix.sequence.returnValues";
public static final String EXTRA_JDBC_ARGUMENTS_ATTRIB = "phoenix.jdbc.extra.arguments";

public static final String MAX_VERSIONS_TRANSACTIONAL_ATTRIB = "phoenix.transactions.maxVersions";

// metadata configs
Expand All @@ -281,7 +281,7 @@ public interface QueryServices extends SQLCloseable {
public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time";
public static final String LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB = "phoenix.client.localIndexUpgrade";
public static final String LIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.limited.query.serial.threshold";

//currently BASE64 and ASCII is supported
public static final String UPLOAD_BINARY_DATA_TYPE_ENCODING = "phoenix.upload.binaryDataType.encoding";
// Toggle for server-written updates to SYSTEM.CATALOG
Expand Down Expand Up @@ -407,7 +407,7 @@ public interface QueryServices extends SQLCloseable {
// Also, before 4.15 when we added a column to a base table we would have to propagate the
// column metadata to all its child views. After PHOENIX-3534 we no longer propagate metadata
// changes from a parent to its children (we just resolve its ancestors and include their columns)
//
//
// The following config is used to continue writing the parent table column metadata while
// creating a view and also prevent metadata changes to a parent table/view that needs to be
// propagated to its children. This is done to allow rollback of the splittable SYSTEM.CATALOG
Expand Down Expand Up @@ -561,6 +561,8 @@ public interface QueryServices extends SQLCloseable {

public static final String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled";

// HA Group Store sync job interval in seconds
String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds";

/**
* Get executor service used for parallel scans
Expand Down
Loading