diff --git a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java index a33af9dc..30e42e87 100644 --- a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java +++ b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java @@ -65,7 +65,8 @@ static void startCdcAgent(String agentArgs) throws Exception { PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config); CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config); - CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer); + CommitLogReaderInitializer commitLogReaderInitializer = new CommitLogReaderInitializerImpl(); + CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer, commitLogReaderInitializer); CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, false); commitLogReaderService.initialize(); diff --git a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java new file mode 100644 index 00000000..6f269ee9 --- /dev/null +++ b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -0,0 +1,23 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer { + @Override + public void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception { + + } +} diff --git a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java index 5ddd88f8..b7996845 100644 --- a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java +++ b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java @@ -35,8 +35,9 @@ public class CommitLogReaderServiceImpl extends CommitLogReaderService { public CommitLogReaderServiceImpl(AgentConfig config, MutationSender mutationSender, SegmentOffsetWriter segmentOffsetWriter, - CommitLogTransfer commitLogTransfer) { - super(config, mutationSender, segmentOffsetWriter, commitLogTransfer); + CommitLogTransfer commitLogTransfer, + CommitLogReaderInitializer commitLogReaderInitializer) { + super(config, mutationSender, segmentOffsetWriter, commitLogTransfer, commitLogReaderInitializer); this.tasksExecutor = new JMXEnabledThreadPoolExecutor( config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, diff --git a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java index b769a074..969f332a 100644 --- a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java +++ b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java @@ -65,7 +65,8 @@ static void startCdcAgent(String agentArgs) throws Exception { PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config); CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config); - CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer); + CommitLogReaderInitializer commitLogReaderInitializer = new CommitLogReaderInitializerImpl(); + CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer, commitLogReaderInitializer); CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, true); commitLogReaderService.initialize(); diff --git a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java new file mode 100644 index 00000000..dd5a0119 --- /dev/null +++ b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -0,0 +1,45 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +import java.io.File; +import java.io.IOException; + + +public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer { + @Override + public void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception { + File relocationDir = new File(config.cdcWorkingDir); + if (!relocationDir.exists()) { + if (!relocationDir.mkdir()) { + throw new IOException("Failed to create " + config.cdcWorkingDir); + } + } + + File archiveDir = new File(relocationDir, CommitLogReaderService.ARCHIVE_FOLDER); + if (!archiveDir.exists()) { + if (!archiveDir.mkdir()) { + throw new IOException("Failed to create " + archiveDir); + } + } + File errorDir = new File(relocationDir, CommitLogReaderService.ERROR_FOLDER); + if (!errorDir.exists()) { + if (!errorDir.mkdir()) { + throw new IOException("Failed to create " + errorDir); + } + } + } +} diff --git a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java index cf317408..0997f8e4 100644 --- a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java +++ b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java @@ -26,7 +26,6 @@ import java.io.File; import java.util.Optional; import java.util.concurrent.*; -import java.util.function.IntBinaryOperator; /** * Consume a queue of commitlog files to read mutations. @@ -37,8 +36,9 @@ public class CommitLogReaderServiceImpl extends CommitLogReaderService { public CommitLogReaderServiceImpl(AgentConfig config, MutationSender mutationSender, SegmentOffsetWriter segmentOffsetWriter, - CommitLogTransfer commitLogTransfer) { - super(config, mutationSender, segmentOffsetWriter, commitLogTransfer); + CommitLogTransfer commitLogTransfer, + CommitLogReaderInitializer commitLogReaderInitializer) { + super(config, mutationSender, segmentOffsetWriter, commitLogTransfer, commitLogReaderInitializer); this.tasksExecutor = new JMXEnabledThreadPoolExecutor( config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, diff --git a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/Agent.java b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/Agent.java index f6db1492..421d65d7 100644 --- a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/Agent.java +++ b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/Agent.java @@ -66,7 +66,8 @@ static void startCdcAgent(String agentArgs) throws Exception { PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config); CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config); - CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer); + CommitLogReaderInitializer commitLogReaderInitializer = new CommitLogReaderInitializerImpl(); + CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer, commitLogReaderInitializer); CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation().getAbsolutePath(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, true); commitLogReaderService.initialize(); diff --git a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java new file mode 100644 index 00000000..13e94842 --- /dev/null +++ b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -0,0 +1,47 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +import java.io.File; +import java.io.IOException; + +import static com.datastax.oss.cdc.agent.CommitLogReaderService.ARCHIVE_FOLDER; +import static com.datastax.oss.cdc.agent.CommitLogReaderService.ERROR_FOLDER; + +public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer { + @Override + public void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception { + File relocationDir = new File(config.cdcWorkingDir); + if (!relocationDir.exists()) { + if (!relocationDir.mkdir()) { + throw new IOException("Failed to create " + config.cdcWorkingDir); + } + } + + File archiveDir = new File(relocationDir, CommitLogReaderService.ARCHIVE_FOLDER); + if (!archiveDir.exists()) { + if (!archiveDir.mkdir()) { + throw new IOException("Failed to create " + archiveDir); + } + } + File errorDir = new File(relocationDir, CommitLogReaderService.ERROR_FOLDER); + if (!errorDir.exists()) { + if (!errorDir.mkdir()) { + throw new IOException("Failed to create " + errorDir); + } + } + } +} diff --git a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java index 1fe35efe..487ad967 100644 --- a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java +++ b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java @@ -37,8 +37,9 @@ public class CommitLogReaderServiceImpl extends CommitLogReaderService { public CommitLogReaderServiceImpl(AgentConfig config, MutationSender mutationSender, SegmentOffsetWriter segmentOffsetWriter, - CommitLogTransfer commitLogTransfer) { - super(config, mutationSender, segmentOffsetWriter, commitLogTransfer); + CommitLogTransfer commitLogTransfer, + CommitLogReaderInitializer commitLogReaderInitializer) { + super(config, mutationSender, segmentOffsetWriter, commitLogTransfer, commitLogReaderInitializer); this.tasksExecutor = JMXEnabledThreadPoolExecutor.createAndPrestart( config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, 1, TimeUnit.MINUTES, diff --git a/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java b/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java new file mode 100644 index 00000000..cb22d2a3 --- /dev/null +++ b/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java @@ -0,0 +1,20 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.agent; + +public interface CommitLogReaderInitializer { + void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception; +} diff --git a/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java b/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java index 41a8975d..b9c83fab 100644 --- a/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java +++ b/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java @@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j; import java.io.File; -import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; import java.util.List; @@ -30,8 +29,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.IntBinaryOperator; -import java.util.function.LongBinaryOperator; @Slf4j public abstract class CommitLogReaderService implements Runnable, AutoCloseable @@ -72,6 +69,7 @@ public abstract class CommitLogReaderService implements Runnable, AutoCloseable * ordered commitlog file queue. */ final PriorityBlockingQueue commitLogQueue; + private final CommitLogReaderInitializer commitLogReaderInitializer; /** * Consumes commitlog files in parallel. @@ -81,11 +79,13 @@ public abstract class CommitLogReaderService implements Runnable, AutoCloseable public CommitLogReaderService(AgentConfig config, MutationSender mutationSender, SegmentOffsetWriter segmentOffsetWriter, - CommitLogTransfer commitLogTransfer) { + CommitLogTransfer commitLogTransfer, + CommitLogReaderInitializer commitLogReaderInitializer) { this.config = config; this.mutationSender = mutationSender; this.segmentOffsetWriter = segmentOffsetWriter; this.commitLogTransfer = commitLogTransfer; + this.commitLogReaderInitializer = commitLogReaderInitializer; this.commitLogQueue = new PriorityBlockingQueue<>(128, CommitLogUtil::compareCommitLogs); } @@ -150,25 +150,7 @@ public void submitCommitLog(File file) { } public void initialize() throws Exception { - File relocationDir = new File(config.cdcWorkingDir); - if (!relocationDir.exists()) { - if (!relocationDir.mkdir()) { - throw new IOException("Failed to create " + config.cdcWorkingDir); - } - } - - File archiveDir = new File(relocationDir, ARCHIVE_FOLDER); - if (!archiveDir.exists()) { - if (!archiveDir.mkdir()) { - throw new IOException("Failed to create " + archiveDir); - } - } - File errorDir = new File(relocationDir, ERROR_FOLDER); - if (!errorDir.exists()) { - if (!errorDir.mkdir()) { - throw new IOException("Failed to create " + errorDir); - } - } + commitLogReaderInitializer.initialize(config, this); } /**