Skip to content

Commit

Permalink
Merge pull request #3557 from atlanhq/dev/task/CJ-157
Browse files Browse the repository at this point in the history
CJ-157 | Publish API for Data Models
  • Loading branch information
aarshi0301 authored Sep 27, 2024
2 parents 21812c5 + a5782c6 commit 9664aef
Show file tree
Hide file tree
Showing 15 changed files with 1,926 additions and 184 deletions.
23 changes: 22 additions & 1 deletion common/src/main/java/org/apache/atlas/repository/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,28 @@ public final class Constants {
public static final String CONNECTION_PROCESS_ENTITY_TYPE = "ConnectionProcess";
public static final String PARENT_CONNECTION_PROCESS_QUALIFIED_NAME = "parentConnectionProcessQualifiedName";

/***
* DataModel
*/

public static final String ATLAS_DM_ENTITY_TYPE = "DMEntity";

public static final String ATLAS_DM_ATTRIBUTE_TYPE = "DMAttribute";
public static final String ATLAS_DM_DATA_MODEL = "DMDataModel";

public static final String ATLAS_DM_VERSION_TYPE = "DMVersion";

public static final String ATLAS_DM_ENTITY_ASSOCIATION_TYPE= "DMEntityAssociation";
public static final String ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE= "DMAttributeAssociation";

public static final String ATLAS_DM_QUALIFIED_NAME_PREFIX = "dMQualifiedNamePrefix";
public static final String ATLAS_DM_NAMESPACE = "dMDataModelNamespace";
public static final String ATLAS_DM_EXPIRED_AT_SYSTEM_DATE = "dMDataModelExpiredAtSystemDate";
public static final String ATLAS_DM_EXPIRED_AT_BUSINESS_DATE = "dMDataModelExpiredAtBusinessDate";
public static final String ATLAS_DM_SYSTEM_DATE = "dMDataModelSystemDate";
public static final String ATLAS_DM_BUSINESS_DATE = "dMDataModelBusinessDate";


/**
* The homeId field is used when saving into Atlas a copy of an object that is being imported from another
* repository. The homeId will be set to a String that identifies the other repository. The specific format
Expand Down Expand Up @@ -272,7 +294,6 @@ public final class Constants {
public static final String INDEX_PREFIX = "janusgraph_";

public static final String VERTEX_INDEX_NAME = INDEX_PREFIX + VERTEX_INDEX;
public static final String EDGE_INDEX_NAME = INDEX_PREFIX + EDGE_INDEX;

public static final String NAME = "name";
public static final String QUALIFIED_NAME = "qualifiedName";
Expand Down
11 changes: 9 additions & 2 deletions intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,15 @@ public enum AtlasErrorCode {
TASK_TYPE_NOT_SUPPORTED(400, "ATLAS-400-00-112", "Task type {0} is not supported"),

PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED(400, "ATLAS-400-00-113", "Exceeded limit of maximum allowed assets across policies for a Persona: Limit: {0}, assets: {1}"),
ADMIN_LIST_SHOULD_NOT_BE_EMPTY(400, "ATLAS-400-00-114", "Admin list should not be empty for type {0}");

ADMIN_LIST_SHOULD_NOT_BE_EMPTY(400, "ATLAS-400-00-114", "Admin list should not be empty for type {0}"),
DATA_MODEL_VERSION_NOT_EXIST(400, "ATLAS-400-00-115", "Model version {0} does not exist"),
DATA_ENTITY_NOT_EXIST(400, "ATLAS-400-00-116", "DataEntity {0} does not exist"),
DATA_MODEL_NOT_EXIST(400, "ATLAS-400-00-117", "DataModel {0} does not exist"),
QUALIFIED_NAME_PREFIX_NOT_EXIST(400, "ATLAS-400-00-118", "dMQualifiedNamePrefix is mandatory for DMEntity/DMAttribute"),
NO_TYPE_EXISTS_FOR_QUALIFIED_NAME_PREFIX (400,"ATLAS-400-00-119", "No DMEntity/DMAttribute exists for dMQualifiedNamePrefix : {0}"),
NAME_NAMESPACE_NOT_EXIST (400, "ATLAS-400-00-120", "name/namespace are mandatory for DMEntity/DMAttribute"),
QUALIFIED_NAME_PREFIX_OR_TYPE_NOT_FOUND(400, "ATLAS-400-00-121", "qualifiedName/entityType are mandatory"),
INVALID_ENTITY_TYPE(400, "ATLAS-400-00-122", "Invalid entity type");
private String errorCode;
private String errorMessage;
private Response.Status httpCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.EntityResolver;
Expand All @@ -40,9 +42,11 @@
import org.apache.atlas.type.TemplateToken;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
Expand Down Expand Up @@ -103,8 +107,36 @@ public void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException {

validateLabels(entity.getLabels());


type.validateValue(entity, entity.getTypeName(), messages);



// DMEntity and DMAttributeType are requested for update
// by dMQualifiedNamePrefix which is not a unique attribute
// This can return multiple entity/attribute that match this prefix vale
// we have to return latest entity/attribute
if (entity.getTypeName().equals(Constants.ATLAS_DM_ENTITY_TYPE) ||
entity.getTypeName().equals(Constants.ATLAS_DM_ATTRIBUTE_TYPE)){

String qualifiedNamePrefix = (String) entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX);
if (qualifiedNamePrefix.isEmpty()){
throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST);
}
// AtlasVertex vertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(entity.getTypeName(), qualifiedNamePrefix);
AtlasVertex vertex= discoveryContext.getResolvedEntityVertex(entity.getGuid());
if (vertex == null) {
// no entity exists with this qualifiedName, set qualifiedName and let entity be created
entity.setAttribute(Constants.QUALIFIED_NAME, qualifiedNamePrefix + "_" + RequestContext.get().getRequestTime());
return;
}

// if guidFromVertex is found let entity be updated
entity.setGuid(AtlasGraphUtilsV2.getIdFromVertex(vertex));
type.getNormalizedValue(entity);
return;
}

if (!messages.isEmpty()) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
}
Expand Down Expand Up @@ -161,6 +193,8 @@ protected void discover() throws AtlasBaseException {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "found null entity");
}

validateAttributesForDataModel(entity);

processDynamicAttributes(entity);

walkEntityGraph(entity);
Expand Down Expand Up @@ -485,4 +519,15 @@ private void processDynamicAttributes(AtlasEntity entity) throws AtlasBaseExcept
}
}
}

private void validateAttributesForDataModel(AtlasEntity entity) throws AtlasBaseException {
if (entity.getTypeName().equals(Constants.ATLAS_DM_ENTITY_TYPE) ||
entity.getTypeName().equals(Constants.ATLAS_DM_ATTRIBUTE_TYPE)) {
if (entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX) == null ||
entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX) == "") {
throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST);
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMAttributePreprocessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMEntityAssociationPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMEntityPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.ReadmePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryCollectionPreProcessor;
Expand Down Expand Up @@ -1655,14 +1658,43 @@ private void executePreProcessor(EntityMutationContext context) throws AtlasBase
}

List<AtlasEntity> copyOfUpdated = new ArrayList<>(context.getUpdatedEntities());
for (AtlasEntity entity: copyOfUpdated) {
for (AtlasEntity entity : copyOfUpdated) {
entityType = context.getType(entity.getGuid());
preProcessors = getPreProcessor(entityType.getTypeName());
for(PreProcessor processor : preProcessors){
LOG.debug("Executing preprocessor {} for entity {}", processor.getClass().getName(), entity.getGuid());
for (PreProcessor processor : preProcessors) {
processor.processAttributes(entity, context, UPDATE);
}
}

List<AtlasEntity> copyOfAppendRelationshipAttributes = new ArrayList<>(context.getUpdatedEntitiesForAppendRelationshipAttribute());
for (AtlasEntity entity : copyOfAppendRelationshipAttributes) {
entityType = context.getType(entity.getGuid());
if( entityType.getTypeName().equals(ATLAS_DM_ENTITY_TYPE) ||
entityType.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE) ||
entityType.getTypeName().equals(ATLAS_DM_ENTITY_ASSOCIATION_TYPE) ||
entity.getTypeName().equals(ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE)
){
preProcessors = getPreProcessor(entityType.getTypeName());
for (PreProcessor processor : preProcessors) {
processor.processAttributes(entity, context, UPDATE);
}
}
}

List<AtlasEntity> copyOfRemoveRelationshipAttributes = new ArrayList<>(context.getEntitiesUpdatedWithRemoveRelationshipAttribute());
for (AtlasEntity entity : copyOfRemoveRelationshipAttributes) {
entityType = context.getType(entity.getGuid());
if( entityType.getTypeName().equals(ATLAS_DM_ENTITY_TYPE)
|| entityType.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE) ||
entityType.getTypeName().equals(ATLAS_DM_ENTITY_ASSOCIATION_TYPE) ||
entity.getTypeName().equals(ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE)
){
preProcessors = getPreProcessor(entityType.getTypeName());
for (PreProcessor processor : preProcessors) {
processor.processAttributes(entity, context, UPDATE);
}
}
}
}

private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
Expand Down Expand Up @@ -2006,6 +2038,17 @@ public List<PreProcessor> getPreProcessor(String typeName) {

case PROCESS_ENTITY_TYPE:
preProcessors.add(new LineagePreProcessor(typeRegistry, entityRetriever, graph, this));
case ATLAS_DM_ENTITY_TYPE:
preProcessors.add(new DMEntityPreProcessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore));
break;
case ATLAS_DM_ATTRIBUTE_TYPE:
preProcessors.add(new DMAttributePreprocessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore));
break;
case ATLAS_DM_ENTITY_ASSOCIATION_TYPE:
preProcessors.add(new DMEntityAssociationPreProcessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore));
break;
case ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE:
preProcessors.add(new DMAttributePreprocessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore));
}

// The default global pre-processor for all AssetTypes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,7 @@
import java.text.SimpleDateFormat;
import java.util.*;

import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_ENTITY_TYPE;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_NAMES_KEY;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.GLOSSARY_TERMS_EDGE_LABEL;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY;
import static org.apache.atlas.repository.Constants.NAME;
import static org.apache.atlas.repository.Constants.PROPAGATED_CLASSIFICATION_NAMES_KEY;
import static org.apache.atlas.repository.Constants.QUALIFIED_NAME;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.ASC;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.DESC;
Expand Down Expand Up @@ -708,6 +696,19 @@ public static Iterator<AtlasVertex> findActiveEntityVerticesByType(AtlasGraph gr
return query.vertices().iterator();
}

public static AtlasVertex findLatestEntityAttributeVerticesByType(String typename, String dMQualifiedNamePrefix) {
AtlasGraph graph= getGraphInstance();
AtlasGraphQuery query = graph.query()
.has(ENTITY_TYPE_PROPERTY_KEY, typename)
.has(ATLAS_DM_QUALIFIED_NAME_PREFIX, dMQualifiedNamePrefix)
.has(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, 0)
.has(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, 0);

Iterator<AtlasVertex> results = query.vertices().iterator();
AtlasVertex vertex = results.hasNext() ? results.next() : null;
return vertex;
}

public static boolean relationshipTypeHasInstanceEdges(String typeName) throws AtlasBaseException {
return relationshipTypeHasInstanceEdges(getGraphInstance(), typeName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public class EntityMutationContext {
private final EntityGraphDiscoveryContext context;
private final List<AtlasEntity> entitiesCreated = new ArrayList<>();
private final List<AtlasEntity> entitiesUpdated = new ArrayList<>();
private final List<AtlasEntity> entitiesUpdatedWithAppendRelationshipAttribute = new ArrayList<>();
private final List<AtlasEntity> entitiesUpdatedWithRemoveRelationshipAttribute = new ArrayList<>();
private List<AtlasEntity> entitiesUpdatedWithAppendRelationshipAttribute = new ArrayList<>();
private List<AtlasEntity> entitiesUpdatedWithRemoveRelationshipAttribute = new ArrayList<>();
private final Map<String, AtlasEntityType> entityVsType = new HashMap<>();
private final Map<String, AtlasVertex> entityVsVertex = new HashMap<>();
private final Map<String, String> guidAssignments = new HashMap<>();
Expand Down Expand Up @@ -59,11 +59,11 @@ public void addCreated(String internalGuid, AtlasEntity entity, AtlasEntityType
}
}

public void setUpdatedWithRelationshipAttributes(AtlasEntity entity){
public void setUpdatedWithRelationshipAttributes(AtlasEntity entity) {
entitiesUpdatedWithAppendRelationshipAttribute.add(entity);
}

public void setUpdatedWithRemoveRelationshipAttributes(AtlasEntity entity){
public void setUpdatedWithRemoveRelationshipAttributes(AtlasEntity entity) {
entitiesUpdatedWithRemoveRelationshipAttribute.add(entity);
}

Expand All @@ -84,6 +84,39 @@ public void addUpdated(String internalGuid, AtlasEntity entity, AtlasEntityType
}
}

public void removeUpdated(String internalGuid, AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) {
if (entityVsVertex.containsKey(internalGuid)) { // if the entity was already created/updated
entitiesUpdated.remove(entity);
entityVsType.remove(entity.getGuid(), type);
entityVsVertex.remove(entity.getGuid(), atlasVertex);

// if (!StringUtils.equals(internalGuid, entity.getGuid())) {
// guidAssignments.put(internalGuid, entity.getGuid());
// entityVsVertex.put(internalGuid, atlasVertex);
// }
}
}

public void removeUpdatedWithRelationshipAttributes(AtlasEntity entity) {
Iterator<AtlasEntity> entities = entitiesUpdatedWithAppendRelationshipAttribute.iterator();
while (entities.hasNext()) {
String guid = entities.next().getGuid();
if (guid.equals(entity.getGuid())) {
entities.remove();
}
}
}

public void removeUpdatedWithDeleteRelationshipAttributes(AtlasEntity entity) {
Iterator<AtlasEntity> entities = entitiesUpdatedWithRemoveRelationshipAttribute.iterator();
while (entities.hasNext()) {
String guid = entities.next().getGuid();
if (guid.equals(entity.getGuid())) {
entities.remove();
}
}
}

public void addEntityToRestore(AtlasVertex vertex) {
if (entitiesToRestore == null) {
entitiesToRestore = new ArrayList<>();
Expand Down
Loading

0 comments on commit 9664aef

Please sign in to comment.