Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination S3 / S3 Data Lake: use micronaut injection instead of System.getenv for assume role #51618

Open
wants to merge 6 commits into
base: edgao/micronaut_initial_cleanup
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.micronaut.context.ApplicationContext
import io.micronaut.context.RuntimeBeanDefinition
import io.micronaut.context.env.CommandLinePropertySource
import io.micronaut.context.env.Environment
import io.micronaut.context.env.MapPropertySource
import io.micronaut.core.cli.CommandLine as MicronautCommandLine
import java.nio.file.Path
import kotlin.system.exitProcess
Expand All @@ -28,15 +29,24 @@ const val OVERRIDE_ENV = "override"
class AirbyteSourceRunner(
/** CLI args. */
args: Array<out String>,
additionalMicronautEnvs: List<String> = emptyList(),
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("source", args, systemEnv, testBeanDefinitions) {
) :
AirbyteConnectorRunner(
"source",
args,
additionalMicronautEnvs,
systemEnv,
micronautProperties = emptyMap(),
testBeanDefinitions
) {
companion object {
@JvmStatic
fun run(vararg args: String) {
AirbyteSourceRunner(args).run<AirbyteConnectorRunnable>()
fun run(vararg args: String, additionalMicronautEnvs: List<String> = emptyList()) {
AirbyteSourceRunner(args, additionalMicronautEnvs).run<AirbyteConnectorRunnable>()
}
}
}
Expand All @@ -45,15 +55,25 @@ class AirbyteSourceRunner(
class AirbyteDestinationRunner(
/** CLI args. */
args: Array<out String>,
additionalMicronautEnvs: List<String> = emptyList(),
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
micronautProperties: Map<String, String> = emptyMap(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("destination", args, systemEnv, testBeanDefinitions) {
) :
AirbyteConnectorRunner(
"destination",
args,
additionalMicronautEnvs,
systemEnv,
micronautProperties,
testBeanDefinitions
) {
companion object {
@JvmStatic
fun run(vararg args: String) {
AirbyteDestinationRunner(args).run<AirbyteConnectorRunnable>()
fun run(vararg args: String, additionalMicronautEnvs: List<String> = emptyList()) {
AirbyteDestinationRunner(args, additionalMicronautEnvs).run<AirbyteConnectorRunnable>()
}
}
}
Expand All @@ -65,7 +85,9 @@ class AirbyteDestinationRunner(
sealed class AirbyteConnectorRunner(
val connectorType: String,
val args: Array<out String>,
additionalMicronautEnvs: List<String> = emptyList(),
systemEnv: Map<String, String>,
val micronautProperties: Map<String, String> = emptyMap(),
val testBeanDefinitions: Array<out RuntimeBeanDefinition<*>>,
) {
val envs: Array<String> =
Expand All @@ -76,7 +98,8 @@ sealed class AirbyteConnectorRunner(
// any junit calls. This doesn't work if we launch the connector from a different
// thread, e.g. `Dispatchers.IO`. Force the test env if needed. Some tests launch the
// connector from the IO context to avoid blocking themselves.
listOfNotNull(Environment.TEST.takeIf { testBeanDefinitions.isNotEmpty() })
listOfNotNull(Environment.TEST.takeIf { testBeanDefinitions.isNotEmpty() }) +
additionalMicronautEnvs

inline fun <reified R : Runnable> run() {
val picocliCommandLineFactory = PicocliCommandLineFactory(this)
Expand All @@ -87,6 +110,8 @@ sealed class AirbyteConnectorRunner(
picocliCommandLineFactory.commands.options().map { it.longestName() },
)
val commandLinePropertySource = CommandLinePropertySource(micronautCommandLine)
val additionalPropertiesSource =
MapPropertySource("additional_properties", micronautProperties)
val ctx: ApplicationContext =
// note that we put the override envs last.
// This ensures that micronaut gives those environments precedence
Expand All @@ -97,6 +122,7 @@ sealed class AirbyteConnectorRunner(
airbytePropertySource,
commandLinePropertySource,
MetadataYamlPropertySource(),
additionalPropertiesSource,
)
.toTypedArray(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ data object CliRunner {
val out = CliRunnerOutputStream()
val runnable: Runnable =
makeRunnable(op, config, catalog, state) { args: Array<String> ->
AirbyteSourceRunner(args, featureFlags.systemEnv, out.beanDefinition)
AirbyteSourceRunner(
args,
additionalMicronautEnvs = emptyList(),
featureFlags.systemEnv,
out.beanDefinition
)
}
return CliRunnable(runnable, out.results)
}
Expand All @@ -53,6 +58,8 @@ data object CliRunner {
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
inputStream: InputStream,
additionalMicronautEnvs: List<String> = emptyList(),
micronautProperties: Map<String, String> = emptyMap(),
vararg featureFlags: FeatureFlag,
): CliRunnable {
val inputBeanDefinition: RuntimeBeanDefinition<InputStream> =
Expand All @@ -65,7 +72,9 @@ data object CliRunner {
makeRunnable(op, configPath, catalog, state) { args: Array<String> ->
AirbyteDestinationRunner(
args,
additionalMicronautEnvs = additionalMicronautEnvs,
featureFlags.systemEnv,
micronautProperties,
inputBeanDefinition,
out.beanDefinition,
)
Expand All @@ -80,6 +89,8 @@ data object CliRunner {
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
featureFlags: Set<FeatureFlag> = setOf(),
additionalMicronautEnvs: List<String> = emptyList(),
micronautProperties: Map<String, String> = emptyMap(),
vararg input: AirbyteMessage,
): CliRunnable {
val inputJsonBytes: ByteArray =
Expand All @@ -97,6 +108,8 @@ data object CliRunner {
catalog,
state,
inputStream,
additionalMicronautEnvs,
micronautProperties,
*featureFlags.toTypedArray()
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.airbyte.cdk.load.test.util.FakeDataDumper
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.destination_process.Property
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.charset.StandardCharsets
Expand All @@ -29,13 +30,17 @@ data class CheckTestConfig(val configPath: Path, val featureFlags: Set<FeatureFl
open class CheckIntegrationTest<T : ConfigurationSpecification>(
val successConfigFilenames: List<CheckTestConfig>,
val failConfigFilenamesAndFailureReasons: Map<CheckTestConfig, Pattern>,
additionalMicronautEnvs: List<String> = emptyList(),
micronautProperties: Map<Property, String> = emptyMap(),
configUpdater: ConfigurationUpdater = FakeConfigurationUpdater,
) :
IntegrationTest(
additionalMicronautEnvs = additionalMicronautEnvs,
dataDumper = FakeDataDumper,
destinationCleaner = NoopDestinationCleaner,
recordMangler = NoopExpectedRecordMapper,
configUpdater = configUpdater,
micronautProperties = micronautProperties,
) {
@Test
open fun testSuccessConfigs() {
Expand All @@ -46,6 +51,7 @@ open class CheckIntegrationTest<T : ConfigurationSpecification>(
"check",
configContents = config,
featureFlags = featureFlags.toTypedArray(),
micronautProperties = micronautProperties,
)
runBlocking { process.run() }
val messages = process.readMessages()
Expand Down Expand Up @@ -74,6 +80,7 @@ open class CheckIntegrationTest<T : ConfigurationSpecification>(
"check",
configContents = config,
featureFlags = featureFlags.toTypedArray(),
micronautProperties = micronautProperties,
)
runBlocking { process.run() }
val messages = process.readMessages()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import io.airbyte.cdk.load.test.util.FakeDataDumper
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.destination_process.Property
import io.airbyte.cdk.load.util.Jsons
import io.airbyte.cdk.load.util.deserializeToPrettyPrintedString
import io.airbyte.protocol.models.v0.AirbyteMessage
Expand All @@ -34,11 +35,16 @@ import org.junit.jupiter.api.assertAll
* of the diff. This diff is _really messy_ for the initial migration from the old CDK to the new
* one, but after that, it should be pretty readable.
*/
abstract class SpecTest :
abstract class SpecTest(
additionalMicronautEnvs: List<String> = emptyList(),
micronautProperties: Map<Property, String> = emptyMap(),
) :
IntegrationTest(
additionalMicronautEnvs = additionalMicronautEnvs,
dataDumper = FakeDataDumper,
destinationCleaner = NoopDestinationCleaner,
recordMangler = NoopExpectedRecordMapper,
micronautProperties = micronautProperties,
) {
private val testResourcesPath = Path.of("src/test-integration/resources")

Expand Down Expand Up @@ -67,6 +73,7 @@ abstract class SpecTest :
destinationProcessFactory.createDestinationProcess(
"spec",
featureFlags = featureFlags,
micronautProperties = micronautProperties,
)
runBlocking { process.run() }
val messages = process.readMessages()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.load.test.util.destination_process.DestinationProcessFactory
import io.airbyte.cdk.load.test.util.destination_process.DestinationUncleanExitException
import io.airbyte.cdk.load.test.util.destination_process.NonDockerizedDestination
import io.airbyte.cdk.load.test.util.destination_process.Property
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
Expand Down Expand Up @@ -49,6 +50,7 @@ import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension
@SuppressFBWarnings("NP_NONNULL_RETURN_VIOLATION", justification = "Micronaut DI")
@ExtendWith(SystemStubsExtension::class)
abstract class IntegrationTest(
additionalMicronautEnvs: List<String>,
val dataDumper: DestinationDataDumper,
val destinationCleaner: DestinationCleaner,
val recordMangler: ExpectedRecordMapper = NoopExpectedRecordMapper,
Expand All @@ -57,11 +59,12 @@ abstract class IntegrationTest(
val nullEqualsUnset: Boolean = false,
val configUpdater: ConfigurationUpdater = FakeConfigurationUpdater,
val envVars: Map<String, String> = emptyMap(),
val micronautProperties: Map<Property, String> = emptyMap(),
) {
// Intentionally don't inject the actual destination process - we need a full factory
// because some tests want to run multiple syncs, so we need to run the destination
// multiple times.
val destinationProcessFactory = DestinationProcessFactory.get()
val destinationProcessFactory = DestinationProcessFactory.get(additionalMicronautEnvs)

@Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4)
private val timestampString =
Expand Down Expand Up @@ -180,6 +183,7 @@ abstract class IntegrationTest(
catalog.asProtocolObject(),
useFileTransfer = useFileTransfer,
envVars = envVars,
micronautProperties = micronautProperties,
)
return runBlocking(Dispatchers.IO) {
launch { destination.run() }
Expand Down Expand Up @@ -222,7 +226,8 @@ abstract class IntegrationTest(
configContents,
DestinationCatalog(listOf(stream)).asProtocolObject(),
useFileTransfer,
envVars
envVars,
edgao marked this conversation as resolved.
Show resolved Hide resolved
micronautProperties = micronautProperties,
)
return runBlocking(Dispatchers.IO) {
launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,19 @@ abstract class DestinationProcessFactory {
catalog: ConfiguredAirbyteCatalog? = null,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
micronautProperties: Map<Property, String> = emptyMap(),
vararg featureFlags: FeatureFlag,
): DestinationProcess

companion object {
fun get(): DestinationProcessFactory =
/**
* [additionalMicronautEnvs] is only passed into the non-docker connector. We assume that
* the dockerized connector is capable of setting its own micronaut environments.
*/
fun get(additionalMicronautEnvs: List<String>): DestinationProcessFactory =
when (val runner = System.getenv("AIRBYTE_CONNECTOR_INTEGRATION_TEST_RUNNER")) {
null,
"non-docker" -> NonDockerizedDestinationFactory()
"non-docker" -> NonDockerizedDestinationFactory(additionalMicronautEnvs)
"docker" -> {
val rawProperties: Map<String, Any?> =
YamlPropertySourceLoader()
Expand All @@ -91,3 +96,19 @@ abstract class DestinationProcessFactory {
}
}
}

/**
* Represents a micronaut property, which has a corresponding entry in micronaut's `application.yml`
* file, which is populated by an environment variable. Just a pair of the micronaut property name,
* and that corresponding env var name.
*
* For example, this application.yaml:
* ```yaml
* airbyte:
* destination:
* foo-bar: ${FOO_BAR}
* ```
*
* Would be represented as `Property("airbyte.destination.foo-bar", "FOO_BAR")`.
*/
data class Property(val micronautProperty: String, val environmentVariable: String)
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class DockerizedDestination(
listOf("-e", "$key=$value")
}

// DANGER: env vars can contain secrets, so you MUST NOT log this command.
val cmd: MutableList<String> =
(listOf(
"docker",
Expand Down Expand Up @@ -158,7 +159,6 @@ class DockerizedDestination(
configContents?.let { addInput("config", it.toByteArray(Charsets.UTF_8)) }
catalog?.let { addInput("catalog", catalog.serializeToJsonBytes()) }

logger.info { "Executing command: ${cmd.joinToString(" ")}" }
process = ProcessBuilder(cmd).start()
// Annoyingly, the process's stdin is called "outputStream"
destinationStdin = BufferedWriter(OutputStreamWriter(process.outputStream, Charsets.UTF_8))
Expand Down Expand Up @@ -290,6 +290,7 @@ class DockerizedDestinationFactory(
catalog: ConfiguredAirbyteCatalog?,
useFileTransfer: Boolean,
envVars: Map<String, String>,
micronautProperties: Map<Property, String>,
vararg featureFlags: FeatureFlag,
): DestinationProcess {
return DockerizedDestination(
Expand All @@ -299,7 +300,7 @@ class DockerizedDestinationFactory(
catalog,
testName,
useFileTransfer,
envVars,
envVars + micronautProperties.mapKeys { (k, _) -> k.environmentVariable },
*featureFlags,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class NonDockerizedDestination(
catalog: ConfiguredAirbyteCatalog?,
useFileTransfer: Boolean,
envVars: Map<String, String>,
additionalMicronautEnvs: List<String>,
micronautProperties: Map<Property, String>,
vararg featureFlags: FeatureFlag,
) : DestinationProcess {
private val destinationStdinPipe: PrintWriter
Expand Down Expand Up @@ -73,6 +75,8 @@ class NonDockerizedDestination(
catalog = catalog,
inputStream = destinationStdin,
featureFlags = featureFlags,
additionalMicronautEnvs = additionalMicronautEnvs,
micronautProperties = micronautProperties.mapKeys { (k, _) -> k.micronautProperty },
)
}

Expand Down Expand Up @@ -121,13 +125,16 @@ class NonDockerizedDestination(
}
}

class NonDockerizedDestinationFactory : DestinationProcessFactory() {
class NonDockerizedDestinationFactory(
private val additionalMicronautEnvs: List<String>,
) : DestinationProcessFactory() {
override fun createDestinationProcess(
command: String,
configContents: String?,
catalog: ConfiguredAirbyteCatalog?,
useFileTransfer: Boolean,
envVars: Map<String, String>,
micronautProperties: Map<Property, String>,
vararg featureFlags: FeatureFlag,
): DestinationProcess {
// TODO pass test name into the destination process
Expand All @@ -137,6 +144,8 @@ class NonDockerizedDestinationFactory : DestinationProcessFactory() {
catalog,
useFileTransfer,
envVars,
additionalMicronautEnvs,
micronautProperties,
*featureFlags
)
}
Expand Down
Loading
Loading