Skip to content

Commit

Permalink
NIFI-13702 Remove deprecated property STATE_UPDATE_INTERVAL from Capt…
Browse files Browse the repository at this point in the history
…ureChangeMySQL

This closes apache#9225

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
EndzeitBegins authored and exceptionfactory committed Sep 3, 2024
1 parent 1ad9d86 commit d1432d6
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,6 @@
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
Expand Down Expand Up @@ -104,6 +79,7 @@
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
Expand All @@ -115,6 +91,32 @@
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import static com.github.shyiko.mysql.binlog.event.EventType.DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.EXT_DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.EXT_WRITE_ROWS;
Expand Down Expand Up @@ -361,19 +363,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

public static final PropertyDescriptor STATE_UPDATE_INTERVAL = new PropertyDescriptor.Builder()
.name("capture-change-mysql-state-update-interval")
.displayName("State Update Interval")
.description("DEPRECATED. This property is no longer used and exists solely for backward compatibility purposes. Indicates how often to update the processor's state with binlog "
+ "file/position values. A value of zero means that state will only be updated when the processor is "
+ "stopped or shutdown. If at some point the processor state does not contain the desired binlog values, the last flow file emitted will contain the last observed values, "
+ "and the processor can be returned to that state by using the Initial Binlog File, Initial Binlog Position, and Initial Sequence ID properties.")
.defaultValue("0 seconds")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();

public static final PropertyDescriptor INIT_SEQUENCE_ID = new PropertyDescriptor.Builder()
.name("capture-change-mysql-init-seq-id")
.displayName("Initial Sequence ID")
Expand Down Expand Up @@ -476,7 +465,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
RETRIEVE_ALL_RECORDS,
INCLUDE_BEGIN_COMMIT,
INCLUDE_DDL_EVENTS,
STATE_UPDATE_INTERVAL,
INIT_SEQUENCE_ID,
INIT_BINLOG_FILENAME,
INIT_BINLOG_POSITION,
Expand Down Expand Up @@ -528,6 +516,11 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
}

@Override
public void migrateProperties(PropertyConfiguration config) {
config.removeProperty("capture-change-mysql-state-update-interval");
}

@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,15 +1014,14 @@ public void testUpdateState() throws Exception {

testRunner.run(1, false, false);

// Ensure state not set, as the processor hasn't been stopped and no State Update Interval has been set
// Ensure state not set, as the processor hasn't been stopped
testRunner.getStateManager().assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, INIT_BIN_LOG_FILENAME, Scope.CLUSTER);
testRunner.getStateManager().assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, FOUR, Scope.CLUSTER);
testRunner.getStateManager().assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER);

testRunner.getStateManager().clear(Scope.CLUSTER);

// Send some events, wait for the State Update Interval, and verify the state was set
testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, "1 second");
// Send some events and verify the state was set
testRunner.run(1, false, true);

// ROTATE
Expand Down Expand Up @@ -1087,8 +1086,7 @@ public void testUpdateStateUseGtid() throws Exception {
((CaptureChangeMySQL) testRunner.getProcessor()).clearState();
testRunner.getStateManager().clear(Scope.CLUSTER);

// Send some events, wait for the State Update Interval, and verify the state was set
testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, "1 second");
// Send some events and verify the state was set
testRunner.run(1, false, true);

// GTID
Expand Down

0 comments on commit d1432d6

Please sign in to comment.