Skip to content
This repository was archived by the owner on Oct 2, 2024. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fns-hl7-pipeline/fn-receiver-debatcher/deployZip.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ cd ../../..
echo "Deploying Zip..."

az functionapp deployment source config-zip -g $hl7RG -n $function --src $base_name
### Set FN_VERSION:

fn_version=$(cat pom.xml |grep -oPm1 "(?<=<version>)[^<]+")
az functionapp config appsettings set --name $function --resource-group $hl7RG --settings FN_VERSION=$fn_version > /dev/null
21 changes: 12 additions & 9 deletions fns-hl7-pipeline/fn-receiver-debatcher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory>
<gson.version>2.10.1</gson.version>
<hl7pet.version>1.2.7.1</hl7pet.version>
<lib-dex-commons.version>0.0.48-SNAPSHOT</lib-dex-commons.version>
<lib-dex-commons.version>0.0.47-SNAPSHOT</lib-dex-commons.version>
<junitjupiter.version>5.9.2</junitjupiter.version>
</properties>
<dependencyManagement>
Expand Down Expand Up @@ -82,26 +82,25 @@
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.azure/azure-storage-blob -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junitjupiter.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
Expand Down Expand Up @@ -144,6 +143,10 @@
<name>FN_VERSION</name>
<value>${project.version}</value>
</property>
<property>
<name>WEBSITE_RUN_FROM_PACKAGE</name>
<value>1</value>
</property>
</appSettings>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Function {
@QueueTrigger(
name = "message",
queueName = "%queueName%",
connection = "BlobIngestConnectionString"
connection = "QueueConnection"
) message: String?,
context: ExecutionContext
): DexHL7Metadata? {
Expand All @@ -65,7 +65,7 @@ class Function {
)
)
// create blob client
val blobName = event.eventData.url.substringAfter("/${fnConfig.blobIngestContName}/")
val blobName = event.eventData.url.substringAfter("/${fnConfig.blobIngestContainerName}/")
logger.info("DEX::Reading blob: $blobName")
val blobClient = fnConfig.azBlobProxy.getBlobClient(blobName)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.cdc.dex.hl7

