Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,7 @@ scan-journal.log

# connectors' cache
*.sqlite

.devenv
.direnv
**/.metals
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
plugins {
id 'application'
id 'airbyte-java-connector'
alias(libs.plugins.kotlin.jvm)
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.7.9'
features = ['db-sources']
cdkVersionRequired = '0.20.6'
features = ['db-sources', 'datastore-mongo']
useLocalCdk = false
}

airbyteJavaConnector.addCdkDependencies()

application {
mainClass = 'io.airbyte.integrations.source.mongodb.MongoDbSource'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
Expand All @@ -35,47 +32,40 @@ sourceSets {
}
}

dependencies {
implementation libs.mongo.driver.sync

testImplementation libs.testcontainers.mongodb
java {
compileJava {
options.compilerArgs += "-Xlint:-try,-rawtypes"
}
}

integrationTestJavaImplementation libs.apache.commons.lang
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-mongodb-v2-plain')
dependencies {
implementation 'io.debezium:debezium-embedded:2.4.0.Final'
implementation 'io.debezium:debezium-connector-mongodb:2.4.0.Final'

dataGeneratorImplementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
testImplementation 'org.testcontainers:mongodb:1.19.0'

dataGeneratorImplementation project(':airbyte-integrations:connectors:source-mongodb-v2-plain')
dataGeneratorImplementation libs.mongo.driver.sync
dataGeneratorImplementation libs.kotlin.logging
dataGeneratorImplementation libs.kotlinx.cli
dataGeneratorImplementation (libs.java.faker) {
exclude module: 'snakeyaml'
}
dataGeneratorImplementation libs.jackson.databind
dataGeneratorImplementation libs.bundles.slf4j
dataGeneratorImplementation libs.slf4j.simple
dataGeneratorImplementation libs.kotlinx.cli.jvm
dataGeneratorImplementation 'org.yaml:snakeyaml:2.2'

debeziumTestImplementation libs.debezium.api
debeziumTestImplementation libs.debezium.embedded
debeziumTestImplementation libs.debezium.sqlserver
debeziumTestImplementation libs.debezium.mysql
debeziumTestImplementation libs.debezium.postgres
debeziumTestImplementation libs.debezium.mongodb
debeziumTestImplementation libs.bundles.slf4j
debeziumTestImplementation libs.slf4j.simple
debeziumTestImplementation libs.kotlinx.cli.jvm
debeziumTestImplementation libs.spotbugs.annotations
dataGeneratorImplementation platform('com.fasterxml.jackson:jackson-bom:2.15.2')
dataGeneratorImplementation 'com.fasterxml.jackson.core:jackson-databind'
dataGeneratorImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'

dataGeneratorImplementation ('com.github.javafaker:javafaker:1.0.2') { exclude module: 'snakeyaml' }
dataGeneratorImplementation 'io.github.oshai:kotlin-logging-jvm:5.1.0'
dataGeneratorImplementation 'org.jetbrains.kotlinx:kotlinx-cli-jvm:0.3.5'
dataGeneratorImplementation 'org.mongodb:mongodb-driver-sync:4.10.2'

debeziumTestImplementation 'io.debezium:debezium-embedded:2.4.0.Final'
debeziumTestImplementation 'io.debezium:debezium-connector-mongodb:2.4.0.Final'
debeziumTestImplementation 'org.jetbrains.kotlinx:kotlinx-cli-jvm:0.3.5'
debeziumTestImplementation 'com.github.spotbugs:spotbugs-annotations:4.7.3'
}

/*
* Executes the script that generates test data and inserts it into the provided database/collection.
*
* To execute this task, use the following command:
*
* ./gradlew :airbyte-integrations:connectors:source-mongodb-v2:generateTestData -PconnectionString=<connection string> -PdatabaseName=<database name> -PcollectionName=<collection name> -Pusername=<username>
* ./gradlew :airbyte-integrations:connectors:source-mongodb-v2-plain:generateTestData -PconnectionString=<connection string> -PdatabaseName=<database name> -PcollectionName=<collection name> -Pusername=<username>
*
* Optionally, you can provide -PnumberOfDocuments to change the number of generated documents from the default (10,000).
*/
Expand Down Expand Up @@ -109,7 +99,7 @@ tasks.register('generateTestData', JavaExec) {
*
* To execute this task, use the following command:
*
* ./gradlew :airbyte-integrations:connectors:source-mongodb-v2:debeziumTest -PconnectionString=<connection string> -PdatabaseName=<database name> -PcollectionName=<collection name> -Pusername=<username>
* ./gradlew :airbyte-integrations:connectors:source-mongodb-v2-plain:debeziumTest -PconnectionString=<connection string> -PdatabaseName=<database name> -PcollectionName=<collection name> -Pusername=<username>
*/
tasks.register('debeziumTest', JavaExec) {
def arguments = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@
"description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.",
"default": 10000,
"order": 10,
"minimum": 1000,
"minimum": 10,
"maximum": 100000,
"group": "advanced"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
data:
ab_internal:
ql: 200
sl: 100
sl: 200
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.1
dockerImageTag: 1.2.10
dockerRepository: airbyte/source-mongodb-v2-plain
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import java.util.List;
import java.util.Optional;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonObjectId;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
Expand Down Expand Up @@ -86,8 +90,13 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
// "where _id > [last saved state] order by _id ASC".
// If no state exists, it will create a query akin to "where 1=1 order by _id ASC"
final Bson filter = existingState
// TODO add type support here when we add support for _id fields that are not ObjectId types
.map(state -> Filters.gt(MongoConstants.ID_FIELD, new ObjectId(state.id())))
.map(state -> Filters.gt(MongoConstants.ID_FIELD,
switch (state.idType()) {
case STRING -> new BsonString(state.id());
case OBJECT_ID -> new BsonObjectId(new ObjectId(state.id()));
case INT -> new BsonInt32(Integer.parseInt(state.id()));
case LONG -> new BsonInt64(Long.parseLong(state.id()));
}))
// if nothing was found, return a new BsonDocument
.orElseGet(BsonDocument::new);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.connection.SslSettings;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumPropertiesManager;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumPropertiesManager;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

Expand All @@ -30,14 +29,12 @@ public class MongoConnectionUtils {
* @return The configured {@link MongoClient}.
*/
public static MongoClient createMongoClient(final MongoDbSourceConfig config) {

final ConnectionString mongoConnectionString = new ConnectionString(buildConnectionString(config));

final MongoDriverInformation mongoDriverInformation = MongoDriverInformation.builder()
.driverName(DRIVER_NAME)
.build();


final MongoClientSettings.Builder mongoClientSettingsBuilder = MongoClientSettings.builder()
.applyConnectionString(mongoConnectionString)
.applyToSslSettings(s -> s.enabled(false))
Expand All @@ -54,7 +51,7 @@ public static MongoClient createMongoClient(final MongoDbSourceConfig config) {
}

private static String buildConnectionString(final MongoDbSourceConfig config) {
String sslConnectionString = MongoDbDebeziumPropertiesManager.buildConnectionString(config.rawConfig(), true);
String sslConnectionString = MongoDbDebeziumPropertiesManager.buildConnectionString(config.getDatabaseConfig(), true);
String replaced = sslConnectionString.replace("&tls=true","");
System.err.println("Replaced " + replaced);
return replaced;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package io.airbyte.integrations.source.mongodb;

import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumConstants;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumConstants.Configuration;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.Configuration;
import java.time.Duration;

public class MongoConstants {
Expand Down Expand Up @@ -34,6 +34,8 @@ public class MongoConstants {
public static final String USERNAME_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.USERNAME_CONFIGURATION_KEY;
public static final String SCHEMA_ENFORCED_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.SCHEMA_ENFORCED_CONFIGURATION_KEY;
public static final String SCHEMALESS_MODE_DATA_FIELD = Configuration.SCHEMALESS_MODE_DATA_FIELD;
public static final String INITIAL_RECORD_WAITING_TIME_SEC = "initial_waiting_seconds";
public static final Integer DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC = 300;

private MongoConstants() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIG_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_AUTH_SOURCE;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_DISCOVER_SAMPLE_SIZE;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.INITIAL_RECORD_WAITING_TIME_SEC;
import static io.airbyte.integrations.source.mongodb.MongoConstants.PASSWORD_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.SCHEMA_ENFORCED_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.USERNAME_CONFIGURATION_KEY;
Expand All @@ -27,33 +29,32 @@
*/
public record MongoDbSourceConfig(JsonNode rawConfig) {

/**
* Constructs a new {@link MongoDbSourceConfig} from the provided raw configuration.
*
* @param rawConfig The underlying JSON configuration provided by the connector framework.
* @throws IllegalArgumentException if the raw configuration does not contain the
* {@link MongoConstants#DATABASE_CONFIG_CONFIGURATION_KEY} key.
*/
public MongoDbSourceConfig(final JsonNode rawConfig) {
if (rawConfig.has(DATABASE_CONFIG_CONFIGURATION_KEY)) {
this.rawConfig = rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY);
} else {
public MongoDbSourceConfig {
if (rawConfig == null) {
throw new IllegalArgumentException("MongoDbSourceConfig cannot accept a null config.");
}
if (!rawConfig.hasNonNull(DATABASE_CONFIG_CONFIGURATION_KEY)) {
throw new IllegalArgumentException("Database configuration is missing required '" + DATABASE_CONFIG_CONFIGURATION_KEY + "' property.");
}
}

public JsonNode getDatabaseConfig() {
return rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY);
}

public String getAuthSource() {
return rawConfig.has(AUTH_SOURCE_CONFIGURATION_KEY) ? rawConfig.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(DEFAULT_AUTH_SOURCE)
return getDatabaseConfig().has(AUTH_SOURCE_CONFIGURATION_KEY) ? getDatabaseConfig().get(AUTH_SOURCE_CONFIGURATION_KEY).asText(DEFAULT_AUTH_SOURCE)
: DEFAULT_AUTH_SOURCE;
}

public Integer getCheckpointInterval() {
return rawConfig.has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY) ? rawConfig.get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL)
return getDatabaseConfig().has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY)
? getDatabaseConfig().get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL)
: CHECKPOINT_INTERVAL;
}

public String getDatabaseName() {
return rawConfig.has(DATABASE_CONFIGURATION_KEY) ? rawConfig.get(DATABASE_CONFIGURATION_KEY).asText() : null;
return getDatabaseConfig().has(DATABASE_CONFIGURATION_KEY) ? getDatabaseConfig().get(DATABASE_CONFIGURATION_KEY).asText() : null;
}

public OptionalInt getQueueSize() {
Expand All @@ -63,15 +64,15 @@ public OptionalInt getQueueSize() {
}

public String getPassword() {
return rawConfig.has(PASSWORD_CONFIGURATION_KEY) ? rawConfig.get(PASSWORD_CONFIGURATION_KEY).asText() : null;
return getDatabaseConfig().has(PASSWORD_CONFIGURATION_KEY) ? getDatabaseConfig().get(PASSWORD_CONFIGURATION_KEY).asText() : null;
}

public String getUsername() {
return rawConfig.has(USERNAME_CONFIGURATION_KEY) ? rawConfig.get(USERNAME_CONFIGURATION_KEY).asText() : null;
return getDatabaseConfig().has(USERNAME_CONFIGURATION_KEY) ? getDatabaseConfig().get(USERNAME_CONFIGURATION_KEY).asText() : null;
}

public boolean hasAuthCredentials() {
return rawConfig.has(USERNAME_CONFIGURATION_KEY) && rawConfig.has(PASSWORD_CONFIGURATION_KEY);
return getDatabaseConfig().has(USERNAME_CONFIGURATION_KEY) && getDatabaseConfig().has(PASSWORD_CONFIGURATION_KEY);
}

public Integer getSampleSize() {
Expand All @@ -83,8 +84,16 @@ public Integer getSampleSize() {
}

public boolean getEnforceSchema() {
return rawConfig.has(SCHEMA_ENFORCED_CONFIGURATION_KEY) ? rawConfig.get(SCHEMA_ENFORCED_CONFIGURATION_KEY).asBoolean(true)
return getDatabaseConfig().has(SCHEMA_ENFORCED_CONFIGURATION_KEY) ? getDatabaseConfig().get(SCHEMA_ENFORCED_CONFIGURATION_KEY).asBoolean(true)
: true;
}

public Integer getInitialWaitingTimeSeconds() {
if (rawConfig.has(INITIAL_RECORD_WAITING_TIME_SEC)) {
return rawConfig.get(INITIAL_RECORD_WAITING_TIME_SEC).asInt(DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC);
} else {
return DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import com.mongodb.MongoException;
import com.mongodb.client.MongoCursor;
import io.airbyte.cdk.integrations.debezium.CdcMetadataInjector;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbCdcEventUtils;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcEventUtils;
import io.airbyte.integrations.source.mongodb.state.IdType;
import io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus;
import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager;
Expand Down Expand Up @@ -69,10 +69,15 @@ public class MongoDbStateIterator implements Iterator<AirbyteMessage> {
private boolean finalStateNext = false;

/**
* Tracks if the underlying iterator threw an exception. This helps to determine the final state
* status emitted from the final next call.
* Tracks if the underlying iterator threw an exception, indicating that the snapshot for this
* stream failed. This helps to determine the final state status emitted from the final next call.
*/
private boolean iterThrewException = false;
private boolean initialSnapshotFailed = false;

/**
* Tracks the exception thrown if there initial snapshot has failed.
*/
private Exception initialSnapshotException;

/**
* Constructor.
Expand Down Expand Up @@ -111,14 +116,24 @@ public MongoDbStateIterator(final MongoCursor<Document> iter,
@Override
public boolean hasNext() {
LOGGER.debug("Checking hasNext() for stream {}...", getStream());
if (initialSnapshotFailed) {
// If the initial snapshot is incomplete for this stream, throw an exception failing the sync. This
// will ensure the platform retry logic
// kicks in and keeps retrying the sync until the initial snapshot is complete.
throw new RuntimeException(initialSnapshotException);
}
try {
if (iter.hasNext()) {
return true;
}
} catch (final MongoException e) {
// If hasNext throws an exception, log it and then treat it as if hasNext returned false.
iterThrewException = true;
// If hasNext throws an exception, log it and set the flag to indicate that the initial snapshot
// failed. This indicates to the main iterator
// to emit state associated with what has been processed so far.
initialSnapshotFailed = true;
initialSnapshotException = e;
LOGGER.info("hasNext threw an exception for stream {}: {}", getStream(), e.getMessage(), e);
return true;
}

// no more records in cursor + no record messages have been emitted => collection is empty
Expand All @@ -145,9 +160,9 @@ public AirbyteMessage next() {
// Should a state message be emitted based on then last time a state message was emitted?
final var emitStateDueToDuration = count > 0 && Duration.between(lastCheckpoint, Instant.now()).compareTo(checkpointDuration) > 0;

if (finalStateNext) {
if (finalStateNext || initialSnapshotFailed) {
LOGGER.debug("Emitting final state status for stream {}:{}...", stream.getStream().getNamespace(), stream.getStream().getName());
final var finalStateStatus = iterThrewException ? InitialSnapshotStatus.IN_PROGRESS : InitialSnapshotStatus.COMPLETE;
final var finalStateStatus = initialSnapshotFailed ? InitialSnapshotStatus.IN_PROGRESS : InitialSnapshotStatus.COMPLETE;
final var idType = IdType.findByJavaType(lastId.getClass().getSimpleName())
.orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + lastId.getClass().getSimpleName()));
final var state = new MongoDbStreamState(lastId.toString(), finalStateStatus, idType);
Expand Down
Loading