diff --git a/fns-hl7-pipeline/fn-receiver-debatcher/deployZip.sh b/fns-hl7-pipeline/fn-receiver-debatcher/deployZip.sh index 69c6acb0e..bdc60ce53 100644 --- a/fns-hl7-pipeline/fn-receiver-debatcher/deployZip.sh +++ b/fns-hl7-pipeline/fn-receiver-debatcher/deployZip.sh @@ -22,6 +22,6 @@ cd ../../.. echo "Deploying Zip..." az functionapp deployment source config-zip -g $hl7RG -n $function --src $base_name -### Set FN_VERSION: + fn_version=$(cat pom.xml |grep -oPm1 "(?<=)[^<]+") az functionapp config appsettings set --name $function --resource-group $hl7RG --settings FN_VERSION=$fn_version > /dev/null diff --git a/fns-hl7-pipeline/fn-receiver-debatcher/pom.xml b/fns-hl7-pipeline/fn-receiver-debatcher/pom.xml index dbec29803..a2be4689c 100644 --- a/fns-hl7-pipeline/fn-receiver-debatcher/pom.xml +++ b/fns-hl7-pipeline/fn-receiver-debatcher/pom.xml @@ -25,7 +25,7 @@ ${project.build.directory}/azure-functions/${functionAppName} 2.10.1 1.2.7.1 - 0.0.48-SNAPSHOT + 0.0.47-SNAPSHOT 5.9.2 @@ -82,26 +82,25 @@ com.azure azure-messaging-eventhubs - - - com.google.code.gson - gson - ${gson.version} + com.azure + azure-identity - - com.azure azure-storage-blob + + com.google.code.gson + gson + ${gson.version} + org.junit.jupiter junit-jupiter ${junitjupiter.version} test - com.googlecode.json-simple json-simple @@ -144,6 +143,10 @@ FN_VERSION ${project.version} + + WEBSITE_RUN_FROM_PACKAGE + 1 + diff --git a/fns-hl7-pipeline/fn-receiver-debatcher/src/main/kotlin/gov/cdc/dex/hl7/Function.kt b/fns-hl7-pipeline/fn-receiver-debatcher/src/main/kotlin/gov/cdc/dex/hl7/Function.kt index 4e65c47db..26ff0c6ea 100644 --- a/fns-hl7-pipeline/fn-receiver-debatcher/src/main/kotlin/gov/cdc/dex/hl7/Function.kt +++ b/fns-hl7-pipeline/fn-receiver-debatcher/src/main/kotlin/gov/cdc/dex/hl7/Function.kt @@ -45,7 +45,7 @@ class Function { @QueueTrigger( name = "message", queueName = "%queueName%", - connection = "BlobIngestConnectionString" + connection = "QueueConnection" ) message: String?, context: ExecutionContext ): DexHL7Metadata? { @@ -65,7 +65,7 @@ class Function { ) ) // create blob client - val blobName = event.eventData.url.substringAfter("/${fnConfig.blobIngestContName}/") + val blobName = event.eventData.url.substringAfter("/${fnConfig.blobIngestContainerName}/") logger.info("DEX::Reading blob: $blobName") val blobClient = fnConfig.azBlobProxy.getBlobClient(blobName) diff --git a/fns-hl7-pipeline/fn-receiver-debatcher/src/main/kotlin/gov/cdc/dex/hl7/FunctionConfig.kt b/fns-hl7-pipeline/fn-receiver-debatcher/src/main/kotlin/gov/cdc/dex/hl7/FunctionConfig.kt index cbea085af..1486b92cf 100644 --- a/fns-hl7-pipeline/fn-receiver-debatcher/src/main/kotlin/gov/cdc/dex/hl7/FunctionConfig.kt +++ b/fns-hl7-pipeline/fn-receiver-debatcher/src/main/kotlin/gov/cdc/dex/hl7/FunctionConfig.kt @@ -1,5 +1,6 @@ package gov.cdc.dex.hl7 +import com.azure.identity.DefaultAzureCredentialBuilder import gov.cdc.dex.azure.AzureBlobProxy import gov.cdc.dex.azure.DedicatedEventHubSender import org.slf4j.Logger @@ -9,8 +10,9 @@ class FunctionConfig { val azBlobProxy: AzureBlobProxy val evHubSenderOut: DedicatedEventHubSender val evHubSenderReports: DedicatedEventHubSender - private val logger : Logger = LoggerFactory.getLogger(this.javaClass.simpleName) - val blobIngestContName: String = try { + private val logger: Logger = LoggerFactory.getLogger(this.javaClass.simpleName) + + val blobIngestContainerName : String = try { System.getenv("BlobIngestContainerName") } catch (e: NullPointerException) { logger.error("FATAL: Missing environment variable BlobIngestContainerName") @@ -18,34 +20,44 @@ class FunctionConfig { } val evHubSendName: String = try { System.getenv("EventHubSendName") - } catch(e: NullPointerException) { + } catch (e: NullPointerException) { logger.error("FATAL: Missing environment variable EventHubSendName") throw e } val evReportsHubName: String = try { System.getenv("EventReportsHubName") } catch (e: NullPointerException) { - logger.error ("FATAL: Missing environment variable EventReportsHubName") + logger.error("FATAL: Missing environment variable EventReportsHubName") throw e } - init { - //Init Azure Storage connection - val ingestBlobConnStr = try { - System.getenv("BlobIngestConnectionString") + val identityClientId : String = try { + System.getenv("QueueConnection__clientId") + } catch (e: NullPointerException) { + logger.error("FATAL: Missing environment variable QueueConnection__clientId") + throw e + } + // set up the application to use a Managed Identity + val tokenCredential = DefaultAzureCredentialBuilder() + .managedIdentityClientId(identityClientId) + .build() + + //Init Azure Storage connection + val ingestBlobUrl = try { + System.getenv("BlobIngestUrl") } catch (e: NullPointerException) { - logger.error("FATAL: Missing environment variable BlobIngestConnectionString") + logger.error("FATAL: Missing environment variable BlobIngestUrl") throw e } - azBlobProxy = AzureBlobProxy(ingestBlobConnStr, blobIngestContName) - val evHubConnStr = try { - System.getenv("EventHubConnectionString") + azBlobProxy = AzureBlobProxy(ingestBlobUrl, blobIngestContainerName, tokenCredential) + val evHubNamespace = try { + System.getenv("EventHubNamespace") } catch (e: NullPointerException) { - logger.error("FATAL: Missing environment variable EventHubConnectionString") + logger.error("FATAL: Missing environment variable EventHubNamespace") throw e } - evHubSenderOut = DedicatedEventHubSender(evHubConnStr, evHubSendName) - evHubSenderReports = DedicatedEventHubSender(evHubConnStr, evReportsHubName) + evHubSenderOut = DedicatedEventHubSender(evHubNamespace, evHubSendName, tokenCredential) + evHubSenderReports = DedicatedEventHubSender(evHubNamespace, evReportsHubName, tokenCredential) } } \ No newline at end of file diff --git a/fns-hl7-pipeline/fn-receiver-debatcher/src/test/kotlin/DebatcherTest.kt b/fns-hl7-pipeline/fn-receiver-debatcher/src/test/kotlin/DebatcherTest.kt index 4a7610df8..2fbdfbda0 100644 --- a/fns-hl7-pipeline/fn-receiver-debatcher/src/test/kotlin/DebatcherTest.kt +++ b/fns-hl7-pipeline/fn-receiver-debatcher/src/test/kotlin/DebatcherTest.kt @@ -54,8 +54,8 @@ class DebatcherTest { try { // initialize event report metadata val eventMetadata = ReceiverEventMetadata(stage = - ReceiverEventStageMetadata(startProcessingTime = startTime, - eventTimestamp = Date().toIsoString())) + ReceiverEventStageMetadata(startProcessingTime = startTime, + eventTimestamp = Date().toIsoString())) val testFileIS = this::class.java.getResource(filePath).openStream() // Add routing data to Report object for this file val eventReport = ReceiverEventReport() diff --git a/fns-hl7-pipeline/fn-redactor/deployZip.sh b/fns-hl7-pipeline/fn-redactor/deployZip.sh index eac4c784a..d8a43e2c1 100644 --- a/fns-hl7-pipeline/fn-redactor/deployZip.sh +++ b/fns-hl7-pipeline/fn-redactor/deployZip.sh @@ -22,6 +22,6 @@ cd ../../.. echo "Deploying Zip..." az functionapp deployment source config-zip -g $hl7RG -n $function --src $base_name -### Set FN_VERSION: + fn_version=$(cat pom.xml |grep -oPm1 "(?<=)[^<]+") az functionapp config appsettings set --name $function --resource-group $hl7RG --settings FN_VERSION=$fn_version > /dev/null diff --git a/fns-hl7-pipeline/fn-redactor/pom.xml b/fns-hl7-pipeline/fn-redactor/pom.xml index 95afeee09..b630541e0 100644 --- a/fns-hl7-pipeline/fn-redactor/pom.xml +++ b/fns-hl7-pipeline/fn-redactor/pom.xml @@ -5,7 +5,7 @@ gov.cdc.dataexchange redactor - 0.0.43-SNAPSHOT + 0.0.44-SNAPSHOT jar Azure Java Functions @@ -27,7 +27,7 @@ 2.0.5 1.2.9 1.9.0 - 0.0.45-SNAPSHOT + 0.0.47-SNAPSHOT @@ -41,6 +41,14 @@ + + com.azure + azure-identity + + + com.azure + azure-identity-broker + com.microsoft.azure.functions azure-functions-java-library diff --git a/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/Function.kt b/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/Function.kt index 5efffd60c..60947e78a 100644 --- a/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/Function.kt +++ b/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/Function.kt @@ -32,7 +32,7 @@ class Function { @EventHubTrigger( name = "msg", eventHubName = "%EventHubReceiveName%", - connection = "EventHubConnectionString", + connection = "EventHubConnection", consumerGroup = "%EventHubConsumerGroup%", ) message: List, @@ -121,7 +121,6 @@ class Function { } catch (e: Exception) { logger.error("Unable to send to event hub ${fnConfig.evHubSendName}: ${e.message}") } - return inputEvent } diff --git a/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/FunctionConfig.kt b/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/FunctionConfig.kt index d58e7d920..735702778 100644 --- a/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/FunctionConfig.kt +++ b/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/FunctionConfig.kt @@ -1,5 +1,6 @@ package gov.cdc.dex.hl7 +import com.azure.identity.DefaultAzureCredentialBuilder import gov.cdc.dex.azure.DedicatedEventHubSender import gov.cdc.dex.util.JsonHelper import gov.cdc.dex.util.ProfileConfiguration @@ -11,11 +12,16 @@ class FunctionConfig { val evHubSendName: String = System.getenv("EventHubSendName") init { - //Init Event Hub connections - val evHubConnStr = System.getenv("EventHubConnectionString") - evHubSender = DedicatedEventHubSender(evHubConnStr, evHubSendName) + //Init Event Hub connection + val entraClientId : String = System.getenv("EventHubConnection__clientId") + val evHubNamespace: String = System.getenv("EventHubConnection__fullyQualifiedNamespace") + val tokenCredential = DefaultAzureCredentialBuilder() + .managedIdentityClientId(entraClientId) + .build() + evHubSender = DedicatedEventHubSender(evHubNamespace, evHubSendName, tokenCredential) val profileConfigJson = FunctionConfig::class.java.getResource("/$PROFILE_CONFIG_FILE_PATH")?.readText() profileConfig = JsonHelper.gson.fromJson(profileConfigJson, ProfileConfiguration::class.java) } + } \ No newline at end of file diff --git a/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/HealthCheckFunction.kt b/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/HealthCheckFunction.kt index 54e26dd1d..0de37edfe 100644 --- a/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/HealthCheckFunction.kt +++ b/fns-hl7-pipeline/fn-redactor/src/main/kotlin/gov/cdc/dex/hl7/HealthCheckFunction.kt @@ -1,12 +1,9 @@ package gov.cdc.dex.hl7 +import com.azure.identity.DefaultAzureCredentialBuilder import com.microsoft.azure.functions.* import com.microsoft.azure.functions.annotation.* -import com.google.gson.JsonObject -import gov.cdc.dex.util.JsonHelper.toJsonElement import gov.cdc.dex.azure.health.* -import java.time.LocalDate - import java.util.* import kotlin.time.measureTime @@ -22,9 +19,13 @@ class HealthCheckFunction { ): HttpResponseMessage { val result = HealthCheckResult() val evHubSendName: String = System.getenv("EventHubSendName") - val evHubConnStr = System.getenv("EventHubConnectionString") + val evHubNamespace = System.getenv("EventHubConnection__fullyQualifiedNamespace") + val entraClientId = System.getenv("EventHubConnection__clientId") + val tokenCredential = DefaultAzureCredentialBuilder() + .managedIdentityClientId(entraClientId) + .build() val time = measureTime { - val ehHealthData = DependencyChecker().checkEventHub(evHubConnStr, evHubSendName) + val ehHealthData = DependencyChecker().checkEventHub(evHubNamespace, evHubSendName, tokenCredential) result.dependencyHealthChecks.add(ehHealthData) result.status = if (ehHealthData.status == "UP") "UP" else "DOWNGRADED" } diff --git a/local_libs/lib-dex-commons/build.gradle.kts b/local_libs/lib-dex-commons/build.gradle.kts index f4824197d..d4fcc751c 100644 --- a/local_libs/lib-dex-commons/build.gradle.kts +++ b/local_libs/lib-dex-commons/build.gradle.kts @@ -9,7 +9,9 @@ plugins { } group = "gov.cdc.dex" -version = "0.0.48-SNAPSHOT" + +version = "0.0.47-SNAPSHOT" + repositories { maven { @@ -27,6 +29,8 @@ dependencies { implementation("com.azure:azure-messaging-eventhubs") implementation("com.azure:azure-messaging-servicebus") implementation("com.azure:azure-storage-blob") + implementation("com.azure:azure-identity") + implementation("com.azure:azure-identity-broker") implementation("redis.clients:jedis:5.1.0") implementation("com.azure:azure-cosmos:4.55.1") diff --git a/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/AzureBlobProxy.kt b/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/AzureBlobProxy.kt index 5874bd99f..6401cf451 100644 --- a/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/AzureBlobProxy.kt +++ b/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/AzureBlobProxy.kt @@ -1,15 +1,18 @@ package gov.cdc.dex.azure + +import com.azure.core.credential.TokenCredential import com.azure.storage.blob.BlobClient import com.azure.storage.blob.BlobContainerClient import com.azure.storage.blob.BlobServiceClientBuilder -class AzureBlobProxy(connectionStr: String, container: String) { +class AzureBlobProxy(blobStorageUrl: String, container: String, tokenCredential: TokenCredential) { private val blobContainerClient: BlobContainerClient init { blobContainerClient = BlobServiceClientBuilder() - .connectionString(connectionStr) + .endpoint(blobStorageUrl) + .credential(tokenCredential) .buildClient() .getBlobContainerClient(container) } diff --git a/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/DedicatedEventHubSender.kt b/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/DedicatedEventHubSender.kt index 434b993ec..2b5690881 100644 --- a/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/DedicatedEventHubSender.kt +++ b/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/DedicatedEventHubSender.kt @@ -1,14 +1,15 @@ package gov.cdc.dex.azure import com.azure.core.amqp.exception.AmqpException +import com.azure.core.credential.TokenCredential import com.azure.messaging.eventhubs.EventData import com.azure.messaging.eventhubs.EventHubClientBuilder import com.azure.messaging.eventhubs.EventHubProducerClient -class DedicatedEventHubSender( evHubConnStr: String, evHubTopicName: String) { +class DedicatedEventHubSender( evHubNamespace: String, evHubTopicName: String, tokenCredential: TokenCredential) { private val producer: EventHubProducerClient = EventHubClientBuilder() - .connectionString(evHubConnStr, evHubTopicName) + .credential(evHubNamespace, evHubTopicName, tokenCredential) .buildProducerClient() fun disconnect() { diff --git a/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/ServiceBusProxy.kt b/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/ServiceBusProxy.kt index 35848f6ab..0d84f4398 100644 --- a/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/ServiceBusProxy.kt +++ b/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/ServiceBusProxy.kt @@ -1,11 +1,12 @@ package gov.cdc.dex.azure +import com.azure.core.credential.TokenCredential import com.azure.messaging.servicebus.ServiceBusClientBuilder import com.azure.messaging.servicebus.ServiceBusSenderClient -class ServiceBusProxy(val sbConnString: String, val sbQueue: String) { +class ServiceBusProxy(serviceBusNamespace: String, sbQueue: String, tokenCredential: TokenCredential) { private val serviceBusSender: ServiceBusSenderClient = ServiceBusClientBuilder() - .connectionString(sbConnString) + .credential(serviceBusNamespace, tokenCredential) .sender() .queueName(sbQueue) .buildClient() diff --git a/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/health/DependencyChecker.kt b/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/health/DependencyChecker.kt index 0282f1a77..68a23b7ea 100644 --- a/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/health/DependencyChecker.kt +++ b/local_libs/lib-dex-commons/src/main/kotlin/gov/cdc/dex/azure/health/DependencyChecker.kt @@ -2,6 +2,7 @@ package gov.cdc.dex.azure.health import com.azure.core.amqp.AmqpRetryMode import com.azure.core.amqp.AmqpRetryOptions +import com.azure.core.credential.TokenCredential import com.azure.cosmos.CosmosClient import com.azure.cosmos.CosmosClientBuilder import com.azure.messaging.eventhubs.EventHubClientBuilder @@ -52,6 +53,22 @@ class DependencyChecker { } } + fun checkEventHub(eventHubNamespace: String, eventHubName: String, tokenCredential: TokenCredential) : DependencyHealthData { + val retryOptions = AmqpRetryOptions() + .setMaxRetries(1) + .setMode(AmqpRetryMode.FIXED) + .setTryTimeout(Duration.ofSeconds(10)) + + return checkDependency(AzureDependency.EVENT_HUB) { + val client: EventHubProducerClient = EventHubClientBuilder() + .credential(eventHubNamespace, eventHubName, tokenCredential) + .retryOptions(retryOptions) + .buildProducerClient() + client.partitionIds.count() + client.close() + } + } + fun checkServiceBus(connectionString: String, queueName: String) : DependencyHealthData { return checkDependency(AzureDependency.SERVICE_BUS) { val serviceBusClient: ServiceBusReceiverClient = ServiceBusClientBuilder() diff --git a/local_libs/lib-dex-commons/src/test/kotlin/gov/cdc/dex/TestAzureBlobProxy.kt b/local_libs/lib-dex-commons/src/test/kotlin/gov/cdc/dex/TestAzureBlobProxy.kt new file mode 100644 index 000000000..0f675a933 --- /dev/null +++ b/local_libs/lib-dex-commons/src/test/kotlin/gov/cdc/dex/TestAzureBlobProxy.kt @@ -0,0 +1,22 @@ +package gov.cdc.dex + +import com.azure.identity.DefaultAzureCredentialBuilder +import gov.cdc.dex.azure.AzureBlobProxy +import org.junit.jupiter.api.Test + + +class TestAzureBlobProxy { + @Test + fun testBlobProxyCredential() { + val url = "https://ocioedemessagesadev.core.windows.net" + val container = "hl7ingress" + val clientId = System.getenv("BlobStorageClientId") + val credential = DefaultAzureCredentialBuilder() + .managedIdentityClientId(clientId) + .build() + + val blobProxy = AzureBlobProxy(url, container, credential) + blobProxy.getBlobClient("unitTests/BatchedMessage.doNotDelete.txt") + } + +} \ No newline at end of file diff --git a/local_libs/lib-dex-commons/src/test/kotlin/gov/cdc/dex/TestDedicatedEHSender.kt b/local_libs/lib-dex-commons/src/test/kotlin/gov/cdc/dex/TestDedicatedEHSender.kt new file mode 100644 index 000000000..4322a05a8 --- /dev/null +++ b/local_libs/lib-dex-commons/src/test/kotlin/gov/cdc/dex/TestDedicatedEHSender.kt @@ -0,0 +1,23 @@ +package gov.cdc.dex +import com.azure.identity.DefaultAzureCredentialBuilder +import gov.cdc.dex.azure.DedicatedEventHubSender +import org.junit.jupiter.api.Test + + +class TestDedicatedEHSender { + @Test + fun testDedicatedSenderCredential() { + val ehNamespace = "ocio-ede-dev-eventhub-namespace.servicebus.windows.net" + val ehHubName = "hl7-recdeb-reports" + val clientId = System.getenv("EventHubClientId") + + val token = DefaultAzureCredentialBuilder() + .managedIdentityClientId(clientId) + .build() + + val sender = DedicatedEventHubSender(ehNamespace, ehHubName, token) + val ids = sender.getPartitionIds() + ids.forEach { println(it) } + sender.disconnect() + } +} \ No newline at end of file diff --git a/local_libs/lib-dex-commons/src/test/kotlin/test/gov/cdc/dex/MetadataTest.kt b/local_libs/lib-dex-commons/src/test/kotlin/test/gov/cdc/dex/MetadataTest.kt index 73e5d853b..a55e8aeff 100644 --- a/local_libs/lib-dex-commons/src/test/kotlin/test/gov/cdc/dex/MetadataTest.kt +++ b/local_libs/lib-dex-commons/src/test/kotlin/test/gov/cdc/dex/MetadataTest.kt @@ -55,11 +55,11 @@ class MetadataTest { // val provenance = DEXProvenance("senderID", "system", "filePath", "fileTS", 10, null) // val fileMD = DEXFileMetadata(reportingJurisdiction = "13", uploadID = "", provenance = provenance, dataStream = dataStream, tracing = tracing) - val routingInfo = RoutingMetadata("filePPath", "fileTS", 10, "orgA", "uploadID", "dsid", "dsroute", "trace", "span") + val routingInfo = RoutingMetadata("filePPath", "fileTS", "10", "orgA", "orgB","uploadID", "dsid", "dsroute", "sender1", "testfile.txt") val stageMD = MockEventGridStageMetadata("stgName", "stgVersion", "sttus", listOf(), "ts") - val messageMD = MessageMetadata("msgUUID","SINGLE", 1, "hash", "MSH|") + val messageMD = MessageMetadata("msgUUID","SINGLE", 1, "hash") val summary = SummaryInfo("All good!") - val fullMD = DexHL7Metadata(messageMetadata = messageMD, routingInfo = routingInfo, stage = stageMD, summary = summary) + val fullMD = DexHL7Metadata(messageMetadata = messageMD, routingMetadata = routingInfo, stage = stageMD, summary = summary, content="") val json = gson.toJson(fullMD) println(json)