Skip to content

Commit

Permalink
NIFI-14129 Added Database Dialect Service
Browse files Browse the repository at this point in the history
- Added database-dialect-service-api
- Added Standard Database Dialect Service implementation
- Added Database Adapter implementation
- Added Database Dialect Service property descriptor to Database Processors
- Refactored Database Processors with optional Database Dialect Service
  • Loading branch information
exceptionfactory committed Jan 16, 2025
1 parent eeae16b commit 50545eb
Show file tree
Hide file tree
Showing 53 changed files with 2,270 additions and 839 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-provider-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,24 @@
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
import org.apache.nifi.database.dialect.service.api.StandardColumnDefinition;
import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
import org.apache.nifi.database.dialect.service.api.QueryClause;
import org.apache.nifi.database.dialect.service.api.QueryClauseType;
import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StandardQueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementResponse;
import org.apache.nifi.database.dialect.service.api.StatementType;
import org.apache.nifi.database.dialect.service.api.TableDefinition;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.processors.standard.db.DatabaseAdapterDescriptor;
import org.apache.nifi.processors.standard.db.impl.DatabaseAdapterDatabaseDialectService;
import org.apache.nifi.processors.standard.db.impl.DatabaseDialectServiceDatabaseAdapter;

import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -38,37 +50,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Optional;
import java.util.stream.Collectors;

@Tags({"database", "dbcp", "sql"})
@CapabilityDescription("Fetches parameters from database tables")

public class DatabaseParameterProvider extends AbstractParameterProvider implements VerifiableParameterProvider {

protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();

public static final PropertyDescriptor DB_TYPE;

static {
// Load the DatabaseAdapters
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
dbAdapterLoader.forEach(it -> {
dbAdapters.put(it.getName(), it);
dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), it.getDescription()));
});

DB_TYPE = new PropertyDescriptor.Builder()
.name("db-type")
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
.defaultValue("Generic")
.required(true)
.build();
}
public static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor("db-type");

public static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);

static AllowableValue GROUPING_BY_COLUMN = new AllowableValue("grouping-by-column", "Column",
"A single table is partitioned by the 'Parameter Group Name Column'. All rows with the same value in this column will " +
Expand Down Expand Up @@ -149,6 +141,7 @@ public class DatabaseParameterProvider extends AbstractParameterProvider impleme
protected void init(final ParameterProviderInitializationContext config) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DB_TYPE);
properties.add(DATABASE_DIALECT_SERVICE);
properties.add(DBCP_SERVICE);
properties.add(PARAMETER_GROUPING_STRATEGY);
properties.add(TABLE_NAME);
Expand Down Expand Up @@ -233,8 +226,22 @@ private void validateValueNotNull(final String value, final String columnName) {
}

String getQuery(final ConfigurationContext context, final String tableName, final List<String> columns, final String whereClause) {
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
return dbAdapter.getSelectStatement(tableName, StringUtils.join(columns, ", "), whereClause, null, null, null);
final DatabaseDialectService databaseDialectService = getDatabaseDialectService(context);

final List<ColumnDefinition> columnDefinitions = columns.stream()
.map(StandardColumnDefinition::new)
.map(ColumnDefinition.class::cast)
.toList();
final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), tableName, columnDefinitions);
final QueryStatementRequest queryStatementRequest = new StandardQueryStatementRequest(
StatementType.SELECT,
tableDefinition,
Optional.empty(),
List.of(new QueryClause(QueryClauseType.WHERE, whereClause)),
Optional.empty()
);
final StatementResponse statementResponse = databaseDialectService.getStatement(queryStatementRequest);
return statementResponse.sql();
}

@Override
Expand Down Expand Up @@ -262,4 +269,15 @@ public List<ConfigVerificationResult> verify(final ConfigurationContext context,

return results;
}

/**
 * Resolve the Database Dialect Service for the configured Database Type.
 *
 * When the Database Type property equals the Database Dialect Service adapter name, the Controller Service
 * configured in the Database Dialect Service property is returned. For any other Database Type, the type is
 * passed to a new DatabaseAdapterDatabaseDialectService.
 *
 * @param context Property Context used to read the Database Type and Database Dialect Service properties
 * @return Database Dialect Service for the configured Database Type
 */
