diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index e8da4a6549..ac4aa45506 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -214,6 +214,28 @@ public final class Constants { public static final String CONNECTION_PROCESS_ENTITY_TYPE = "ConnectionProcess"; public static final String PARENT_CONNECTION_PROCESS_QUALIFIED_NAME = "parentConnectionProcessQualifiedName"; + /*** + * DataModel + */ + + public static final String ATLAS_DM_ENTITY_TYPE = "DMEntity"; + + public static final String ATLAS_DM_ATTRIBUTE_TYPE = "DMAttribute"; + public static final String ATLAS_DM_DATA_MODEL = "DMDataModel"; + + public static final String ATLAS_DM_VERSION_TYPE = "DMVersion"; + + public static final String ATLAS_DM_ENTITY_ASSOCIATION_TYPE= "DMEntityAssociation"; + public static final String ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE= "DMAttributeAssociation"; + + public static final String ATLAS_DM_QUALIFIED_NAME_PREFIX = "dMQualifiedNamePrefix"; + public static final String ATLAS_DM_NAMESPACE = "dMDataModelNamespace"; + public static final String ATLAS_DM_EXPIRED_AT_SYSTEM_DATE = "dMDataModelExpiredAtSystemDate"; + public static final String ATLAS_DM_EXPIRED_AT_BUSINESS_DATE = "dMDataModelExpiredAtBusinessDate"; + public static final String ATLAS_DM_SYSTEM_DATE = "dMDataModelSystemDate"; + public static final String ATLAS_DM_BUSINESS_DATE = "dMDataModelBusinessDate"; + + /** * The homeId field is used when saving into Atlas a copy of an object that is being imported from another * repository. The homeId will be set to a String that identifies the other repository. The specific format @@ -272,7 +294,6 @@ public final class Constants { public static final String INDEX_PREFIX = "janusgraph_"; public static final String VERTEX_INDEX_NAME = INDEX_PREFIX + VERTEX_INDEX; - public static final String EDGE_INDEX_NAME = INDEX_PREFIX + EDGE_INDEX; public static final String NAME = "name"; public static final String QUALIFIED_NAME = "qualifiedName"; diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 1ad37b2564..8bee2815be 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -291,8 +291,15 @@ public enum AtlasErrorCode { TASK_TYPE_NOT_SUPPORTED(400, "ATLAS-400-00-112", "Task type {0} is not supported"), PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED(400, "ATLAS-400-00-113", "Exceeded limit of maximum allowed assets across policies for a Persona: Limit: {0}, assets: {1}"), - ADMIN_LIST_SHOULD_NOT_BE_EMPTY(400, "ATLAS-400-00-114", "Admin list should not be empty for type {0}"); - + ADMIN_LIST_SHOULD_NOT_BE_EMPTY(400, "ATLAS-400-00-114", "Admin list should not be empty for type {0}"), + DATA_MODEL_VERSION_NOT_EXIST(400, "ATLAS-400-00-115", "Model version {0} does not exist"), + DATA_ENTITY_NOT_EXIST(400, "ATLAS-400-00-116", "DataEntity {0} does not exist"), + DATA_MODEL_NOT_EXIST(400, "ATLAS-400-00-117", "DataModel {0} does not exist"), + QUALIFIED_NAME_PREFIX_NOT_EXIST(400, "ATLAS-400-00-118", "dMQualifiedNamePrefix is mandatory for DMEntity/DMAttribute"), + NO_TYPE_EXISTS_FOR_QUALIFIED_NAME_PREFIX (400,"ATLAS-400-00-119", "No DMEntity/DMAttribute exists for dMQualifiedNamePrefix : {0}"), + NAME_NAMESPACE_NOT_EXIST (400, "ATLAS-400-00-120", "name/namespace are mandatory for DMEntity/DMAttribute"), + QUALIFIED_NAME_PREFIX_OR_TYPE_NOT_FOUND(400, "ATLAS-400-00-121", "qualifiedName/entityType are mandatory"), + INVALID_ENTITY_TYPE(400, "ATLAS-400-00-122", "Invalid entity type"); private String errorCode; private String errorMessage; private Response.Status httpCode; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java index 040822dccc..669d7819ed 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java @@ -24,7 +24,9 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; import org.apache.atlas.repository.store.graph.EntityResolver; @@ -40,9 +42,11 @@ import org.apache.atlas.type.TemplateToken; import org.apache.atlas.utils.AtlasEntityUtil; import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -103,8 +107,36 @@ public void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException { validateLabels(entity.getLabels()); + type.validateValue(entity, entity.getTypeName(), messages); + + + // DMEntity and DMAttributeType are requested for update + // by dMQualifiedNamePrefix which is not a unique attribute + // This can return multiple entity/attribute that match this prefix vale + // we have to return latest entity/attribute + if (entity.getTypeName().equals(Constants.ATLAS_DM_ENTITY_TYPE) || + entity.getTypeName().equals(Constants.ATLAS_DM_ATTRIBUTE_TYPE)){ + + String qualifiedNamePrefix = (String) entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX); + if (qualifiedNamePrefix.isEmpty()){ + throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST); + } + // AtlasVertex vertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(entity.getTypeName(), qualifiedNamePrefix); + AtlasVertex vertex= discoveryContext.getResolvedEntityVertex(entity.getGuid()); + if (vertex == null) { + // no entity exists with this qualifiedName, set qualifiedName and let entity be created + entity.setAttribute(Constants.QUALIFIED_NAME, qualifiedNamePrefix + "_" + RequestContext.get().getRequestTime()); + return; + } + + // if guidFromVertex is found let entity be updated + entity.setGuid(AtlasGraphUtilsV2.getIdFromVertex(vertex)); + type.getNormalizedValue(entity); + return; + } + if (!messages.isEmpty()) { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); } @@ -161,6 +193,8 @@ protected void discover() throws AtlasBaseException { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "found null entity"); } + validateAttributesForDataModel(entity); + processDynamicAttributes(entity); walkEntityGraph(entity); @@ -485,4 +519,15 @@ private void processDynamicAttributes(AtlasEntity entity) throws AtlasBaseExcept } } } + + private void validateAttributesForDataModel(AtlasEntity entity) throws AtlasBaseException { + if (entity.getTypeName().equals(Constants.ATLAS_DM_ENTITY_TYPE) || + entity.getTypeName().equals(Constants.ATLAS_DM_ATTRIBUTE_TYPE)) { + if (entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX) == null || + entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX) == "") { + throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST); + } + } + + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 89679fe856..8c0e12fa32 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -72,6 +72,9 @@ import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMAttributePreprocessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMEntityAssociationPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMEntityPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.ReadmePreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryCollectionPreProcessor; @@ -1655,14 +1658,43 @@ private void executePreProcessor(EntityMutationContext context) throws AtlasBase } List copyOfUpdated = new ArrayList<>(context.getUpdatedEntities()); - for (AtlasEntity entity: copyOfUpdated) { + for (AtlasEntity entity : copyOfUpdated) { entityType = context.getType(entity.getGuid()); preProcessors = getPreProcessor(entityType.getTypeName()); - for(PreProcessor processor : preProcessors){ - LOG.debug("Executing preprocessor {} for entity {}", processor.getClass().getName(), entity.getGuid()); + for (PreProcessor processor : preProcessors) { processor.processAttributes(entity, context, UPDATE); } } + + List copyOfAppendRelationshipAttributes = new ArrayList<>(context.getUpdatedEntitiesForAppendRelationshipAttribute()); + for (AtlasEntity entity : copyOfAppendRelationshipAttributes) { + entityType = context.getType(entity.getGuid()); + if( entityType.getTypeName().equals(ATLAS_DM_ENTITY_TYPE) || + entityType.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE) || + entityType.getTypeName().equals(ATLAS_DM_ENTITY_ASSOCIATION_TYPE) || + entity.getTypeName().equals(ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE) + ){ + preProcessors = getPreProcessor(entityType.getTypeName()); + for (PreProcessor processor : preProcessors) { + processor.processAttributes(entity, context, UPDATE); + } + } + } + + List copyOfRemoveRelationshipAttributes = new ArrayList<>(context.getEntitiesUpdatedWithRemoveRelationshipAttribute()); + for (AtlasEntity entity : copyOfRemoveRelationshipAttributes) { + entityType = context.getType(entity.getGuid()); + if( entityType.getTypeName().equals(ATLAS_DM_ENTITY_TYPE) + || entityType.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE) || + entityType.getTypeName().equals(ATLAS_DM_ENTITY_ASSOCIATION_TYPE) || + entity.getTypeName().equals(ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE) + ){ + preProcessors = getPreProcessor(entityType.getTypeName()); + for (PreProcessor processor : preProcessors) { + processor.processAttributes(entity, context, UPDATE); + } + } + } } private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException { @@ -2006,6 +2038,17 @@ public List getPreProcessor(String typeName) { case PROCESS_ENTITY_TYPE: preProcessors.add(new LineagePreProcessor(typeRegistry, entityRetriever, graph, this)); + case ATLAS_DM_ENTITY_TYPE: + preProcessors.add(new DMEntityPreProcessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore)); + break; + case ATLAS_DM_ATTRIBUTE_TYPE: + preProcessors.add(new DMAttributePreprocessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore)); + break; + case ATLAS_DM_ENTITY_ASSOCIATION_TYPE: + preProcessors.add(new DMEntityAssociationPreProcessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore)); + break; + case ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE: + preProcessors.add(new DMAttributePreprocessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore)); } // The default global pre-processor for all AssetTypes diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java index 42d30d39ca..4dbe3629c7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java @@ -51,19 +51,7 @@ import java.text.SimpleDateFormat; import java.util.*; -import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_ENTITY_TYPE; -import static org.apache.atlas.repository.Constants.CLASSIFICATION_NAMES_KEY; -import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.GLOSSARY_TERMS_EDGE_LABEL; -import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT; -import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY; -import static org.apache.atlas.repository.Constants.NAME; -import static org.apache.atlas.repository.Constants.PROPAGATED_CLASSIFICATION_NAMES_KEY; -import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; -import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance; import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.ASC; import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.DESC; @@ -708,6 +696,19 @@ public static Iterator findActiveEntityVerticesByType(AtlasGraph gr return query.vertices().iterator(); } + public static AtlasVertex findLatestEntityAttributeVerticesByType(String typename, String dMQualifiedNamePrefix) { + AtlasGraph graph= getGraphInstance(); + AtlasGraphQuery query = graph.query() + .has(ENTITY_TYPE_PROPERTY_KEY, typename) + .has(ATLAS_DM_QUALIFIED_NAME_PREFIX, dMQualifiedNamePrefix) + .has(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, 0) + .has(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, 0); + + Iterator results = query.vertices().iterator(); + AtlasVertex vertex = results.hasNext() ? results.next() : null; + return vertex; + } + public static boolean relationshipTypeHasInstanceEdges(String typeName) throws AtlasBaseException { return relationshipTypeHasInstanceEdges(getGraphInstance(), typeName); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java index 72c3200809..fe209140ff 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java @@ -30,8 +30,8 @@ public class EntityMutationContext { private final EntityGraphDiscoveryContext context; private final List entitiesCreated = new ArrayList<>(); private final List entitiesUpdated = new ArrayList<>(); - private final List entitiesUpdatedWithAppendRelationshipAttribute = new ArrayList<>(); - private final List entitiesUpdatedWithRemoveRelationshipAttribute = new ArrayList<>(); + private List entitiesUpdatedWithAppendRelationshipAttribute = new ArrayList<>(); + private List entitiesUpdatedWithRemoveRelationshipAttribute = new ArrayList<>(); private final Map entityVsType = new HashMap<>(); private final Map entityVsVertex = new HashMap<>(); private final Map guidAssignments = new HashMap<>(); @@ -59,11 +59,11 @@ public void addCreated(String internalGuid, AtlasEntity entity, AtlasEntityType } } - public void setUpdatedWithRelationshipAttributes(AtlasEntity entity){ + public void setUpdatedWithRelationshipAttributes(AtlasEntity entity) { entitiesUpdatedWithAppendRelationshipAttribute.add(entity); } - public void setUpdatedWithRemoveRelationshipAttributes(AtlasEntity entity){ + public void setUpdatedWithRemoveRelationshipAttributes(AtlasEntity entity) { entitiesUpdatedWithRemoveRelationshipAttribute.add(entity); } @@ -84,6 +84,39 @@ public void addUpdated(String internalGuid, AtlasEntity entity, AtlasEntityType } } + public void removeUpdated(String internalGuid, AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) { + if (entityVsVertex.containsKey(internalGuid)) { // if the entity was already created/updated + entitiesUpdated.remove(entity); + entityVsType.remove(entity.getGuid(), type); + entityVsVertex.remove(entity.getGuid(), atlasVertex); + +// if (!StringUtils.equals(internalGuid, entity.getGuid())) { +// guidAssignments.put(internalGuid, entity.getGuid()); +// entityVsVertex.put(internalGuid, atlasVertex); +// } + } + } + + public void removeUpdatedWithRelationshipAttributes(AtlasEntity entity) { + Iterator entities = entitiesUpdatedWithAppendRelationshipAttribute.iterator(); + while (entities.hasNext()) { + String guid = entities.next().getGuid(); + if (guid.equals(entity.getGuid())) { + entities.remove(); + } + } + } + + public void removeUpdatedWithDeleteRelationshipAttributes(AtlasEntity entity) { + Iterator entities = entitiesUpdatedWithRemoveRelationshipAttribute.iterator(); + while (entities.hasNext()) { + String guid = entities.next().getGuid(); + if (guid.equals(entity.getGuid())) { + entities.remove(); + } + } + } + public void addEntityToRestore(AtlasVertex vertex) { if (entitiesToRestore == null) { entitiesToRestore = new ArrayList<>(); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java index d6a2d717d9..788be55ce7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java @@ -22,6 +22,7 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; @@ -69,7 +70,33 @@ public EntityGraphDiscoveryContext resolveEntityReferences(EntityGraphDiscoveryC throw new AtlasBaseException(element.getValue(), AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); } - vertex = AtlasGraphUtilsV2.findByUniqueAttributes(this.graph, entityType, entity.getAttributes()); + // ------- + + if ( + ((entity.getAttributes().get(Constants.QUALIFIED_NAME) == null) && (entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX)!=null)) + && + ((entity.getTypeName().equals(Constants.ATLAS_DM_ENTITY_TYPE)) || (entity.getTypeName().equals(Constants.ATLAS_DM_ATTRIBUTE_TYPE)))) { + + String qualifiedNamePrefix = (String) entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX); + if (qualifiedNamePrefix.isEmpty()){ + throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST); + } + vertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(entity.getTypeName(), qualifiedNamePrefix); + + if (vertex == null) { + // no entity exists with this qualifiedName, set qualifiedName and let entity be created + entity.setAttribute(Constants.QUALIFIED_NAME, qualifiedNamePrefix + "_" + RequestContext.get().getRequestTime()); + return context; + } + + // if guidFromVertex is found let entity be updated + // entity.setGuid(AtlasGraphUtilsV2.getIdFromVertex(vertex)); + // else find qualifiedName and set qualifiedName : as it is mandatory + context.addResolvedGuid(guid, vertex); + }else { + vertex = AtlasGraphUtilsV2.findByUniqueAttributes(this.graph, entityType, entity.getAttributes()); + } + } else if (!isAssignedGuid) { // for local-guids, entity must be in the stream throw new AtlasBaseException(element.getValue(), AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, guid); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/AbstractModelPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/AbstractModelPreProcessor.java new file mode 100644 index 0000000000..2b39630150 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/AbstractModelPreProcessor.java @@ -0,0 +1,772 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.instance.AtlasRelationship; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid; + +public abstract class AbstractModelPreProcessor implements PreProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractModelPreProcessor.class); + private static final String ATTRIBUTE_TYPE = "DMAttribute"; + + protected final AtlasTypeRegistry typeRegistry; + + protected final EntityGraphRetriever entityRetriever; + + protected EntityGraphMapper entityGraphMapper; + protected AtlasRelationshipStore atlasRelationshipStore; + + + public AbstractModelPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, EntityGraphMapper entityGraphMapper, AtlasRelationshipStore atlasRelationshipStore) { + this.typeRegistry = typeRegistry; + this.entityRetriever = entityRetriever; + this.entityGraphMapper = entityGraphMapper; + this.atlasRelationshipStore = atlasRelationshipStore; + } + + + protected void setModelDates(AtlasEntity newEntity, AtlasVertex newVertex, Object value) { + newEntity.setAttribute(ATLAS_DM_SYSTEM_DATE, value); + newEntity.setAttribute(ATLAS_DM_BUSINESS_DATE, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, ATLAS_DM_SYSTEM_DATE, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, ATLAS_DM_BUSINESS_DATE, value); + } + + protected void setModelExpiredAtDates(AtlasEntity oldEntity, AtlasVertex oldVertex, Object value) { + oldEntity.setAttribute(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, value); + oldEntity.setAttribute(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, value); + AtlasGraphUtilsV2.setEncodedProperty(oldVertex, ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, value); + AtlasGraphUtilsV2.setEncodedProperty(oldVertex, ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, value); + } + + protected void setQualifiedName(AtlasEntity newEntity, AtlasVertex newVertex, Object value) { + newEntity.setAttribute(QUALIFIED_NAME, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, QUALIFIED_NAME, value); + } + + protected void setName(AtlasEntity newEntity, AtlasVertex newVertex, Object value) { + newEntity.setAttribute(NAME, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, NAME, value); + } + + protected void setQualifiedNamePrefix(AtlasEntity newEntity, AtlasVertex newVertex, Object value) { + newEntity.setAttribute(ATLAS_DM_QUALIFIED_NAME_PREFIX, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, ATLAS_DM_QUALIFIED_NAME_PREFIX, value); + } + + protected void setNamespace(AtlasEntity newEntity, AtlasVertex newVertex, Object value) { + newEntity.setAttribute(ATLAS_DM_NAMESPACE, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, ATLAS_DM_NAMESPACE, value); + } + + protected ModelResponse createEntity(String qualifiedName, String name, String entityType, String namespace, EntityMutationContext context) throws AtlasBaseException { + String guid = UUID.randomUUID().toString(); + AtlasEntity entity = new AtlasEntity(entityType); + entity.setAttribute(NAME, name); + entity.setAttribute(VERSION_PROPERTY_KEY, 0); + entity.setAttribute(QUALIFIED_NAME, qualifiedName); + entity.setAttribute(ATLAS_DM_NAMESPACE, namespace); + entity.setAttribute(ATLAS_DM_BUSINESS_DATE, RequestContext.get().getRequestTime()); + entity.setAttribute(ATLAS_DM_SYSTEM_DATE, RequestContext.get().getRequestTime()); + if (entityType.equals(ATLAS_DM_ENTITY_TYPE) || entityType.equals(ATLAS_DM_ATTRIBUTE_TYPE)) { + String prefix = qualifiedName.substring(0, qualifiedName.indexOf("_")); + entity.setAttribute(ATLAS_DM_QUALIFIED_NAME_PREFIX, prefix); + } + AtlasVertex versionVertex = entityGraphMapper.createVertexWithGuid(entity, guid); + context.getDiscoveryContext().addResolvedGuid(guid, versionVertex); + entity.setGuid(guid); + return new ModelResponse(entity, versionVertex); + } + + public ModelResponse replicateModelVersion(String modelGuid, String modelQualifiedName, long now) throws AtlasBaseException { + AtlasEntity.AtlasEntityWithExtInfo dataModel = entityRetriever.toAtlasEntityWithExtInfo(modelGuid, false); + List existingModelVersions = (List) dataModel.getEntity().getRelationshipAttributes().get("dMVersions"); + + String modelVersion = "v1"; + AtlasRelatedObjectId existingModelVersionObj = null; + + if (CollectionUtils.isEmpty(existingModelVersions)) { + return new ModelResponse(null, null); + } + + int existingVersionNumber = existingModelVersions.size(); + modelVersion = "v" + (++existingVersionNumber); + + // get active model version + for (AtlasRelatedObjectId modelVersionObj : existingModelVersions) { + AtlasEntity modelVersionEntity = entityRetriever.toAtlasEntity(modelVersionObj.getGuid()); + Date expiredAtBusinessDate = (Date) modelVersionEntity.getAttributes().get(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE); + Date expiredAtSystemDate = (Date) modelVersionEntity.getAttributes().get(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE); + + if (expiredAtBusinessDate != null && expiredAtBusinessDate.getTime() > 0 || expiredAtSystemDate != null && expiredAtSystemDate.getTime() > 0) { + continue; + } + existingModelVersionObj = modelVersionObj; + } + + if (existingModelVersionObj == null) { + throw new AtlasBaseException(AtlasErrorCode.DATA_MODEL_VERSION_NOT_EXIST); + } + + AtlasEntity existingModelVersionEntity = entityRetriever.toAtlasEntityWithExtInfo(existingModelVersionObj.getGuid()).getEntity(); + AtlasVertex existingModelVersionVertex = entityRetriever.getEntityVertex(existingModelVersionObj.getGuid()); + + + AtlasVertex copyModelVertex = entityGraphMapper.createVertex(existingModelVersionEntity); + AtlasEntity copyModelVersion = entityRetriever.toAtlasEntity(copyModelVertex); + copyAllAttributes(existingModelVersionEntity, copyModelVersion, now); + setModelDates(copyModelVersion, copyModelVertex, now); + setQualifiedName(copyModelVersion, copyModelVertex, modelQualifiedName + "/" + modelVersion); + setName(copyModelVersion, copyModelVertex, modelVersion); + setNamespace(copyModelVersion, copyModelVertex, dataModel.getEntity().getAttribute(ATLAS_DM_NAMESPACE)); + setModelExpiredAtDates(existingModelVersionEntity, existingModelVersionVertex, now); + return new ModelResponse(existingModelVersionEntity, copyModelVersion, existingModelVersionVertex, copyModelVertex); + } + + public ModelResponse replicateModelEntity(AtlasEntity existingEntity, AtlasVertex existingEntityVertex, String entityQualifiedNamePrefix, long epoch) throws AtlasBaseException { + AtlasVertex copyEntityVertex = entityGraphMapper.createVertex(existingEntity); + AtlasEntity copyEntity = entityRetriever.toAtlasEntity(copyEntityVertex); + copyAllAttributes(existingEntity, copyEntity, epoch); + String entityQualifiedName = entityQualifiedNamePrefix + "_" + epoch; + setQualifiedName(copyEntity, copyEntityVertex, entityQualifiedName); + setModelDates(copyEntity, copyEntityVertex, epoch); + setName(copyEntity, copyEntityVertex, existingEntity.getAttribute(NAME)); + setNamespace(copyEntity, copyEntityVertex, existingEntity.getAttribute(ATLAS_DM_NAMESPACE)); + setQualifiedNamePrefix(copyEntity, copyEntityVertex, existingEntity.getAttribute(ATLAS_DM_QUALIFIED_NAME_PREFIX)); + setModelExpiredAtDates(existingEntity, existingEntityVertex, epoch); + return new ModelResponse(existingEntity, copyEntity, existingEntityVertex, copyEntityVertex); + } + + protected ModelResponse replicateModelAttribute(AtlasEntity existingAttribute, AtlasVertex existingAttributeVertex, String attributeQualifiedNamePrefix, long epoch) throws AtlasBaseException { + AtlasVertex copyAttributeVertex = entityGraphMapper.createVertex(existingAttribute); + AtlasEntity copyAttributeEntity = entityRetriever.toAtlasEntity(copyAttributeVertex); + copyAllAttributes(existingAttribute, copyAttributeEntity, epoch); + String attributeQualifiedName = attributeQualifiedNamePrefix + "_" + epoch; + setQualifiedName(copyAttributeEntity, copyAttributeVertex, attributeQualifiedName); + setModelDates(copyAttributeEntity, copyAttributeVertex, epoch); + setName(copyAttributeEntity, copyAttributeVertex, existingAttribute.getAttribute(NAME)); + setNamespace(copyAttributeEntity, copyAttributeVertex, existingAttribute.getAttribute(ATLAS_DM_NAMESPACE)); + setQualifiedNamePrefix(copyAttributeEntity, copyAttributeVertex, existingAttribute.getAttribute(ATLAS_DM_QUALIFIED_NAME_PREFIX)); + setModelExpiredAtDates(existingAttribute, existingAttributeVertex, epoch); + return new ModelResponse(existingAttribute, copyAttributeEntity, existingAttributeVertex, copyAttributeVertex); + } + + public void createModelVersionModelEntityRelationship(AtlasVertex modelVersionVertex, + AtlasVertex modelEntityVertex) throws AtlasBaseException { + AtlasRelationship modelVersionEntityRelation = new AtlasRelationship("d_m_version_d_m_entities"); + modelVersionEntityRelation.setStatus(AtlasRelationship.Status.ACTIVE); + modelVersionEntityRelation.setEnd1(new AtlasObjectId( + GraphHelper.getGuid(modelVersionVertex), + GraphHelper.getTypeName(modelVersionVertex))); + modelVersionEntityRelation.setEnd2(new AtlasObjectId( + GraphHelper.getGuid(modelEntityVertex), + GraphHelper.getTypeName(modelEntityVertex))); + atlasRelationshipStore.create(modelVersionEntityRelation); + } + + protected void createModelVersionModelEntityRelationship(AtlasVertex modelVersionVertex, + List existingEntities) throws AtlasBaseException { + if (CollectionUtils.isEmpty(existingEntities)) { + return; + } + AtlasRelationship modelVersionEntityRelation = new AtlasRelationship("d_m_version_d_m_entities"); + modelVersionEntityRelation.setStatus(AtlasRelationship.Status.ACTIVE); + modelVersionEntityRelation.setEnd1(new AtlasObjectId( + GraphHelper.getGuid(modelVersionVertex), + GraphHelper.getTypeName(modelVersionVertex))); + for (AtlasRelatedObjectId existingEntity : existingEntities) { + AtlasEntity entity = entityRetriever.toAtlasEntity(existingEntity.getGuid()); + Date expiredAtBusinessDate = (Date) entity.getAttributes().get(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE); + Date expiredAtSystemDate = (Date) entity.getAttributes().get(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE); + if (expiredAtBusinessDate != null && expiredAtBusinessDate.getTime() > 0 || expiredAtSystemDate != null && expiredAtSystemDate.getTime() > 0) { + continue; + } + modelVersionEntityRelation.setEnd2(new AtlasObjectId( + existingEntity.getGuid(), + existingEntity.getTypeName() + )); + atlasRelationshipStore.create(modelVersionEntityRelation); + } + } + + protected void createModelEntityModelAttributeRelation(AtlasVertex entity, List existingEntityAttributes) throws AtlasBaseException { + if (CollectionUtils.isEmpty(existingEntityAttributes)) { + return; + } + AtlasRelationship modelEntityAttributeRelation = new AtlasRelationship("d_m_entity_d_m_attributes"); + modelEntityAttributeRelation.setStatus(AtlasRelationship.Status.ACTIVE); + modelEntityAttributeRelation.setEnd1( + new AtlasObjectId( + GraphHelper.getGuid(entity), + GraphHelper.getTypeName(entity))); + for (AtlasRelatedObjectId existingEntityAttribute : existingEntityAttributes) { + AtlasEntity entityAttribute = entityRetriever.toAtlasEntity(existingEntityAttribute.getGuid()); + Date expiredAtBusinessDate = (Date) entityAttribute.getAttributes().get(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE); + Date expiredAtSystemDate = (Date) entityAttribute.getAttributes().get(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE); + if (expiredAtBusinessDate != null && expiredAtBusinessDate.getTime() > 0 || expiredAtSystemDate != null && expiredAtSystemDate.getTime() > 0) { + continue; + } + modelEntityAttributeRelation.setEnd2( + new AtlasObjectId( + existingEntityAttribute.getGuid(), + existingEntityAttribute.getTypeName())); + atlasRelationshipStore.create(modelEntityAttributeRelation); + } + } + + protected void createModelEntityModelAttributeRelation(AtlasVertex entity, AtlasVertex attribute) throws AtlasBaseException { + AtlasRelationship modelEntityAttributeRelation = new AtlasRelationship("d_m_entity_d_m_attributes"); + modelEntityAttributeRelation.setStatus(AtlasRelationship.Status.ACTIVE); + modelEntityAttributeRelation.setEnd1( + new AtlasObjectId( + GraphHelper.getGuid(entity), + GraphHelper.getTypeName(entity))); + modelEntityAttributeRelation.setEnd2( + new AtlasObjectId( + GraphHelper.getGuid(attribute), + GraphHelper.getTypeName(attribute))); + atlasRelationshipStore.create(modelEntityAttributeRelation); + } + + + public void createModelModelVersionRelation(String modelGuid, String latestModelVersionGuid) throws AtlasBaseException { + AtlasRelationship modelVersionModelRelation = new AtlasRelationship("d_m_data_model_d_m_versions"); + modelVersionModelRelation.setStatus(AtlasRelationship.Status.ACTIVE); + modelVersionModelRelation.setEnd1( + new AtlasObjectId( + modelGuid, ATLAS_DM_DATA_MODEL)); + modelVersionModelRelation.setEnd2( + new AtlasObjectId(latestModelVersionGuid, ATLAS_DM_VERSION_TYPE)); + atlasRelationshipStore.create(modelVersionModelRelation); + } + + protected void copyAllAttributes(AtlasEntity source, AtlasEntity destination, long epochNow) { + if (source == null || destination == null) { + throw new IllegalArgumentException("Source and destination entities must not be null."); + } + + if (source.getAttributes() != null) { + destination.setAttributes(new HashMap<>(source.getAttributes())); + } else { + destination.setAttributes(new HashMap<>()); + } + + + if (CollectionUtils.isNotEmpty(source.getMeanings())) { + destination.setMeanings(new ArrayList<>(source.getMeanings())); + } else { + destination.setMeanings(new ArrayList<>()); + } + + long requestTime = RequestContext.get().getRequestTime(); + destination.setCreateTime(new Date(requestTime)); + destination.setUpdateTime(new Date(requestTime)); + + + if (source.getCustomAttributes() != null) { + destination.setCustomAttributes(new HashMap<>(source.getCustomAttributes())); + } else { + destination.setCustomAttributes(new HashMap<>()); // Set empty map if source custom attributes are null + } + + if (CollectionUtils.isNotEmpty(source.getClassifications())) { + destination.setClassifications(new ArrayList<>(source.getClassifications())); + } else { + destination.setClassifications(new ArrayList<>()); // Set empty list if source classifications are null or empty + } + + String entityType = source.getTypeName(); + + if (MapUtils.isNotEmpty(source.getRelationshipAttributes())) { + Map relationAttributes = copyRelationshipAttributes(source.getRelationshipAttributes(), destination, entityType); + destination.setRelationshipAttributes(relationAttributes); + } + if (MapUtils.isNotEmpty(source.getAppendRelationshipAttributes())) { + Map relationAttributes = copyRelationshipAttributes(source.getAppendRelationshipAttributes(), destination, entityType); + destination.setAppendRelationshipAttributes(relationAttributes); + } + if (MapUtils.isNotEmpty(source.getRemoveRelationshipAttributes())) { + Map relationAttributes = copyRelationshipAttributes(source.getRemoveRelationshipAttributes(), destination, entityType); + destination.setRemoveRelationshipAttributes(relationAttributes); + } + + } + + private Map copyRelationshipAttributes(Map sourceAttributes, AtlasEntity destination, String entityType) { + Map destinationAttributes = new HashMap<>(); + + if (MapUtils.isEmpty(sourceAttributes)) { + return destinationAttributes; + } + + Set allowedRelations = allowedRelationshipsForEntityType(entityType); + + for (String attribute : sourceAttributes.keySet()) { + if (allowedRelations.contains(attribute)) { + destinationAttributes.put(attribute, sourceAttributes.get(attribute)); + } + } + + return destinationAttributes; + } + + public static void replaceAttributes(Map existingAttributes, Map diffAttributes) { + if (MapUtils.isEmpty(diffAttributes)) { + return; + } + // Temporary map to hold new key-value pairs during replacement + Map tempMap = new HashMap<>(); + + // Iterate over the original map + for (Map.Entry entry : existingAttributes.entrySet()) { + String originalKey = entry.getKey(); + Object value = entry.getValue(); + + // Check if the second map contains a key for replacement + if (diffAttributes.containsKey(originalKey)) { + Object newValue = diffAttributes.get(originalKey); // Get the new key from second map + tempMap.put(originalKey, newValue); // Put the new key in the temp map + } else { + tempMap.put(originalKey, value); // No replacement, keep the original key + } + } + + // Clear the original map and put all the updated entries + existingAttributes.clear(); + existingAttributes.putAll(tempMap); + } + + protected void applyDiffs(AtlasEntity sourceEntity, AtlasEntity destinationEntity, String typeName) { + RequestContext reqContext = RequestContext.get(); + AtlasEntity diffEntity = reqContext.getDifferentialEntity(sourceEntity.getGuid()); + if (diffEntity == null) { + return; + } + boolean diffExistsForSameType = diffEntity.getTypeName().equals(typeName); + if (!diffExistsForSameType) { + return; + } + replaceAttributes(destinationEntity.getAttributes(), diffEntity.getAttributes()); + } + + protected void unsetExpiredDates(AtlasEntity latestEntity, AtlasVertex latestVertex) { + latestEntity.setAttribute(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, 0); + latestEntity.setAttribute(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, 0); + AtlasGraphUtilsV2.setEncodedProperty(latestVertex, ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, 0); + AtlasGraphUtilsV2.setEncodedProperty(latestVertex, ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, 0); + } + + protected Set allowedRelationshipsForEntityType(String entityType) { + Set allowedRelationships = new HashSet<>(); + switch (entityType) { + case ATLAS_DM_ENTITY_TYPE: + allowedRelationships.add("dMMappedToEntities"); + allowedRelationships.add("dMMappedFromEntities"); + allowedRelationships.add("dMRelatedFromEntities"); + allowedRelationships.add("dMRelatedToEntities"); + break; + case ATLAS_DM_ATTRIBUTE_TYPE: + allowedRelationships.add("dMMappedFromAttributes"); + allowedRelationships.add("dMMappedToAttributes"); + allowedRelationships.add("dMRelatedFromAttributes"); + allowedRelationships.add("dMRelatedToAttributes"); + break; + case ATLAS_DM_ENTITY_ASSOCIATION_TYPE: + allowedRelationships.add("dMEntityTo"); + allowedRelationships.add("dMEntityFrom"); + break; + case ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE: + allowedRelationships.add("dMAttributeTo"); + allowedRelationships.add("dMAttributeFrom"); + break; + } + return allowedRelationships; + } + + protected ModelResponse updateDMEntity(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ENTITY_TYPE)) { + return new ModelResponse(entity, vertex); + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + + + long now = RequestContext.get().getRequestTime(); + AtlasEntity.AtlasEntityWithExtInfo existingEntity = entityRetriever.toAtlasEntityWithExtInfo(vertex, false); + List existingEntityAttributes = (List) existingEntity.getEntity().getRelationshipAttributes().get("dMAttributes"); + + + // get model qualifiedName with qualifiedNamePrefix + String qualifiedNamePrefix = (String) entity.getAttributes().get(ATLAS_DM_QUALIFIED_NAME_PREFIX); + int lastIndex = qualifiedNamePrefix.lastIndexOf("/"); + String modelQualifiedName = qualifiedNamePrefix.substring(0, lastIndex); + String modelVersion = "v1"; + + + Map attrValues = new HashMap<>(); + attrValues.put(QUALIFIED_NAME, modelQualifiedName); + + AtlasVertex modelVertex = AtlasGraphUtilsV2.findByUniqueAttributes( + typeRegistry.getEntityTypeByName(ATLAS_DM_DATA_MODEL), attrValues); + + if (modelVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.DATA_MODEL_NOT_EXIST); + } + + String modelGuid = AtlasGraphUtilsV2.getIdFromVertex(modelVertex); + ModelResponse modelVersionResponse = replicateModelVersion(modelGuid, modelQualifiedName, now); + + // model is not replicated successfully + if (modelVersionResponse.getReplicaEntity() == null) { + String namespace = (String) entity.getAttributes().get(ATLAS_DM_NAMESPACE); + modelVersionResponse = createEntity( + (modelQualifiedName + "/" + modelVersion), + modelVersion, + ATLAS_DM_VERSION_TYPE, + namespace, + context); + } + + AtlasEntity latestModelVersionEntity = modelVersionResponse.getReplicaEntity(); + AtlasVertex latestModelVersionVertex = modelVersionResponse.getReplicaVertex(); + AtlasEntity existingVersion = modelVersionResponse.getExistingEntity(); + + // create entity e1 ---> e1' + ModelResponse modelEntityResponse = replicateModelEntity(existingEntity.getEntity(), vertex, qualifiedNamePrefix, now); + AtlasVertex copyEntityVertex = modelEntityResponse.getReplicaVertex(); + AtlasEntity copyEntity = modelEntityResponse.getReplicaEntity(); + applyDiffs(entity, copyEntity, ATLAS_DM_ENTITY_TYPE); + unsetExpiredDates(copyEntity, copyEntityVertex); + + // create model-modelVersion relation + createModelModelVersionRelation(modelGuid, latestModelVersionEntity.getGuid()); + + // create modelVersion-modelEntity relationship with new entity + createModelVersionModelEntityRelationship(latestModelVersionVertex, copyEntityVertex); + + // create modelVersion-modelEntity relation with old entities which are not expired + if (existingVersion != null) { + List existingEntities = (List) existingVersion.getRelationshipAttributes().get("dMEntities"); + createModelVersionModelEntityRelationship(latestModelVersionVertex, existingEntities); + } + // create modelEntity-modelAttributeRelationship + createModelEntityModelAttributeRelation(copyEntityVertex, existingEntityAttributes); + + + /** + * update context + */ + // previousEntity and previousModelVersion + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + AtlasEntityType modelVersionType = typeRegistry.getEntityTypeByName(modelVersionResponse.getReplicaEntity().getTypeName()); + + context.addCreated(copyEntity.getGuid(), copyEntity, entityType, copyEntityVertex); + context.addCreated(latestModelVersionEntity.getGuid(), latestModelVersionEntity, modelVersionType, latestModelVersionVertex); + + //remove existing entity from context so it is not updated + context.removeUpdated(entity.getGuid(), entity, + entityType, vertex); + + // resolve references + context.getDiscoveryContext(). + addResolvedGuid( + modelGuid, + modelVertex); + + return new ModelResponse(copyEntity, copyEntityVertex); + } + + protected Map processRelationshipAttributesForEntity(AtlasEntity entity, Map relationshipAttributes, EntityMutationContext context) throws AtlasBaseException { + Map appendAttributesDestination = new HashMap<>(); + if (relationshipAttributes != null) { + Map appendAttributesSource = (Map) relationshipAttributes; + ; + ModelResponse modelResponseRelatedEntity = null; + String guid = ""; + Set allowedRelations = allowedRelationshipsForEntityType(entity.getTypeName()); + + for (String attribute : appendAttributesSource.keySet()) { + + if (appendAttributesSource.get(attribute) instanceof List) { + + if (!allowedRelations.contains(attribute)) { + continue; + } + List> destList = new ArrayList<>(); + Map destMap = null; + + List> attributeList = (List>) appendAttributesSource.get(attribute); + + for (Map relationAttribute : attributeList) { + guid = (String) relationAttribute.get("guid"); + + // update end2 + modelResponseRelatedEntity = updateDMEntity( + entityRetriever.toAtlasEntity(guid), + entityRetriever.getEntityVertex(guid), + context); + //relationAttribute.put("guid", modelResponseRelatedEntity.getCopyEntity()); + destMap = new HashMap<>(relationAttribute); + guid = modelResponseRelatedEntity.getReplicaEntity().getGuid(); + destMap.put("guid", guid); + //destMap.put(QUALIFIED_NAME, ) + context.getDiscoveryContext().addResolvedGuid(guid, modelResponseRelatedEntity.getReplicaVertex()); + destList.add(destMap); + } + appendAttributesDestination.put(attribute, destList); + } else { + if (appendAttributesSource.get(attribute) instanceof Map) { + LinkedHashMap attributeList = (LinkedHashMap) appendAttributesSource.get(attribute); + guid = (String) attributeList.get("guid"); + + // update end2 + modelResponseRelatedEntity = updateDMEntity( + entityRetriever.toAtlasEntity(guid), + entityRetriever.getEntityVertex(guid), + context); + + Map destMap = new HashMap<>(attributeList); + destMap.put("guid", guid); + guid = modelResponseRelatedEntity.getReplicaEntity().getGuid(); + context.getDiscoveryContext().addResolvedGuid(guid, modelResponseRelatedEntity.getReplicaVertex()); + appendAttributesDestination.put(attribute, destMap); + } + } + } + } + return appendAttributesDestination; + } + + protected ModelResponse replicateDMAssociation(AtlasEntity existingEntity, AtlasVertex existingEntityVertex, long epoch) throws AtlasBaseException { + AtlasVertex copyEntityVertex = entityGraphMapper.createVertex(existingEntity); + AtlasEntity copyEntity = entityRetriever.toAtlasEntity(copyEntityVertex); + copyAllAttributes(existingEntity, copyEntity, epoch); + setModelDates(copyEntity, copyEntityVertex, epoch); + setModelDates(copyEntity, copyEntityVertex, epoch); + setModelExpiredAtDates(existingEntity, existingEntityVertex, epoch); + return new ModelResponse(existingEntity, copyEntity, existingEntityVertex, copyEntityVertex); + } + + protected Map processRelationshipAttributesForAttribute(AtlasEntity entity, Map relationshipAttributes, EntityMutationContext context) throws AtlasBaseException { + Map appendAttributesDestination = new HashMap<>(); + if (relationshipAttributes != null) { + Map appendAttributesSource = (Map) relationshipAttributes; + ; + ModelResponse modelResponseRelatedEntity = null; + String guid = ""; + Set allowedRelations = allowedRelationshipsForEntityType(entity.getTypeName()); + + for (String attribute : appendAttributesSource.keySet()) { + + if (appendAttributesSource.get(attribute) instanceof List) { + + if (!allowedRelations.contains(attribute)) { + continue; + } + List> destList = new ArrayList<>(); + Map destMap = null; + + List> attributeList = (List>) appendAttributesSource.get(attribute); + + for (Map relationAttribute : attributeList) { + guid = (String) relationAttribute.get("guid"); + + // update end2 + modelResponseRelatedEntity = updateDMAttribute( + entityRetriever.toAtlasEntity(guid), + entityRetriever.getEntityVertex(guid), + context); + //relationAttribute.put("guid", modelResponseRelatedEntity.getCopyEntity()); + destMap = new HashMap<>(relationAttribute); + guid = modelResponseRelatedEntity.getReplicaEntity().getGuid(); + destMap.put("guid", guid); + //destMap.put(QUALIFIED_NAME, ) + context.getDiscoveryContext().addResolvedGuid(guid, modelResponseRelatedEntity.getReplicaVertex()); + destList.add(destMap); + } + appendAttributesDestination.put(attribute, destList); + } else { + if (appendAttributesSource.get(attribute) instanceof Map) { + LinkedHashMap attributeList = (LinkedHashMap) appendAttributesSource.get(attribute); + guid = (String) attributeList.get("guid"); + + // update end2 + modelResponseRelatedEntity = updateDMAttribute( + entityRetriever.toAtlasEntity(guid), + entityRetriever.getEntityVertex(guid), + context); + + Map destMap = new HashMap<>(attributeList); + destMap.put("guid", guid); + guid = modelResponseRelatedEntity.getReplicaEntity().getGuid(); + context.getDiscoveryContext().addResolvedGuid(guid, modelResponseRelatedEntity.getReplicaVertex()); + appendAttributesDestination.put(attribute, destMap); + } + } + } + } + return appendAttributesDestination; + } + + protected ModelResponse updateDMAttribute(AtlasEntity entityAttribute, AtlasVertex vertexAttribute, EntityMutationContext context) throws AtlasBaseException { + if (!entityAttribute.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE)) { + return new ModelResponse(entityAttribute, vertexAttribute); + } + + String attributeName = (String) entityAttribute.getAttribute(NAME); + + if (StringUtils.isEmpty(attributeName) || isNameInvalid(attributeName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME, attributeName); + } + + long now = RequestContext.get().getRequestTime(); + + + // get entity qualifiedName with qualifiedNamePrefix + String attributeQualifiedNamePrefix = (String) entityAttribute.getAttributes().get(ATLAS_DM_QUALIFIED_NAME_PREFIX); + int lastIndex = attributeQualifiedNamePrefix.lastIndexOf("/"); + String entityQualifiedNamePrefix = attributeQualifiedNamePrefix.substring(0, lastIndex); + String namespace = (String) entityAttribute.getAttributes().get(ATLAS_DM_NAMESPACE); + String modelVersion = "v1"; + + ModelResponse modelENtityResponse = null; + AtlasVertex latestEntityVertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(ATLAS_DM_ENTITY_TYPE, entityQualifiedNamePrefix); + + // get model qualifiedName with qualifiedNamePrefix + lastIndex = entityQualifiedNamePrefix.lastIndexOf("/"); + String modelQualifiedName = entityQualifiedNamePrefix.substring(0, lastIndex); + Map attrValues = new HashMap<>(); + attrValues.put(QUALIFIED_NAME, modelQualifiedName); + + String modelGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes( + typeRegistry.getEntityTypeByName(ATLAS_DM_DATA_MODEL), + attrValues); + + List existingAttributes = null; + + if (latestEntityVertex != null) { + modelENtityResponse = replicateModelEntity( + entityRetriever.toAtlasEntity(latestEntityVertex), + latestEntityVertex, + entityQualifiedNamePrefix, + now + ); + modelVersion = "v2"; + if (modelENtityResponse.getExistingEntity() != null && modelENtityResponse.getExistingEntity().getRelationshipAttributes() != null) { + existingAttributes = (List) modelENtityResponse.getExistingEntity().getAttributes().get("dMAttributes"); + } + } else { + int lastSlashIndex = entityQualifiedNamePrefix.lastIndexOf("/"); + String entityName = entityQualifiedNamePrefix.substring(lastSlashIndex + 1); + modelENtityResponse = createEntity( + entityQualifiedNamePrefix + "_" + now, + entityName, + ATLAS_DM_ENTITY_TYPE, + namespace, + context + ); + } + + ModelResponse modelVersionResponse = replicateModelVersion(modelGuid, modelQualifiedName, now); + + if (modelVersionResponse.getReplicaEntity() == null) { + modelVersionResponse = createEntity( + (modelQualifiedName + "/" + modelVersion), + modelVersion, + ATLAS_DM_VERSION_TYPE, + namespace, + context); + } + AtlasEntity latestModelVersionEntity = modelVersionResponse.getReplicaEntity(); + AtlasVertex latestModelVersionVertex = modelVersionResponse.getReplicaVertex(); + + List existingEntities = null; + + if (modelVersionResponse.getExistingEntity() != null && modelVersionResponse.getExistingEntity().getRelationshipAttributes() != null) { + existingEntities = (List) modelVersionResponse.getExistingEntity().getRelationshipAttributes().get("dMEntities"); + } + + AtlasEntity existingEntityAttributeWithExtInfo = entityRetriever.toAtlasEntityWithExtInfo(entityAttribute.getGuid(), false).getEntity(); + + // create attribute a1 ---> a1' + ModelResponse modelAttributeResponse = replicateModelAttribute( + existingEntityAttributeWithExtInfo, + entityRetriever.getEntityVertex(entityAttribute.getGuid()), + attributeQualifiedNamePrefix, + now); + + AtlasVertex copyAttributeVertex = modelAttributeResponse.getReplicaVertex(); + AtlasEntity copyAttribute = modelAttributeResponse.getReplicaEntity(); + applyDiffs(entityAttribute, copyAttribute, ATLAS_DM_ATTRIBUTE_TYPE); + unsetExpiredDates(copyAttribute, copyAttributeVertex); + + // create model-modelVersion relationship + createModelModelVersionRelation(modelGuid, latestModelVersionEntity.getGuid()); + + // create modelVersion-entity relationship [with new entity] + createModelVersionModelEntityRelationship(latestModelVersionVertex, modelENtityResponse.getReplicaVertex()); + + // create modelVersion-entity relationship [with existing entities] + createModelVersionModelEntityRelationship(latestModelVersionVertex, existingEntities); + + + // create entity - attribute relation [with new attribute] + createModelEntityModelAttributeRelation(modelENtityResponse.getReplicaVertex(), copyAttributeVertex); + + // create entity - attribute relation [with existing attributes] + createModelEntityModelAttributeRelation(modelENtityResponse.getReplicaVertex(), existingAttributes); + + AtlasEntityType attributeType = typeRegistry.getEntityTypeByName(entityAttribute.getTypeName()); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(modelENtityResponse.getReplicaEntity().getTypeName()); + AtlasEntityType modelVersionType = typeRegistry.getEntityTypeByName(latestModelVersionEntity.getTypeName()); + + context.addCreated(copyAttribute.getGuid(), copyAttribute, attributeType, copyAttributeVertex); + context.addCreated(modelENtityResponse.getReplicaEntity().getGuid(), modelENtityResponse.getReplicaEntity(), + entityType, modelENtityResponse.getReplicaVertex()); + context.addCreated(latestModelVersionEntity.getGuid(), + latestModelVersionEntity, modelVersionType, latestModelVersionVertex); + + context.removeUpdated(entityAttribute.getGuid(), entityAttribute, + entityType, vertexAttribute); + + // resolve references + context.getDiscoveryContext(). + addResolvedGuid( + modelGuid, + entityRetriever.getEntityVertex(modelGuid)); + + return new ModelResponse(copyAttribute, copyAttributeVertex); + } + +} + + diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributeAssociationPreprocessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributeAssociationPreprocessor.java new file mode 100644 index 0000000000..5430d98451 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributeAssociationPreprocessor.java @@ -0,0 +1,102 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid; + +public class DMAttributeAssociationPreprocessor extends AbstractModelPreProcessor{ + private static final Logger LOG = LoggerFactory.getLogger(DMAttributePreprocessor.class); + + public DMAttributeAssociationPreprocessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, EntityGraphMapper entityGraphMapper, AtlasRelationshipStore atlasRelationshipStore) { + super(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore); + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + + if (LOG.isDebugEnabled()) { + LOG.debug("ModelPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + switch (operation) { + case CREATE: + createDMAttributeAssociation(entity, vertex, context); + break; + case UPDATE: + updateDMAttributeAssociation(entity, vertex, context); + } + } + + private void createDMAttributeAssociation(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE)) { + return; + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + + entity.setRelationshipAttributes( + processRelationshipAttributesForAttribute(entity, entity.getRelationshipAttributes(), context)); + } + private void updateDMAttributeAssociation(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ENTITY_TYPE)) { + return; + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + + + long now = RequestContext.get().getRequestTime(); + + ModelResponse modelResponse = replicateDMAssociation(entity, vertex, now); + AtlasEntity copyEntity = modelResponse.getReplicaEntity(); + AtlasVertex copyVertex = modelResponse.getReplicaVertex(); + applyDiffs(entity, copyEntity, ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE); + unsetExpiredDates(copyEntity, copyVertex); + + + // case when a mapping is added + if (entity.getAppendRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForAttribute(entity, entity.getAppendRelationshipAttributes(), context); + modelResponse.getReplicaEntity().setAppendRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithRelationshipAttributes(entity); + context.setUpdatedWithRelationshipAttributes(modelResponse.getReplicaEntity()); + } + + if (entity.getRemoveRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForAttribute(entity, entity.getRemoveRelationshipAttributes(), context); + modelResponse.getReplicaEntity().setRemoveRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithDeleteRelationshipAttributes(entity); + context.setUpdatedWithRemoveRelationshipAttributes(modelResponse.getReplicaEntity()); + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributePreprocessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributePreprocessor.java new file mode 100644 index 0000000000..c933f02519 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributePreprocessor.java @@ -0,0 +1,182 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid; + +public class DMAttributePreprocessor extends AbstractModelPreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DMAttributePreprocessor.class); + + public DMAttributePreprocessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, EntityGraphMapper entityGraphMapper, AtlasRelationshipStore atlasRelationshipStore) { + super(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore); + } + + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + //Handle name & qualifiedName + if (LOG.isDebugEnabled()) { + LOG.debug("ModelPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + switch (operation) { + case CREATE: + createDMAttribute(entity, vertex, context); + break; + case UPDATE: + updateDMAttributes(entity, vertex, context); + } + } + + private void createDMAttribute(AtlasEntity entityAttribute, AtlasVertex vertexAttribute, EntityMutationContext context) throws AtlasBaseException { + if (!entityAttribute.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE)) { + return; + } + if (CollectionUtils.isEmpty(context.getCreatedEntities())) { + return; + } + + String entityName = (String) entityAttribute.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + long now = RequestContext.get().getRequestTime(); + + + // get entity qualifiedName with qualifiedNamePrefix + String attributeQualifiedNamePrefix = (String) entityAttribute.getAttributes().get(ATLAS_DM_QUALIFIED_NAME_PREFIX); + int lastIndex = attributeQualifiedNamePrefix.lastIndexOf("/"); + String entityQualifiedNamePrefix = attributeQualifiedNamePrefix.substring(0, lastIndex); + String namespace = (String) entityAttribute.getAttributes().get(ATLAS_DM_NAMESPACE); + String modelVersion = "v1"; + + ModelResponse modelENtityResponse = null; + AtlasVertex latestEntityVertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(ATLAS_DM_ENTITY_TYPE, entityQualifiedNamePrefix); + lastIndex = entityQualifiedNamePrefix.lastIndexOf("/"); + + String modelQualifiedName = entityQualifiedNamePrefix.substring(0, lastIndex); + + Map attrValues = new HashMap<>(); + attrValues.put(QUALIFIED_NAME, modelQualifiedName); + String modelGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes( + typeRegistry.getEntityTypeByName(ATLAS_DM_DATA_MODEL), + attrValues); + + List existingAttributes = null; + + // + if (latestEntityVertex != null) { + modelENtityResponse = replicateModelEntity( + entityRetriever.toAtlasEntity(latestEntityVertex), + latestEntityVertex, + entityQualifiedNamePrefix, + now + ); + modelVersion = "v2"; + if (modelENtityResponse.getExistingEntity() != null && modelENtityResponse.getExistingEntity().getRelationshipAttributes() != null) { + existingAttributes = (List) modelENtityResponse.getExistingEntity().getRelationshipAttributes().get("dMAttributes"); + } + } else { + int lastSlashIndex = entityQualifiedNamePrefix.lastIndexOf("/"); + + // Extract the substring after the last "/" + String name = entityQualifiedNamePrefix.substring(lastSlashIndex + 1); + modelENtityResponse = createEntity( + entityQualifiedNamePrefix + "_" + now, + name, + ATLAS_DM_ENTITY_TYPE, + namespace, + context + ); + } + + List existingEntities = null; + ModelResponse modelVersionResponse = replicateModelVersion(modelGuid, modelQualifiedName, now); + if (modelVersionResponse.getReplicaEntity() == null) { + modelVersionResponse = createEntity( + (modelQualifiedName + "/" + modelVersion), + modelVersion, + ATLAS_DM_VERSION_TYPE, + namespace, + context); + } + AtlasEntity latestModelVersionEntity = modelVersionResponse.getReplicaEntity(); + AtlasVertex latestModelVersionVertex = modelVersionResponse.getReplicaVertex(); + + + // model --- modelVersion relation + createModelModelVersionRelation(modelGuid, latestModelVersionEntity.getGuid()); + + // modelVersion --- entity relation + createModelVersionModelEntityRelationship(latestModelVersionVertex, modelENtityResponse.getReplicaVertex()); + + // modelVersion --- entitiesOfExistingModelVersion + if (modelVersionResponse.getExistingEntity() != null && modelVersionResponse.getExistingEntity().getRelationshipAttributes() != null) { + existingEntities = (List) modelVersionResponse.getExistingEntity().getRelationshipAttributes().get("dMEntities"); + createModelVersionModelEntityRelationship(latestModelVersionVertex, existingEntities); + } + + // entity --- attributes of existingEntity relation + createModelEntityModelAttributeRelation(modelENtityResponse.getReplicaVertex(), existingAttributes); + + // latest entity ---- new attribute relation + createModelEntityModelAttributeRelation(modelENtityResponse.getReplicaVertex(), vertexAttribute); + + context.addCreated(latestModelVersionEntity.getGuid(), latestModelVersionEntity, + typeRegistry.getEntityTypeByName(ATLAS_DM_VERSION_TYPE), latestModelVersionVertex); + + context.addCreated(modelENtityResponse.getReplicaEntity().getGuid(), modelENtityResponse.getReplicaEntity(), + typeRegistry.getEntityTypeByName(ATLAS_DM_ENTITY_TYPE), modelENtityResponse.getReplicaVertex()); + + // resolve references + context.getDiscoveryContext(). + addResolvedGuid( + modelGuid, + entityRetriever.getEntityVertex(modelGuid)); + + entityAttribute.setRelationshipAttributes(processRelationshipAttributesForAttribute(entityAttribute, entityAttribute.getRelationshipAttributes(), context)); + } + + private void updateDMAttributes(AtlasEntity entityAttribute, AtlasVertex vertexAttribute, EntityMutationContext context) throws AtlasBaseException { + ModelResponse modelResponseParentEntity = updateDMAttribute(entityAttribute, vertexAttribute, context); + // case when a mapping is added + if (entityAttribute.getAppendRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForAttribute(entityAttribute, entityAttribute.getAppendRelationshipAttributes(), context); + modelResponseParentEntity.getReplicaEntity().setAppendRelationshipAttributes(new HashMap<>(appendRelationshipAttributes)); + context.removeUpdatedWithRelationshipAttributes(entityAttribute); + } + + if (entityAttribute.getRemoveRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForAttribute(entityAttribute, entityAttribute.getRemoveRelationshipAttributes(), context); + modelResponseParentEntity.getReplicaEntity().setRemoveRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithDeleteRelationshipAttributes(entityAttribute); + } + } + +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityAssociationPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityAssociationPreProcessor.java new file mode 100644 index 0000000000..1e549b4090 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityAssociationPreProcessor.java @@ -0,0 +1,103 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid; + +public class DMEntityAssociationPreProcessor extends AbstractModelPreProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(DMAttributePreprocessor.class); + + public DMEntityAssociationPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, EntityGraphMapper entityGraphMapper, AtlasRelationshipStore atlasRelationshipStore) { + super(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore); + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + + if (LOG.isDebugEnabled()) { + LOG.debug("ModelPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + switch (operation) { + case CREATE: + createDMEntityAssociation(entity, vertex, context); + break; + case UPDATE: + updateDMEntityAssociation(entity, vertex, context); + } + } + + private void createDMEntityAssociation(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ENTITY_ASSOCIATION_TYPE)) { + return; + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + + entity.setRelationshipAttributes( + processRelationshipAttributesForEntity(entity, entity.getRelationshipAttributes(), context)); + } + private void updateDMEntityAssociation(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ENTITY_TYPE)) { + return; + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + + + long now = RequestContext.get().getRequestTime(); + + ModelResponse modelResponse = replicateDMAssociation(entity, vertex, now); + AtlasEntity copyEntity = modelResponse.getReplicaEntity(); + AtlasVertex copyVertex = modelResponse.getReplicaVertex(); + applyDiffs(entity, copyEntity, ATLAS_DM_ENTITY_ASSOCIATION_TYPE); + unsetExpiredDates(copyEntity, copyVertex); + + + // case when a mapping is added + if (entity.getAppendRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForEntity(entity, entity.getAppendRelationshipAttributes(), context); + modelResponse.getReplicaEntity().setAppendRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithRelationshipAttributes(entity); + context.setUpdatedWithRelationshipAttributes(modelResponse.getReplicaEntity()); + } + + if (entity.getRemoveRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForEntity(entity, entity.getRemoveRelationshipAttributes(), context); + modelResponse.getReplicaEntity().setRemoveRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithDeleteRelationshipAttributes(entity); + context.setUpdatedWithRemoveRelationshipAttributes(modelResponse.getReplicaEntity()); + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityPreProcessor.java new file mode 100644 index 0000000000..3c7149dfbd --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityPreProcessor.java @@ -0,0 +1,157 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid; + +@Component +public class DMEntityPreProcessor extends AbstractModelPreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DMEntityPreProcessor.class); + + + public DMEntityPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, EntityGraphMapper entityGraphMapper, AtlasRelationshipStore atlasRelationshipStore) { + super(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore); + } + + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + //Handle name & qualifiedName + if (LOG.isDebugEnabled()) { + LOG.debug("ModelPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + + switch (operation) { + case CREATE: + createDMEntity(entity, vertex, context); + break; + case UPDATE: + updateDMEntities(entity, vertex, context); + break; + } + } + + private void createDMEntity(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ENTITY_TYPE)) { + return; + } + if (CollectionUtils.isEmpty(context.getCreatedEntities())) { + return; + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + long now = RequestContext.get().getRequestTime(); + + // get model qualifiedName with qualifiedNamePrefix + String qualifiedNamePrefix = (String) entity.getAttributes().get(ATLAS_DM_QUALIFIED_NAME_PREFIX); + int lastIndex = qualifiedNamePrefix.lastIndexOf("/"); + String modelQualifiedName = qualifiedNamePrefix.substring(0, lastIndex); + Map attrValues = new HashMap<>(); + attrValues.put(QUALIFIED_NAME, modelQualifiedName); + + AtlasVertex modelVertex = AtlasGraphUtilsV2.findByUniqueAttributes( + typeRegistry.getEntityTypeByName(ATLAS_DM_DATA_MODEL), attrValues); + + if (modelVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.DATA_MODEL_NOT_EXIST); + } + + String modelGuid = AtlasGraphUtilsV2.getIdFromVertex(modelVertex); + + ModelResponse modelVersionResponse = replicateModelVersion(modelGuid, modelQualifiedName, now); + + // create modelVersion + if (modelVersionResponse.getReplicaEntity() == null) { + String namespace = (String) entity.getAttributes().get(ATLAS_DM_NAMESPACE); + modelVersionResponse = createEntity( + (modelQualifiedName + "/" + "v1"), + "v1", + ATLAS_DM_VERSION_TYPE, + namespace, + context); + } + + AtlasEntity latestModelVersionEntity = modelVersionResponse.getReplicaEntity(); + AtlasVertex latestModelVersionVertex = modelVersionResponse.getReplicaVertex(); + + + // model --- modelVersion relation + createModelModelVersionRelation(modelGuid, latestModelVersionEntity.getGuid()); + + // modelVersion --- entity relation + createModelVersionModelEntityRelationship(latestModelVersionVertex, vertex); + + if (modelVersionResponse.getExistingEntity() != null) { + List existingEntities = (List) modelVersionResponse.getExistingEntity() + .getRelationshipAttributes() + .get("dMEntities"); + // modelVersion --- entitiesOfExistingModelVersion + createModelVersionModelEntityRelationship(latestModelVersionVertex, existingEntities); + + } + context.addCreated(latestModelVersionEntity.getGuid(), latestModelVersionEntity, + typeRegistry.getEntityTypeByName(ATLAS_DM_VERSION_TYPE), latestModelVersionVertex); + // resolve references + context.getDiscoveryContext(). + addResolvedGuid( + modelGuid, + entityRetriever.getEntityVertex(modelGuid)); + + entity.setRelationshipAttributes( + processRelationshipAttributesForEntity(entity, entity.getRelationshipAttributes(), context)); + } + + private void updateDMEntities(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + ModelResponse modelResponseParentEntity = updateDMEntity(entity, vertex, context); + + // case when a mapping is added + if (entity.getAppendRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForEntity(entity, entity.getAppendRelationshipAttributes(), context); + modelResponseParentEntity.getReplicaEntity().setAppendRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithRelationshipAttributes(entity); + context.setUpdatedWithRelationshipAttributes(modelResponseParentEntity.getReplicaEntity()); + } + + if (entity.getRemoveRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForEntity(entity, entity.getRemoveRelationshipAttributes(), context); + modelResponseParentEntity.getReplicaEntity().setRemoveRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithDeleteRelationshipAttributes(entity); + context.setUpdatedWithRemoveRelationshipAttributes(modelResponseParentEntity.getReplicaEntity()); + } + } +} + + + diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/ModelResponse.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/ModelResponse.java new file mode 100644 index 0000000000..d6fd46afc3 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/ModelResponse.java @@ -0,0 +1,41 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.graphdb.AtlasVertex; + +public class ModelResponse { + + private AtlasEntity existingEntity; + private AtlasEntity replicaEntity; + private AtlasVertex existingVertex; + private AtlasVertex replicaVertex; + + public ModelResponse(AtlasEntity existingEntity, AtlasEntity replicaEntity, + AtlasVertex existingVertex, AtlasVertex replicaVertex) { + this.existingEntity = existingEntity; + this.replicaEntity = replicaEntity; + this.existingVertex = existingVertex; + this.replicaVertex = replicaVertex; + } + + public ModelResponse(AtlasEntity replicaEntity, AtlasVertex replicaVertex) { + this.replicaEntity = replicaEntity; + this.replicaVertex = replicaVertex; + } + + public AtlasEntity getExistingEntity() { + return existingEntity; + } + + public AtlasEntity getReplicaEntity() { + return replicaEntity; + } + + public AtlasVertex getExistingVertex() { + return existingVertex; + } + + public AtlasVertex getReplicaVertex() { + return replicaVertex; + } +} diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/ModelREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/ModelREST.java index dd83cfc77f..206b552ec6 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/ModelREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/ModelREST.java @@ -1,9 +1,5 @@ package org.apache.atlas.web.rest; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.RequestContext; @@ -12,10 +8,23 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.IndexSearchParams; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.AbstractModelPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMEntityPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.ModelResponse; import org.apache.atlas.searchlog.SearchLoggingManagement; +import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.web.util.ModelUtil; import org.apache.atlas.web.util.Servlets; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -29,10 +38,9 @@ import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import java.util.Arrays; -import java.util.Base64; -import java.util.HashSet; -import java.util.Set; +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; @Path("model") @Singleton @@ -41,43 +49,41 @@ @Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) public class ModelREST { - private static final String BUSINESS_DATE = "dMDataModelBusinessDate"; - private static final String EXPIRED_BUSINESS_DATE = "dMDataModelExpiredAtBusinessDate"; - private static final String LESSER_THAN_EQUAL_TO = "lte"; - private static final String SYSTEM_DATE = "dMDataModelSystemDate"; - private static final String EXPIRED_SYSTEM_DATE = "dMDataModelExpiredAtSystemDate"; - private static final String NAMESPACE = "dMDataModelNamespace"; - private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.DiscoveryREST"); + + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.ModelREST"); private static final Logger LOG = LoggerFactory.getLogger(DiscoveryREST.class); @Context private HttpServletRequest httpServletRequest; private final boolean enableSearchLogging; - private final AtlasTypeRegistry typeRegistry; + private final AbstractModelPreProcessor entityPreProcessor; private final AtlasDiscoveryService discoveryService; private final SearchLoggingManagement loggerManagement; + private final EntityGraphMapper entityGraphMapper; + private final EntityGraphRetriever graphRetriever; + + //private final Entity private static final String INDEXSEARCH_TAG_NAME = "indexsearch"; private static final Set TRACKING_UTM_TAGS = new HashSet<>(Arrays.asList("ui_main_list", "ui_popup_searchbar")); private static final String UTM_TAG_FROM_PRODUCT = "project_webapp"; @Inject - public ModelREST(AtlasTypeRegistry typeRegistry, AtlasDiscoveryService discoveryService, - SearchLoggingManagement loggerManagement) { + public ModelREST(AtlasTypeRegistry typeRegistry, AtlasDiscoveryService discoveryService, SearchLoggingManagement loggerManagement, EntityGraphMapper entityGraphMapper, EntityGraphRetriever graphRetriever, DMEntityPreProcessor entityPreProcessor) { this.typeRegistry = typeRegistry; this.discoveryService = discoveryService; this.loggerManagement = loggerManagement; + this.entityGraphMapper = entityGraphMapper; + this.graphRetriever = graphRetriever; + this.entityPreProcessor = entityPreProcessor; this.enableSearchLogging = AtlasConfiguration.ENABLE_SEARCH_LOGGER.getBoolean(); } @Path("/search") @POST @Timed - public AtlasSearchResult dataSearch(@QueryParam("namespace") String namespace, - @QueryParam("businessDate") String businessDate, - @QueryParam("systemDate") String systemDate, - @Context HttpServletRequest servletRequest, IndexSearchParams parameters) throws AtlasBaseException { + public AtlasSearchResult dataSearch(@QueryParam("namespace") String namespace, @QueryParam("businessDate") String businessDate, @QueryParam("systemDate") String systemDate, @Context HttpServletRequest servletRequest, IndexSearchParams parameters) throws AtlasBaseException { Servlets.validateQueryParamLength("namespace", namespace); Servlets.validateQueryParamLength("businessDate", businessDate); @@ -91,10 +97,7 @@ public AtlasSearchResult dataSearch(@QueryParam("namespace") String namespace, parameters = parameters == null ? new IndexSearchParams() : parameters; - String queryStringUsingFiltersAndUserDSL = createQueryStringUsingFiltersAndUserDSL(namespace, - businessDate, - systemDate, - parameters.getQuery()); + String queryStringUsingFiltersAndUserDSL = ModelUtil.createQueryStringUsingFiltersAndUserDSL(namespace, businessDate, systemDate, parameters.getQuery()); if (StringUtils.isEmpty(queryStringUsingFiltersAndUserDSL)) { AtlasBaseException abe = new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid model search query"); @@ -111,19 +114,15 @@ public AtlasSearchResult dataSearch(@QueryParam("namespace") String namespace, if (result == null) { return null; } - - return result; } catch (AtlasBaseException abe) { - if (enableSearchLogging && parameters.isSaveSearchLog() - ) { + if (enableSearchLogging && parameters.isSaveSearchLog()) { // logSearchLog(parameters, servletRequest, abe, System.currentTimeMillis() - startTime); } throw abe; } catch (Exception e) { AtlasBaseException abe = new AtlasBaseException(e.getMessage(), e.getCause()); - if (enableSearchLogging && parameters.isSaveSearchLog() - ) { + if (enableSearchLogging && parameters.isSaveSearchLog()) { //logSearchLog(parameters, servletRequest, abe, System.currentTimeMillis() - startTime); } throw abe; @@ -149,148 +148,184 @@ public AtlasSearchResult dataSearch(@QueryParam("namespace") String namespace, } } - /*** - * combines user query/dsl along with business parameters - * - * creates query as following : - * {"query":{"bool":{"must":[{"bool":{"filter":[{"match":{"namespace":"{namespace}"}},{"bool":{"must":[{"range":{"businessDate":{"lte":"businessDate"}}},{"bool":{"should":[{"range":{"expiredAtBusinessDate":{"gt":"{businessDate}"}}},{"bool":{"must_not":[{"exists":{"field":"expiredAtBusiness"}}]}}],"minimum_should_match":1}}]}}]}},{"wrapper":{"query":"user query"}}]}}} - * @param namespace - * @param businessDate - * @param dslString - * @return - */ - private String createQueryStringUsingFiltersAndUserDSL(final String namespace, - final String businessDate, - final String systemDate, - final String dslString) { + @DELETE + @Path("/entity") + @Timed + public EntityMutationResponse deleteByQualifiedNamePrefix(@QueryParam("qualifiedNamePrefix") String qualifiedNamePrefix, + @QueryParam("entityType") String entityType, + @Context HttpServletRequest servletRequest) throws AtlasBaseException { + + Servlets.validateQueryParamLength("qualifiedNamePrefix", qualifiedNamePrefix); + Servlets.validateQueryParamLength("entityType", entityType); + AtlasPerfTracer perf = null; + + EntityGraphDiscoveryContext graphDiscoveryContext = new EntityGraphDiscoveryContext(typeRegistry, null); + EntityMutationContext entityMutationContext = new EntityMutationContext(graphDiscoveryContext); + try { - AtlasPerfMetrics.MetricRecorder addBusinessFiltersToSearchQueryMetric = RequestContext.get().startMetricRecord("createQueryStringUsingFiltersAndUserDSL"); - // Create an ObjectMapper instance - ObjectMapper objectMapper = new ObjectMapper(); - - // Create the root 'query' node - ObjectNode rootNode = objectMapper.createObjectNode(); - ObjectNode queryNode = objectMapper.createObjectNode(); - ObjectNode boolNode = objectMapper.createObjectNode(); - ArrayNode mustArray = objectMapper.createArrayNode(); - - // Create the first 'bool' object inside 'must' - ObjectNode firstBoolNode = objectMapper.createObjectNode(); - ObjectNode filterBoolNode = objectMapper.createObjectNode(); - ArrayNode filterArray = objectMapper.createArrayNode(); - - // Create 'match' object - ObjectNode matchNode = objectMapper.createObjectNode(); - matchNode.put(NAMESPACE.concat(".keyword"), namespace); - - // Add 'match' object to filter - ObjectNode matchWrapper = objectMapper.createObjectNode(); - matchWrapper.set("term", matchNode); - filterArray.add(matchWrapper); - - // add 'businessDateValidation' - ObjectNode businessDateWrapper = dateValidation(businessDate, true, objectMapper); - filterArray.add(businessDateWrapper); - - // add 'systemDateValidation' - if (!StringUtils.isEmpty(systemDate)) { - ObjectNode systemDateWrapper = dateValidation(systemDate, false, objectMapper); - filterArray.add(systemDateWrapper); + if (StringUtils.isEmpty(qualifiedNamePrefix) || StringUtils.isEmpty(entityType)) { + throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST); } - // Add filter to firstBool - filterBoolNode.set("filter", filterArray); - firstBoolNode.set("bool", filterBoolNode); - - // Add firstBool to must array - mustArray.add(firstBoolNode); - - // process user query - if (!StringUtils.isEmpty(dslString)) { - JsonNode node = new ObjectMapper().readTree(dslString); - JsonNode userQueryNode = node.get("query"); - ObjectNode wrapperNode = objectMapper.createObjectNode(); - String userQueryString = userQueryNode.toString(); - String userQueryBase64 = Base64.getEncoder().encodeToString(userQueryString.getBytes()); - wrapperNode.put("query", userQueryBase64); - // Add wrapper to must array - ObjectNode wrapperWrapper = objectMapper.createObjectNode(); - wrapperWrapper.set("wrapper", wrapperNode); - mustArray.add(wrapperWrapper); + // check with chris how the label look like + // accordingly capitalize letters + AtlasEntityType atlasEntityType = typeRegistry.getEntityTypeByName(entityType); + + if (atlasEntityType == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_TYPE); } + AtlasVertex latestEntityVertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(entityType, qualifiedNamePrefix); + + if (latestEntityVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.NO_TYPE_EXISTS_FOR_QUALIFIED_NAME_PREFIX, qualifiedNamePrefix); + } - // Add must array to bool node - boolNode.set("must", mustArray); + String modelQualifiedName; + + if (entityType.equals(ATLAS_DM_ENTITY_TYPE)) { + int lastIndex = qualifiedNamePrefix.lastIndexOf("/"); + modelQualifiedName = qualifiedNamePrefix.substring(0, lastIndex); + String entityGuid = AtlasGraphUtilsV2.getIdFromVertex(latestEntityVertex); + replicateModelVersionAndExcludeEntity(modelQualifiedName, entityGuid, entityMutationContext); + + } else if (entityType.equals(ATLAS_DM_ATTRIBUTE_TYPE)) { + int lastIndex = qualifiedNamePrefix.lastIndexOf("/"); + String entityQualifiedNamePrefix = qualifiedNamePrefix.substring(0, lastIndex); + String attributeGuid = AtlasGraphUtilsV2.getIdFromVertex(latestEntityVertex); + replicateModelVersionAndEntityAndExcludeAttribute(entityQualifiedNamePrefix, + attributeGuid, "", entityMutationContext); + } else { + throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_TYPE); + } + return entityGraphMapper.mapAttributesAndClassifications(entityMutationContext, + false, false, false, false); + } finally { + AtlasPerfTracer.log(perf); + } + } - // Add bool to query - queryNode.set("bool", boolNode); + private void replicateModelVersionAndEntityAndExcludeAttribute(final String entityQualifiedNamePrefix, String deleteAttributeGuid, String deleteEntityGuid, EntityMutationContext entityMutationContext) throws AtlasBaseException { + int lastIndex = entityQualifiedNamePrefix.lastIndexOf("/"); + String modelQualifiedName = entityQualifiedNamePrefix.substring(0, lastIndex); - rootNode.set("query", queryNode); + // get entity + // replicate entity + AtlasVertex latestEntityVertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(ATLAS_DM_ENTITY_TYPE, entityQualifiedNamePrefix); + AtlasEntity latestEntity = graphRetriever.toAtlasEntity(latestEntityVertex); - // Print the JSON representation of the query - return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(rootNode); - } catch (Exception e) { - LOG.error("Error -> createQueryStringUsingFiltersAndUserDSL!", e); + if (latestEntityVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.DATA_ENTITY_NOT_EXIST); } - return ""; + + long now = RequestContext.get().getRequestTime(); + + ModelResponse modelResponse = entityPreProcessor.replicateModelEntity(latestEntity, + latestEntityVertex, entityQualifiedNamePrefix, now); + + AtlasEntity replicaEntity = modelResponse.getReplicaEntity(); + AtlasVertex replicaVertex = modelResponse.getReplicaVertex(); + + // exclude attribute from entity + Map relationshipAttributes = excludeEntityFromRelationshipAttribute(deleteAttributeGuid, + replicaEntity.getRelationshipAttributes()); + replicaEntity.setRelationshipAttributes(relationshipAttributes); + + + //replicate modelVersion + ModelResponse modelVersionResponse = replicateModelVersionAndExcludeEntity( + modelQualifiedName, "", entityMutationContext); + + // create entity-modelVersion relationship + entityPreProcessor.createModelVersionModelEntityRelationship( + modelVersionResponse.getReplicaVertex(), + replicaVertex); + + entityMutationContext.addCreated(replicaEntity.getGuid(), + replicaEntity, + typeRegistry.getEntityTypeByName(ATLAS_DM_ENTITY_TYPE), + replicaVertex); + } - private ObjectNode dateValidation(final String date, final boolean isBusinessDate, ObjectMapper objectMapper) { + private ModelResponse replicateModelVersionAndExcludeEntity(final String modelQualifiedName, String deleteEntityGuid, EntityMutationContext entityMutationContext) throws AtlasBaseException { + Map attrValues = new HashMap<>(); + attrValues.put(QUALIFIED_NAME, modelQualifiedName); + + AtlasVertex modelVertex = AtlasGraphUtilsV2.findByUniqueAttributes( + typeRegistry.getEntityTypeByName(ATLAS_DM_DATA_MODEL), attrValues); + + if (modelVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.DATA_MODEL_NOT_EXIST); + } + + String modelGuid = AtlasGraphUtilsV2.getIdFromVertex(modelVertex); - String condition = LESSER_THAN_EQUAL_TO, dateType = BUSINESS_DATE, expiredDateType = EXPIRED_BUSINESS_DATE; + long now = RequestContext.get().getRequestTime(); - if (!isBusinessDate) { - dateType = SYSTEM_DATE; - expiredDateType = EXPIRED_SYSTEM_DATE; + ModelResponse modelResponse = entityPreProcessor.replicateModelVersion(modelGuid, modelQualifiedName, now); + AtlasEntity replicaModelVersionEntity = modelResponse.getReplicaEntity(); + AtlasVertex replicaModelVersionVertex = modelResponse.getReplicaVertex(); + String modelVersionGuid = replicaModelVersionEntity.getGuid(); + + Map relationshipAttributes = excludeEntityFromRelationshipAttribute(deleteEntityGuid, + replicaModelVersionEntity.getRelationshipAttributes()); + replicaModelVersionEntity.setRelationshipAttributes(relationshipAttributes); + entityPreProcessor.createModelModelVersionRelation(modelGuid, modelVersionGuid); + + entityMutationContext.addCreated(modelVersionGuid, replicaModelVersionEntity, + typeRegistry.getEntityTypeByName(ATLAS_DM_VERSION_TYPE), replicaModelVersionVertex); + + entityMutationContext.getDiscoveryContext().addResolvedGuid(modelGuid, modelVertex); + return modelResponse; + } + + private Map excludeEntityFromRelationshipAttribute(String entityGuid, Map relationshipAttributes) throws AtlasBaseException { + if (StringUtils.isEmpty(entityGuid)) { + return relationshipAttributes; + } + Map appendAttributesDestination = new HashMap<>(); + if (relationshipAttributes != null) { + Map appendAttributesSource = (Map) relationshipAttributes; + + String guid = ""; + + for (String attribute : appendAttributesSource.keySet()) { + + if (appendAttributesSource.get(attribute) instanceof List) { + + List> destList = new ArrayList<>(); + Map destMap = null; + + List> attributeList = (List>) appendAttributesSource.get(attribute); + + for (Map relationAttribute : attributeList) { + guid = (String) relationAttribute.get("guid"); + + if (guid.equals(entityGuid)) { + continue; + } + + destMap = new HashMap<>(relationAttribute); + destList.add(destMap); + } + appendAttributesDestination.put(attribute, destList); + } else { + if (appendAttributesSource.get(attribute) instanceof Map) { + LinkedHashMap attributeList = (LinkedHashMap) appendAttributesSource.get(attribute); + guid = (String) attributeList.get("guid"); + + // update end2 + if (guid.equals(entityGuid)) { + continue; + } + + Map destMap = new HashMap<>(attributeList); + appendAttributesDestination.put(attribute, destMap); + } + } + } } - // Create the nested 'bool' object inside filter - ObjectNode nestedBoolNode = objectMapper.createObjectNode(); - ArrayNode nestedMustArray = objectMapper.createArrayNode(); - ObjectNode rangeBusinessDateNode = objectMapper.createObjectNode(); - rangeBusinessDateNode.put(condition, date); - - // Add 'range' object to nestedMust - ObjectNode rangeBusinessDateWrapper = objectMapper.createObjectNode(); - rangeBusinessDateWrapper.set("range", objectMapper.createObjectNode().set(dateType, rangeBusinessDateNode)); - nestedMustArray.add(rangeBusinessDateWrapper); - - - // Create 'bool' object for 'should' - ObjectNode shouldBoolNodeWrapper = objectMapper.createObjectNode(); - ObjectNode shouldBoolNode = objectMapper.createObjectNode(); - ArrayNode shouldArray = objectMapper.createArrayNode(); - - // Create 'range' object for 'expiredAtBusinessDate' - ObjectNode rangeExpiredAtNode = objectMapper.createObjectNode(); - rangeExpiredAtNode.put("gt", date); - - // Add 'range' object to should array - ObjectNode rangeExpiredAtWrapper = objectMapper.createObjectNode(); - rangeExpiredAtWrapper.set("range", objectMapper.createObjectNode().set(expiredDateType, rangeExpiredAtNode)); - shouldArray.add(rangeExpiredAtWrapper); - - // add 'term' object to should array - ObjectNode termNode = objectMapper.createObjectNode(); - termNode.put(expiredDateType, 0); - ObjectNode termNodeWrapper = objectMapper.createObjectNode(); - termNodeWrapper.set("term", termNode); - shouldArray.add(termNodeWrapper); - - // Add 'should' to should array - shouldBoolNode.set("should", shouldArray); - shouldBoolNode.put("minimum_should_match", 1); - shouldBoolNodeWrapper.set("bool", shouldBoolNode); - - // Add shouldBoolNodeWrapper to nestedMust - nestedMustArray.add(shouldBoolNodeWrapper); - - // Add nestedMust to nestedBool - nestedBoolNode.set("must", nestedMustArray); - - // Add nestedBool to filter - ObjectNode nestedBoolWrapper = objectMapper.createObjectNode(); - nestedBoolWrapper.set("bool", nestedBoolNode); - return nestedBoolWrapper; + return appendAttributesDestination; } } \ No newline at end of file diff --git a/webapp/src/main/java/org/apache/atlas/web/util/ModelUtil.java b/webapp/src/main/java/org/apache/atlas/web/util/ModelUtil.java new file mode 100644 index 0000000000..d8e95d1b40 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/util/ModelUtil.java @@ -0,0 +1,173 @@ +package org.apache.atlas.web.util; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.web.rest.DiscoveryREST; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Base64; + +public class ModelUtil { + + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.ModelUtil"); + private static final Logger LOG = LoggerFactory.getLogger(DiscoveryREST.class); + + private static final String BUSINESS_DATE = "dMDataModelBusinessDate"; + private static final String EXPIRED_BUSINESS_DATE = "dMDataModelExpiredAtBusinessDate"; + private static final String LESSER_THAN_EQUAL_TO = "lte"; + private static final String SYSTEM_DATE = "dMDataModelSystemDate"; + private static final String EXPIRED_SYSTEM_DATE = "dMDataModelExpiredAtSystemDate"; + private static final String NAMESPACE = "dMDataModelNamespace"; + + /*** + * combines user query/dsl along with business parameters + * + * creates query as following : + * {"query":{"bool":{"must":[{"bool":{"filter":[{"match":{"namespace":"{namespace}"}},{"bool":{"must":[{"range":{"businessDate":{"lte":"businessDate"}}},{"bool":{"should":[{"range":{"expiredAtBusinessDate":{"gt":"{businessDate}"}}},{"bool":{"must_not":[{"exists":{"field":"expiredAtBusiness"}}]}}],"minimum_should_match":1}}]}}]}},{"wrapper":{"query":"user query"}}]}}} + * @param namespace + * @param businessDate + * @param dslString + * @return + */ + public static String createQueryStringUsingFiltersAndUserDSL(final String namespace, + final String businessDate, + final String systemDate, + final String dslString) { + try { + AtlasPerfMetrics.MetricRecorder addBusinessFiltersToSearchQueryMetric = RequestContext.get().startMetricRecord("createQueryStringUsingFiltersAndUserDSL"); + // Create an ObjectMapper instance + ObjectMapper objectMapper = new ObjectMapper(); + + // Create the root 'query' node + ObjectNode rootNode = objectMapper.createObjectNode(); + ObjectNode queryNode = objectMapper.createObjectNode(); + ObjectNode boolNode = objectMapper.createObjectNode(); + ArrayNode mustArray = objectMapper.createArrayNode(); + + // Create the first 'bool' object inside 'must' + ObjectNode firstBoolNode = objectMapper.createObjectNode(); + ObjectNode filterBoolNode = objectMapper.createObjectNode(); + ArrayNode filterArray = objectMapper.createArrayNode(); + + // Create 'match' object + ObjectNode matchNode = objectMapper.createObjectNode(); + matchNode.put(NAMESPACE.concat(".keyword"), namespace); + + // Add 'match' object to filter + ObjectNode matchWrapper = objectMapper.createObjectNode(); + matchWrapper.set("term", matchNode); + filterArray.add(matchWrapper); + + // add 'businessDateValidation' + ObjectNode businessDateWrapper = dateValidation(businessDate, true, objectMapper); + filterArray.add(businessDateWrapper); + + // add 'systemDateValidation' + if (!StringUtils.isEmpty(systemDate)) { + ObjectNode systemDateWrapper = dateValidation(systemDate, false, objectMapper); + filterArray.add(systemDateWrapper); + } + + // Add filter to firstBool + filterBoolNode.set("filter", filterArray); + firstBoolNode.set("bool", filterBoolNode); + + // Add firstBool to must array + mustArray.add(firstBoolNode); + + // process user query + if (!StringUtils.isEmpty(dslString)) { + JsonNode node = new ObjectMapper().readTree(dslString); + JsonNode userQueryNode = node.get("query"); + ObjectNode wrapperNode = objectMapper.createObjectNode(); + String userQueryString = userQueryNode.toString(); + String userQueryBase64 = Base64.getEncoder().encodeToString(userQueryString.getBytes()); + wrapperNode.put("query", userQueryBase64); + // Add wrapper to must array + ObjectNode wrapperWrapper = objectMapper.createObjectNode(); + wrapperWrapper.set("wrapper", wrapperNode); + mustArray.add(wrapperWrapper); + } + + + // Add must array to bool node + boolNode.set("must", mustArray); + + // Add bool to query + queryNode.set("bool", boolNode); + + rootNode.set("query", queryNode); + + // Print the JSON representation of the query + return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(rootNode); + } catch (Exception e) { + LOG.error("Error -> createQueryStringUsingFiltersAndUserDSL!", e); + } + return ""; + } + + private static ObjectNode dateValidation(final String date, final boolean isBusinessDate, ObjectMapper objectMapper) { + + String condition = LESSER_THAN_EQUAL_TO, dateType = BUSINESS_DATE, expiredDateType = EXPIRED_BUSINESS_DATE; + + if (!isBusinessDate) { + dateType = SYSTEM_DATE; + expiredDateType = EXPIRED_SYSTEM_DATE; + } + // Create the nested 'bool' object inside filter + ObjectNode nestedBoolNode = objectMapper.createObjectNode(); + ArrayNode nestedMustArray = objectMapper.createArrayNode(); + ObjectNode rangeBusinessDateNode = objectMapper.createObjectNode(); + rangeBusinessDateNode.put(condition, date); + + // Add 'range' object to nestedMust + ObjectNode rangeBusinessDateWrapper = objectMapper.createObjectNode(); + rangeBusinessDateWrapper.set("range", objectMapper.createObjectNode().set(dateType, rangeBusinessDateNode)); + nestedMustArray.add(rangeBusinessDateWrapper); + + + // Create 'bool' object for 'should' + ObjectNode shouldBoolNodeWrapper = objectMapper.createObjectNode(); + ObjectNode shouldBoolNode = objectMapper.createObjectNode(); + ArrayNode shouldArray = objectMapper.createArrayNode(); + + // Create 'range' object for 'expiredAtBusinessDate' + ObjectNode rangeExpiredAtNode = objectMapper.createObjectNode(); + rangeExpiredAtNode.put("gt", date); + + // Add 'range' object to should array + ObjectNode rangeExpiredAtWrapper = objectMapper.createObjectNode(); + rangeExpiredAtWrapper.set("range", objectMapper.createObjectNode().set(expiredDateType, rangeExpiredAtNode)); + shouldArray.add(rangeExpiredAtWrapper); + + // add 'term' object to should array + ObjectNode termNode = objectMapper.createObjectNode(); + termNode.put(expiredDateType, 0); + ObjectNode termNodeWrapper = objectMapper.createObjectNode(); + termNodeWrapper.set("term", termNode); + shouldArray.add(termNodeWrapper); + + // Add 'should' to should array + shouldBoolNode.set("should", shouldArray); + shouldBoolNode.put("minimum_should_match", 1); + shouldBoolNodeWrapper.set("bool", shouldBoolNode); + + // Add shouldBoolNodeWrapper to nestedMust + nestedMustArray.add(shouldBoolNodeWrapper); + + // Add nestedMust to nestedBool + nestedBoolNode.set("must", nestedMustArray); + + // Add nestedBool to filter + ObjectNode nestedBoolWrapper = objectMapper.createObjectNode(); + nestedBoolWrapper.set("bool", nestedBoolNode); + return nestedBoolWrapper; + } +}