Skip to content
This repository has been archived by the owner on Dec 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #65 from atlanhq/DX-266
Browse files Browse the repository at this point in the history
Rename CSV headings, add update-only semantic, include all tags in exports
  • Loading branch information
cmgrote authored Nov 6, 2023
2 parents 017300c + 5b6671e commit 85980f0
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 28 deletions.
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/atlan-kotlin-sample.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ repositories {
}

dependencies {
implementation("com.atlan:atlan-java:1.5.0")
implementation("com.atlan:atlan-java:1.6.0")
implementation("io.github.microutils:kotlin-logging-jvm:3.0.5")
runtimeOnly("org.apache.logging.log4j:log4j-core:2.20.0")
runtimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.20.0")
Expand Down
2 changes: 1 addition & 1 deletion containers/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2023 Atlan Pte. Ltd.
FROM ghcr.io/atlanhq/atlan-java:1.5.0
FROM ghcr.io/atlanhq/atlan-java:1.6.0

LABEL org.opencontainers.image.vendor="Atlan Pte. Ltd." \
org.opencontainers.image.source="https://github.com/atlanhq/atlan-kotlin-samples" \
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
kotlin.code.style=official
version=0.4.0-SNAPSHOT
version=0.4.1-SNAPSHOT
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
* @param path location and filename of the CSV file to read
* @param fieldSeparator character to use to separate fields (for example ',' or ';')
*/
class CSVReader @JvmOverloads constructor(path: String, fieldSeparator: Char = ',') : Closeable {
class CSVReader @JvmOverloads constructor(path: String, private val updateOnly: Boolean, fieldSeparator: Char = ',') : Closeable {

private val reader: CsvReader
private val header: List<String>
Expand Down Expand Up @@ -84,6 +84,7 @@ class CSVReader @JvmOverloads constructor(path: String, fieldSeparator: Char = '
true,
AssetBatch.CustomMetadataHandling.MERGE,
true,
updateOnly,
)
relatedHolds[id] = ConcurrentHashMap()
deferDeletes[id] = ConcurrentHashMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ class Importer(private val config: Map<String, String>) : AssetGenerator {

private val batchSize = config.getOrDefault("BATCH_SIZE", "50").toInt()
private val filename = config.getOrDefault("UPLOADED_FILE", "")
private val updateOnly = config.getOrDefault("UPSERT_SEMANTIC", "update") == "update"
private val attrsToOverwrite = attributesToClear()

fun import() {
CSVReader(filename).use { csv ->
CSVReader(filename, updateOnly).use { csv ->
val start = System.currentTimeMillis()
csv.streamRows(this, batchSize, logger)
logger.info("Total time taken: {} ms", System.currentTimeMillis() - start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ object MigrationAssistantImportPkg : CustomPackage(
multiSelect = true,
grid = 8,
),
"upsert_semantic" to Radio(
label = "Input handling",
required = true,
possibleValues = mapOf(
"upsert" to "Create and update",
"update" to "Update only",
),
default = "update",
help = "Whether to allow the creation of new assets from the input CSV, or ensure assets are only updated if they already exist in Atlan.",
),
"control_config_strategy" to Radio(
label = "Options",
required = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object CreateCMWriteConfig {
val overallScore = if (config.assetTypes != null) {
initialScore.toBuilder().options(
initialScore.options.toBuilder()
.customApplicableEntityTypes(config.assetTypes.toSet())
.applicableAssetTypes(config.assetTypes.toSet())
.build(),
)
.build()
Expand Down
19 changes: 15 additions & 4 deletions serde/src/main/kotlin/RowSerde.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,23 @@ const val CM_HEADING_DELIMITER = "::"
* @return the name of the header to use for that field
*/
fun getHeaderForField(field: AtlanField): String {
return getHeaderForField(field, Asset::class.java)
}

/**
* Retrieve the name to use for the header for a particular field, assuming a particular type of asset.
*
* @param field for which to determine the header name
* @param assetClass asset class in which to assume the field is defined
* @return the name of the header to use for that field
*/
fun getHeaderForField(field: AtlanField, assetClass: Class<*>): String {
return if (field is CustomMetadataField) {
// For custom metadata, translate the header to human-readable names
field.setName + CM_HEADING_DELIMITER + field.attributeName
} else {
field.atlanFieldName
// Use renamed fields for deserialization, if available
ReflectionCache.getDeserializedName(assetClass, field.atlanFieldName)
}
}

Expand Down Expand Up @@ -117,8 +129,7 @@ class RowDeserializer(private val heading: List<String>, private val row: List<S
customMetadataMap[setName]!!.attribute(attrName, value)
} else {
// "Normal" field...
val deserializedFieldName = ReflectionCache.getDeserializedName(assetClass, fieldName)
val setter = ReflectionCache.getSetter(builder.javaClass, deserializedFieldName)
val setter = ReflectionCache.getSetter(builder.javaClass, fieldName)
if (setter != null) {
val value = FieldSerde.getValueFromCell(rValue, setter)
if (value != null) {
Expand All @@ -128,7 +139,7 @@ class RowDeserializer(private val heading: List<String>, private val row: List<S
// Only set the value on the asset directly if it does not require
// special handling, otherwise leave it to the special handling
// to set the value (later)
ReflectionCache.setValue(builder, deserializedFieldName, value)
ReflectionCache.setValue(builder, fieldName, value)
}
}
}
Expand Down
48 changes: 30 additions & 18 deletions serde/src/main/kotlin/xformers/cell/AtlanTagXformer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,56 @@ import com.atlan.model.core.AtlanTag.AtlanTagBuilder
object AtlanTagXformer {

private const val SETTINGS_DELIMITER = ">>"
private const val PROPAGATED_DELIMITER = "<<"

enum class PropagationType {
FULL,
NONE,
HIERARCHY_ONLY,
PROPAGATED,
}

fun encode(fromGuid: String, atlanTag: AtlanTag): String {
val direct = fromGuid == atlanTag.entityGuid
return if (direct) {
listOf(
atlanTag.typeName,
encodePropagation(atlanTag),
).joinToString(SETTINGS_DELIMITER)
return when (val propagation = encodePropagation(atlanTag)) {
PropagationType.FULL -> "${atlanTag.typeName}$SETTINGS_DELIMITER${propagation.name}"
PropagationType.HIERARCHY_ONLY -> "${atlanTag.typeName}$SETTINGS_DELIMITER${propagation.name}"
else -> atlanTag.typeName
}
} else {
""
"${atlanTag.typeName}$PROPAGATED_DELIMITER${PropagationType.PROPAGATED.name}"
}
}

fun decode(atlanTag: String): AtlanTag {
val tokens = atlanTag.split(SETTINGS_DELIMITER)
val builder = AtlanTag.builder()
.typeName(tokens[0])
return decodePropagation(tokens, builder)
fun decode(atlanTag: String): AtlanTag? {
return if (!atlanTag.endsWith("$PROPAGATED_DELIMITER${PropagationType.PROPAGATED.name}")) {
val tokens = atlanTag.split(SETTINGS_DELIMITER)
val builder = AtlanTag.builder()
.typeName(tokens[0])
decodePropagation(tokens, builder)
} else {
null
}
}

private fun encodePropagation(atlanTag: AtlanTag): String {
private fun encodePropagation(atlanTag: AtlanTag): PropagationType {
return if (atlanTag.propagate) {
return when {
atlanTag.removePropagationsOnEntityDelete && !atlanTag.restrictPropagationThroughLineage -> "FULL"
atlanTag.removePropagationsOnEntityDelete && atlanTag.restrictPropagationThroughLineage -> "HIERARCHY_ONLY"
else -> ""
atlanTag.removePropagationsOnEntityDelete && !atlanTag.restrictPropagationThroughLineage -> PropagationType.FULL
atlanTag.removePropagationsOnEntityDelete && atlanTag.restrictPropagationThroughLineage -> PropagationType.HIERARCHY_ONLY
else -> PropagationType.NONE
}
} else {
// Nothing to propagate, so leave out all options
""
PropagationType.NONE
}
}

private fun decodePropagation(atlanTagTokens: List<String>, builder: AtlanTagBuilder<*, *>): AtlanTag {
if (atlanTagTokens.size > 1) {
when (atlanTagTokens[1].uppercase()) {
"FULL" -> builder.propagate(true).removePropagationsOnEntityDelete(true).restrictPropagationThroughLineage(false)
"HIERARCHY_ONLY" -> builder.propagate(true).removePropagationsOnEntityDelete(true).restrictPropagationThroughLineage(true)
PropagationType.FULL.name -> builder.propagate(true).removePropagationsOnEntityDelete(true).restrictPropagationThroughLineage(false)
PropagationType.HIERARCHY_ONLY.name -> builder.propagate(true).removePropagationsOnEntityDelete(true).restrictPropagationThroughLineage(true)
else -> builder.propagate(false)
}
} else {
Expand Down

0 comments on commit 85980f0

Please sign in to comment.