private DatabaseDialectService getDatabaseDialectService(final PropertyContext context) {
    final String configuredType = context.getProperty(DB_TYPE).getValue();
    final boolean dialectServiceSelected = DatabaseDialectServiceDatabaseAdapter.NAME.equals(configuredType);

    if (!dialectServiceSelected) {
        // Any other Database Type is handled through the adapter-backed implementation
        return new DatabaseAdapterDatabaseDialectService(configuredType);
    }

    return context.getProperty(DATABASE_DIALECT_SERVICE).asControllerService(DatabaseDialectService.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@
<artifactId>nifi-dbcp-service-api</artifactId>
<version>2.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-api</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,20 @@
*/
package org.apache.nifi.processors.standard;

import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
import org.apache.nifi.database.dialect.service.api.StandardColumnDefinition;
import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
import org.apache.nifi.database.dialect.service.api.QueryClause;
import org.apache.nifi.database.dialect.service.api.QueryClauseType;
import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StandardQueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementResponse;
import org.apache.nifi.database.dialect.service.api.StatementType;
import org.apache.nifi.database.dialect.service.api.TableDefinition;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
Expand All @@ -29,7 +39,9 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.DatabaseAdapterDescriptor;
import org.apache.nifi.processors.standard.db.impl.DatabaseAdapterDatabaseDialectService;
import org.apache.nifi.processors.standard.db.impl.DatabaseDialectServiceDatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
import org.apache.nifi.util.StringUtils;

Expand All @@ -50,13 +62,14 @@
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -183,9 +196,9 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
// The delimiter to use when referencing qualified names (such as table@!@column in the state map)
protected static final String NAMESPACE_DELIMITER = "@!@";

public static final PropertyDescriptor DB_TYPE;
public static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor("db-fetch-db-type");
static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);

protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();
protected final Map<String, Integer> columnTypeMap = new HashMap<>();

// This value is set when the processor is scheduled and indicates whether the Table Name property contains Expression Language.
Expand All @@ -204,29 +217,11 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact

private static final DateTimeFormatter TIME_TYPE_FORMAT = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static final QueryClause ZERO_RESULT_WHERE_CLAUSE = new QueryClause(QueryClauseType.WHERE, "1 = 0");

// A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
protected Map<String, String> maxValueProperties;

static {
// Load the DatabaseAdapters
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
dbAdapterLoader.forEach(it -> {
dbAdapters.put(it.getName(), it);
dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), it.getDescription()));
});

DB_TYPE = new PropertyDescriptor.Builder()
.name("db-fetch-db-type")
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
.defaultValue("Generic")
.required(true)
.build();
}

// A common validation procedure for DB fetch processors, it stores whether the Table Name and/or Max Value Column properties have expression language
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
// For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. has expression language)
Expand All @@ -241,6 +236,8 @@ public void setup(final ProcessContext context) {
}

public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) {
final DatabaseDialectService databaseDialectService = getDatabaseDialectService(context);

synchronized (setupComplete) {
setupComplete.set(false);
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
Expand All @@ -256,23 +253,15 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();

final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
try (final Connection con = dbcpService.getConnection(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes());
final Statement st = con.createStatement()) {

// Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
// to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
// approach as in Apache Drill
String query;

if (StringUtils.isEmpty(sqlQuery)) {
query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
} else {
StringBuilder sbQuery = getWrappedQuery(dbAdapter, sqlQuery, tableName);
sbQuery.append(" WHERE 1=0");

query = sbQuery.toString();
}
final QueryStatementRequest statementRequest = getMaxValueStatementRequest(tableName, maxValueColumnNames, sqlQuery);
final StatementResponse statementResponse = databaseDialectService.getStatement(statementRequest);
final String query = statementResponse.sql();

ResultSet resultSet = st.executeQuery(query);
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
Expand All @@ -286,13 +275,13 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
final List<String> maxValueQualifiedColumnNameList = new ArrayList<>();

for (String maxValueColumn : maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim(), dbAdapter);
String colKey = getStateKey(tableName, maxValueColumn.trim());
maxValueQualifiedColumnNameList.add(colKey);
}

for (int i = 1; i <= numCols; i++) {
String colName = resultSetMetaData.getColumnName(i).toLowerCase();
String colKey = getStateKey(tableName, colName, dbAdapter);
String colKey = getStateKey(tableName, colName);

//only include columns that are part of the maximum value tracking column list
if (!maxValueQualifiedColumnNameList.contains(colKey)) {
Expand All @@ -304,7 +293,7 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
}

for (String maxValueColumn : maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase(), dbAdapter);
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase());
if (!columnTypeMap.containsKey(colKey)) {
throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn);
}
Expand All @@ -320,15 +309,37 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
}
}