import com.azure.identity.DefaultAzureCredentialBuilder
import gov.cdc.dex.azure.AzureBlobProxy
import gov.cdc.dex.azure.DedicatedEventHubSender
import org.slf4j.Logger
Expand All @@ -9,43 +10,54 @@ class FunctionConfig {
val azBlobProxy: AzureBlobProxy
val evHubSenderOut: DedicatedEventHubSender
val evHubSenderReports: DedicatedEventHubSender
private val logger : Logger = LoggerFactory.getLogger(this.javaClass.simpleName)
val blobIngestContName: String = try {
private val logger: Logger = LoggerFactory.getLogger(this.javaClass.simpleName)

val blobIngestContainerName : String = try {
System.getenv("BlobIngestContainerName")
} catch (e: NullPointerException) {
logger.error("FATAL: Missing environment variable BlobIngestContainerName")
throw e
}
val evHubSendName: String = try {
System.getenv("EventHubSendName")
} catch(e: NullPointerException) {
} catch (e: NullPointerException) {
logger.error("FATAL: Missing environment variable EventHubSendName")
throw e
}
val evReportsHubName: String = try {
System.getenv("EventReportsHubName")
} catch (e: NullPointerException) {
logger.error ("FATAL: Missing environment variable EventReportsHubName")
logger.error("FATAL: Missing environment variable EventReportsHubName")
throw e
}

init {
//Init Azure Storage connection
val ingestBlobConnStr = try {
System.getenv("BlobIngestConnectionString")
val identityClientId : String = try {
System.getenv("QueueConnection__clientId")
} catch (e: NullPointerException) {
logger.error("FATAL: Missing environment variable QueueConnection__clientId")
throw e
}
// set up the application to use a Managed Identity
val tokenCredential = DefaultAzureCredentialBuilder()
.managedIdentityClientId(identityClientId)
.build()

//Init Azure Storage connection
val ingestBlobUrl = try {
System.getenv("BlobIngestUrl")
} catch (e: NullPointerException) {
logger.error("FATAL: Missing environment variable BlobIngestConnectionString")
logger.error("FATAL: Missing environment variable BlobIngestUrl")
throw e
}
azBlobProxy = AzureBlobProxy(ingestBlobConnStr, blobIngestContName)
val evHubConnStr = try {
System.getenv("EventHubConnectionString")
azBlobProxy = AzureBlobProxy(ingestBlobUrl, blobIngestContainerName, tokenCredential)
val evHubNamespace = try {
System.getenv("EventHubNamespace")
} catch (e: NullPointerException) {
logger.error("FATAL: Missing environment variable EventHubConnectionString")
logger.error("FATAL: Missing environment variable EventHubNamespace")
throw e
}
evHubSenderOut = DedicatedEventHubSender(evHubConnStr, evHubSendName)
evHubSenderReports = DedicatedEventHubSender(evHubConnStr, evReportsHubName)
evHubSenderOut = DedicatedEventHubSender(evHubNamespace, evHubSendName, tokenCredential)
evHubSenderReports = DedicatedEventHubSender(evHubNamespace, evReportsHubName, tokenCredential)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class DebatcherTest {
try {
// initialize event report metadata
val eventMetadata = ReceiverEventMetadata(stage =
ReceiverEventStageMetadata(startProcessingTime = startTime,
eventTimestamp = Date().toIsoString()))
ReceiverEventStageMetadata(startProcessingTime = startTime,
eventTimestamp = Date().toIsoString()))
val testFileIS = this::class.java.getResource(filePath).openStream()
// Add routing data to Report object for this file
val eventReport = ReceiverEventReport()
Expand Down
2 changes: 1 addition & 1 deletion fns-hl7-pipeline/fn-redactor/deployZip.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ cd ../../..
echo "Deploying Zip..."

az functionapp deployment source config-zip -g $hl7RG -n $function --src $base_name
### Set FN_VERSION:

fn_version=$(cat pom.xml |grep -oPm1 "(?<=<version>)[^<]+")
az functionapp config appsettings set --name $function --resource-group $hl7RG --settings FN_VERSION=$fn_version > /dev/null
12 changes: 10 additions & 2 deletions fns-hl7-pipeline/fn-redactor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>gov.cdc.dataexchange</groupId>
<artifactId>redactor</artifactId>
<version>0.0.43-SNAPSHOT</version>
<version>0.0.44-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Azure Java Functions</name>

Expand All @@ -27,7 +27,7 @@
<slf4j.version>2.0.5</slf4j.version>
<hl7pet.version>1.2.9</hl7pet.version>
<kotlin.version>1.9.0</kotlin.version>
<lib-dex-commons.version>0.0.45-SNAPSHOT</lib-dex-commons.version>
<lib-dex-commons.version>0.0.47-SNAPSHOT</lib-dex-commons.version>
</properties>
<dependencyManagement>
<dependencies>
Expand All @@ -41,6 +41,14 @@
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity-broker</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Function {
@EventHubTrigger(
name = "msg",
eventHubName = "%EventHubReceiveName%",
connection = "EventHubConnectionString",
connection = "EventHubConnection",
consumerGroup = "%EventHubConsumerGroup%",
)
message: List<String?>,
Expand Down Expand Up @@ -121,7 +121,6 @@ class Function {
} catch (e: Exception) {
logger.error("Unable to send to event hub ${fnConfig.evHubSendName}: ${e.message}")
}

return inputEvent
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.cdc.dex.hl7

import com.azure.identity.DefaultAzureCredentialBuilder
import gov.cdc.dex.azure.DedicatedEventHubSender
import gov.cdc.dex.util.JsonHelper
import gov.cdc.dex.util.ProfileConfiguration
Expand All @@ -11,11 +12,16 @@ class FunctionConfig {
val evHubSendName: String = System.getenv("EventHubSendName")

init {
//Init Event Hub connections
val evHubConnStr = System.getenv("EventHubConnectionString")
evHubSender = DedicatedEventHubSender(evHubConnStr, evHubSendName)
//Init Event Hub connection
val entraClientId : String = System.getenv("EventHubConnection__clientId")
val evHubNamespace: String = System.getenv("EventHubConnection__fullyQualifiedNamespace")
val tokenCredential = DefaultAzureCredentialBuilder()
.managedIdentityClientId(entraClientId)
.build()
evHubSender = DedicatedEventHubSender(evHubNamespace, evHubSendName, tokenCredential)

val profileConfigJson = FunctionConfig::class.java.getResource("/$PROFILE_CONFIG_FILE_PATH")?.readText()
profileConfig = JsonHelper.gson.fromJson(profileConfigJson, ProfileConfiguration::class.java)
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package gov.cdc.dex.hl7

import com.azure.identity.DefaultAzureCredentialBuilder
import com.microsoft.azure.functions.*
import com.microsoft.azure.functions.annotation.*
import com.google.gson.JsonObject
import gov.cdc.dex.util.JsonHelper.toJsonElement
import gov.cdc.dex.azure.health.*
import java.time.LocalDate

import java.util.*
import kotlin.time.measureTime

Expand All @@ -22,9 +19,13 @@ class HealthCheckFunction {
): HttpResponseMessage {
val result = HealthCheckResult()
val evHubSendName: String = System.getenv("EventHubSendName")
val evHubConnStr = System.getenv("EventHubConnectionString")
val evHubNamespace = System.getenv("EventHubConnection__fullyQualifiedNamespace")
val entraClientId = System.getenv("EventHubConnection__clientId")
val tokenCredential = DefaultAzureCredentialBuilder()
.managedIdentityClientId(entraClientId)
.build()
val time = measureTime {
val ehHealthData = DependencyChecker().checkEventHub(evHubConnStr, evHubSendName)
val ehHealthData = DependencyChecker().checkEventHub(evHubNamespace, evHubSendName, tokenCredential)
result.dependencyHealthChecks.add(ehHealthData)
result.status = if (ehHealthData.status == "UP") "UP" else "DOWNGRADED"
}
Expand Down
6 changes: 5 additions & 1 deletion local_libs/lib-dex-commons/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ plugins {
}

group = "gov.cdc.dex"
version = "0.0.48-SNAPSHOT"

version = "0.0.47-SNAPSHOT"


repositories {
maven {
Expand All @@ -27,6 +29,8 @@ dependencies {
implementation("com.azure:azure-messaging-eventhubs")
implementation("com.azure:azure-messaging-servicebus")
implementation("com.azure:azure-storage-blob")
implementation("com.azure:azure-identity")
implementation("com.azure:azure-identity-broker")

implementation("redis.clients:jedis:5.1.0")
implementation("com.azure:azure-cosmos:4.55.1")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package gov.cdc.dex.azure


import com.azure.core.credential.TokenCredential
import com.azure.storage.blob.BlobClient
import com.azure.storage.blob.BlobContainerClient
import com.azure.storage.blob.BlobServiceClientBuilder

class AzureBlobProxy(connectionStr: String, container: String) {
class AzureBlobProxy(blobStorageUrl: String, container: String, tokenCredential: TokenCredential) {
private val blobContainerClient: BlobContainerClient

init {
blobContainerClient = BlobServiceClientBuilder()
.connectionString(connectionStr)
.endpoint(blobStorageUrl)
.credential(tokenCredential)
.buildClient()
.getBlobContainerClient(container)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package gov.cdc.dex.azure

import com.azure.core.amqp.exception.AmqpException
import com.azure.core.credential.TokenCredential
import com.azure.messaging.eventhubs.EventData
import com.azure.messaging.eventhubs.EventHubClientBuilder
import com.azure.messaging.eventhubs.EventHubProducerClient


class DedicatedEventHubSender( evHubConnStr: String, evHubTopicName: String) {
class DedicatedEventHubSender( evHubNamespace: String, evHubTopicName: String, tokenCredential: TokenCredential) {
private val producer: EventHubProducerClient = EventHubClientBuilder()
.connectionString(evHubConnStr, evHubTopicName)
.credential(evHubNamespace, evHubTopicName, tokenCredential)
.buildProducerClient()

fun disconnect() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package gov.cdc.dex.azure

import com.azure.core.credential.TokenCredential
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.azure.messaging.servicebus.ServiceBusSenderClient

class ServiceBusProxy(val sbConnString: String, val sbQueue: String) {
class ServiceBusProxy(serviceBusNamespace: String, sbQueue: String, tokenCredential: TokenCredential) {
private val serviceBusSender: ServiceBusSenderClient = ServiceBusClientBuilder()
.connectionString(sbConnString)
.credential(serviceBusNamespace, tokenCredential)
.sender()
.queueName(sbQueue)
.buildClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gov.cdc.dex.azure.health

import com.azure.core.amqp.AmqpRetryMode
import com.azure.core.amqp.AmqpRetryOptions
import com.azure.core.credential.TokenCredential
import com.azure.cosmos.CosmosClient
import com.azure.cosmos.CosmosClientBuilder
import com.azure.messaging.eventhubs.EventHubClientBuilder
Expand Down Expand Up @@ -52,6 +53,22 @@ class DependencyChecker {
}
}

fun checkEventHub(eventHubNamespace: String, eventHubName: String, tokenCredential: TokenCredential) : DependencyHealthData {
val retryOptions = AmqpRetryOptions()
.setMaxRetries(1)
.setMode(AmqpRetryMode.FIXED)
.setTryTimeout(Duration.ofSeconds(10))

return checkDependency(AzureDependency.EVENT_HUB) {
val client: EventHubProducerClient = EventHubClientBuilder()
.credential(eventHubNamespace, eventHubName, tokenCredential)
.retryOptions(retryOptions)
.buildProducerClient()
client.partitionIds.count()
client.close()
}
}

fun checkServiceBus(connectionString: String, queueName: String) : DependencyHealthData {
return checkDependency(AzureDependency.SERVICE_BUS) {
val serviceBusClient: ServiceBusReceiverClient = ServiceBusClientBuilder()
Expand Down
Loading