protected static StringBuilder getWrappedQuery(DatabaseAdapter dbAdapter, String sqlQuery, String tableName) {
return new StringBuilder("SELECT * FROM (" + sqlQuery + ") " + dbAdapter.getTableAliasClause(tableName));
/**
 * Get the Database Dialect Service matching the configured Database Type property.
 *
 * @param context Property Context supplying the Database Type and Database Dialect Service property values
 * @return Configured Controller Service when the Database Type equals the Database Dialect Service adapter name,
 * otherwise a new DatabaseAdapterDatabaseDialectService created from the Database Type
 */
protected DatabaseDialectService getDatabaseDialectService(final PropertyContext context) {
    final String selectedType = context.getProperty(DB_TYPE).getValue();
    return DatabaseDialectServiceDatabaseAdapter.NAME.equals(selectedType)
            ? context.getProperty(DATABASE_DIALECT_SERVICE).asControllerService(DatabaseDialectService.class)
            : new DatabaseAdapterDatabaseDialectService(selectedType);
}

/**
 * Build the SELECT request used to read column metadata for the maximum-value columns. The request always
 * carries the zero-result WHERE clause so that the generated statement returns no rows.
 *
 * @param tableName Table name placed in the Table Definition
 * @param maxValueColumnNames Comma-separated maximum-value column names, must not be null; whitespace around
 * each name is removed
 * @param derivedTableQuery Custom query passed as the optional derived table of the request; null or empty
 * is treated as not configured
 * @return Query Statement Request for a SELECT over the maximum-value columns
 */
private QueryStatementRequest getMaxValueStatementRequest(final String tableName, final String maxValueColumnNames, final String derivedTableQuery) {
    // Trim each name so that "id, name" does not produce a column named " name";
    // setup() trims the same names when it builds state keys
    final List<ColumnDefinition> maxValueColumns = Arrays.stream(maxValueColumnNames.split(","))
            .map(String::trim)
            .map(StandardColumnDefinition::new)
            .map(ColumnDefinition.class::cast)
            .toList();

    // An empty custom query means that no query was configured, so it must not be passed on as a derived table
    final Optional<String> derivedTable = Optional.ofNullable(derivedTableQuery)
            .filter(query -> !query.isEmpty());

    final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), tableName, maxValueColumns);
    return new StandardQueryStatementRequest(
            StatementType.SELECT,
            tableDefinition,
            derivedTable,
            List.of(ZERO_RESULT_WHERE_CLAUSE),
            Optional.empty()
    );
}

protected static String getMaxValueFromRow(ResultSet resultSet,
int columnIndex,
Integer type,
String maxValueString,
String databaseType)
String maxValueString)
throws ParseException, IOException, SQLException {

// Skip any columns we're not keeping track of or whose value is null
Expand Down Expand Up @@ -520,17 +531,18 @@ protected static String getLiteralByType(int type, String value, String database
* Construct a key string for a corresponding state value.
* @param prefix A prefix may contain database and table name, or just table name, this can be null
* @param columnName A column name
* @param adapter DatabaseAdapter is used to unwrap identifiers
* @return a state key string
*/
protected static String getStateKey(String prefix, String columnName, DatabaseAdapter adapter) {
protected static String getStateKey(String prefix, String columnName) {
StringBuilder sb = new StringBuilder();
if (prefix != null) {
sb.append(adapter.unwrapIdentifier(prefix.toLowerCase()));
final String prefixUnwrapped = prefix.toLowerCase().replaceAll("[\"`\\[\\]]", "");
sb.append(prefixUnwrapped);
sb.append(NAMESPACE_DELIMITER);
}
if (columnName != null) {
sb.append(adapter.unwrapIdentifier(columnName.toLowerCase()));
final String columnNameUnwrapped = columnName.toLowerCase().replaceAll("[\"`\\[\\]]", "");
sb.append(columnNameUnwrapped);
}
return sb.toString();
}
Expand Down
Loading

0 comments on commit 50545eb

Please sign in to comment.