diff --git a/cockroachdb-persistence/README.md b/cockroachdb-persistence/README.md
new file mode 100644
index 0000000..16bfad3
--- /dev/null
+++ b/cockroachdb-persistence/README.md
@@ -0,0 +1,10 @@
+CockroachDB persistence module for OSS Conductor
+===================================
+This module provides an implementation of the OSS Conductor DAO interfaces using CockroachDB as the persistent data store.
+Execution data is stored in CockroachDB in the `workflow_instance` and `task` tables.
+
+All datastore operations used during the critical execution path of a workflow have been implemented. This includes CRUD operations for workflows and tasks.
+
+This module provides complete implementations of the ExecutionDAO, MetadataDAO, IndexDAO, EventHandlerDAO, PollDataDAO, and RateLimitingDAO interfaces.
+
+This module provides only a dummy implementation of the QueueDAO interface.
diff --git a/cockroachdb-persistence/build.gradle b/cockroachdb-persistence/build.gradle
new file mode 100644
index 0000000..853c183
--- /dev/null
+++ b/cockroachdb-persistence/build.gradle
@@ -0,0 +1,13 @@
+apply plugin: 'java-library'
+
+dependencies {
+    api conductorDep
+    api hikaricpDep
+    api flywayDep
+
+    testImplementation flywayDep
+    testImplementation testcontainerDep
+    testImplementation postgresqlDep
+    testImplementation hikaricpDep
+    testRuntimeOnly slf4jLog4jDep
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/CockroachDBConfiguration.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/CockroachDBConfiguration.java
new file mode 100644
index 0000000..955ec01
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/CockroachDBConfiguration.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb;
+
+import com.netflix.conductor.core.config.Configuration;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+public interface CockroachDBConfiguration extends Configuration {
+
+  String JDBC_URL_PROPERTY_NAME = "jdbc.url";
+  String JDBC_URL_DEFAULT_VALUE = "jdbc:postgresql://localhost:5432/conductor";
+
+  String JDBC_USER_NAME_PROPERTY_NAME = "jdbc.username";
+  String JDBC_USER_NAME_DEFAULT_VALUE = "conductor";
+
+  String JDBC_PASSWORD_PROPERTY_NAME = "jdbc.password";
+  String JDBC_PASSWORD_DEFAULT_VALUE = "password";
+
+  String FLYWAY_ENABLED_PROPERTY_NAME = "flyway.enabled";
+  boolean FLYWAY_ENABLED_DEFAULT_VALUE = true;
+
+  String FLYWAY_BASELINE_MIGRATION_ENABLED_PROPERTY_NAME = "flyway.baseline.migration.enabled";
+  boolean FLYWAY_BASELINE_MIGRATION_ENABLED_DEFAULT_VALUE = false;
+
+  String FLYWAY_TABLE_PROPERTY_NAME = "flyway.table";
+
+  // The defaults are currently in line with the HikariConfig defaults, which are unfortunately
+  // private.
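+  // For illustration only: a hypothetical conductor.properties snippet that overrides a few of
+  // the pool and retry settings defined below. The property names are the constants declared in
+  // this interface; the values are made-up examples, not recommended defaults.
+  //
+  //   workflow.cockroachdb.connection.pool.size.max=20
+  //   workflow.cockroachdb.connection.pool.idle.min=5
+  //   workflow.cockroachdb.connection.timeout=10000
+  //   workflow.cockroachdb.error.retries=5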
+  String CONNECTION_POOL_MAX_SIZE_PROPERTY_NAME = "workflow.cockroachdb.connection.pool.size.max";
+  int CONNECTION_POOL_MAX_SIZE_DEFAULT_VALUE = -1;
+
+  String CONNECTION_POOL_MINIMUM_IDLE_PROPERTY_NAME =
+      "workflow.cockroachdb.connection.pool.idle.min";
+  int CONNECTION_POOL_MINIMUM_IDLE_DEFAULT_VALUE = -1;
+
+  String CONNECTION_MAX_LIFETIME_PROPERTY_NAME = "workflow.cockroachdb.connection.lifetime.max";
+  long CONNECTION_MAX_LIFETIME_DEFAULT_VALUE = TimeUnit.MINUTES.toMillis(30);
+
+  String CONNECTION_IDLE_TIMEOUT_PROPERTY_NAME = "workflow.cockroachdb.connection.idle.timeout";
+  long CONNECTION_IDLE_TIMEOUT_DEFAULT_VALUE = TimeUnit.MINUTES.toMillis(10);
+
+  String CONNECTION_TIMEOUT_PROPERTY_NAME = "workflow.cockroachdb.connection.timeout";
+  long CONNECTION_TIMEOUT_DEFAULT_VALUE = TimeUnit.SECONDS.toMillis(30);
+
+  String AUTO_COMMIT_PROPERTY_NAME = "workflow.cockroachdb.autocommit";
+  // This is consistent with the current default when building the Hikari Client.
+  boolean AUTO_COMMIT_DEFAULT_VALUE = false;
+
+  String DB_INSERT_BATCH_SIZE_PROPERTY_NAME = "workflow.cockroachdb.insert.batch.size";
+  int DB_INSERT_BATCH_SIZE_DEFAULT_VALUE = 64;
+
+  String DB_MAX_SEARCH_SIZE_PROPERTY_NAME = "workflow.cockroachdb.max.search.size";
+  int DB_MAX_SEARCH_SIZE_DEFAULT_VALUE = 1000;
+
+  String DB_ERROR_RETRIES_PROPERTY_NAME = "workflow.cockroachdb.error.retries";
+  int DB_ERROR_RETRIES_DEFAULT_VALUE = 3;
+
+  String DB_MAX_RETRY_DELAY_PROPERTY_NAME = "workflow.cockroachdb.max.retry.delay";
+  int DB_MAX_RETRY_DELAY_DEFAULT_VALUE = 1000;
+
+  String DB_INITIAL_RETRY_DELAY_PROPERTY_NAME = "workflow.cockroachdb.initial.retry.delay";
+  int DB_INITIAL_RETRY_DELAY_DEFAULT_VALUE = 100;
+
+  String DB_CLIENT_NAME_PROPERTY_NAME = "workflow.cockroachdb.client.name";
+  String DB_CLIENT_NAME_DEFAULT_VALUE = "Conductor";
+
+  String REWRITE_BATCHED_INSERTS_PROPERTY_NAME = "workflow.cockroachdb.rewrite.batched.inserts";
+  boolean REWRITE_BATCHED_INSERTS_DEFAULT_VALUE = true;
+
+  String DB_LOGIN_TIMEOUT_PROPERTY_NAME = "workflow.cockroachdb.login.timeout.seconds";
+  int DB_LOGIN_TIMEOUT_DEFAULT_VALUE = 5;
+
+  String DB_SOCKET_TIMEOUT_PROPERTY_NAME = "workflow.cockroachdb.socket.timeout.seconds";
+  int DB_SOCKET_TIMEOUT_DEFAULT_VALUE = 30;
+
+  // cockroachdb enterprise version feature
+  String FOLLOWER_READS_ENABLED_PROPERTY_NAME = "workflow.cockroachdb.follower.reads.enabled";
+  boolean FOLLOWER_READS_ENABLED_DEFAULT_VALUE = false;
+
+  default String getJdbcUrl() {
+    return getProperty(JDBC_URL_PROPERTY_NAME, JDBC_URL_DEFAULT_VALUE);
+  }
+
+  default String getJdbcUserName() {
+    return getProperty(JDBC_USER_NAME_PROPERTY_NAME, JDBC_USER_NAME_DEFAULT_VALUE);
+  }
+
+  default String getJdbcPassword() {
+    return getProperty(JDBC_PASSWORD_PROPERTY_NAME, JDBC_PASSWORD_DEFAULT_VALUE);
+  }
+
+  default boolean isFlywayEnabled() {
+    return getBoolProperty(FLYWAY_ENABLED_PROPERTY_NAME, FLYWAY_ENABLED_DEFAULT_VALUE);
+  }
+
+  default boolean isFlywayBaseLineMigrationEnabled() {
+    return getBoolProperty(
+        FLYWAY_BASELINE_MIGRATION_ENABLED_PROPERTY_NAME,
+        FLYWAY_BASELINE_MIGRATION_ENABLED_DEFAULT_VALUE);
+  }
+
+  default Optional<String> getFlywayTable() {
+    return Optional.ofNullable(getProperty(FLYWAY_TABLE_PROPERTY_NAME, null));
+  }
+
+  default int getConnectionPoolMaxSize() {
+    return getIntProperty(
+        CONNECTION_POOL_MAX_SIZE_PROPERTY_NAME, CONNECTION_POOL_MAX_SIZE_DEFAULT_VALUE);
+  }
+
+  default int getConnectionPoolMinIdle() {
+    return getIntProperty(
+        CONNECTION_POOL_MINIMUM_IDLE_PROPERTY_NAME,
+        CONNECTION_POOL_MINIMUM_IDLE_DEFAULT_VALUE);
+  }
+
+  default long getConnectionMaxLifetime() {
+    return getLongProperty(
+        CONNECTION_MAX_LIFETIME_PROPERTY_NAME, CONNECTION_MAX_LIFETIME_DEFAULT_VALUE);
+  }
+
+  default long getConnectionIdleTimeout() {
+    return getLongProperty(
+        CONNECTION_IDLE_TIMEOUT_PROPERTY_NAME, CONNECTION_IDLE_TIMEOUT_DEFAULT_VALUE);
+  }
+
+  default long getConnectionTimeout() {
+    return getLongProperty(CONNECTION_TIMEOUT_PROPERTY_NAME, CONNECTION_TIMEOUT_DEFAULT_VALUE);
+  }
+
+  default boolean isAutoCommit() {
+    return getBoolProperty(AUTO_COMMIT_PROPERTY_NAME, AUTO_COMMIT_DEFAULT_VALUE);
+  }
+
+  default int getDbInsertBatchSize() {
+    return getIntProperty(DB_INSERT_BATCH_SIZE_PROPERTY_NAME, DB_INSERT_BATCH_SIZE_DEFAULT_VALUE);
+  }
+
+  default int getDbMaxSearchSize() {
+    return getIntProperty(DB_MAX_SEARCH_SIZE_PROPERTY_NAME, DB_MAX_SEARCH_SIZE_DEFAULT_VALUE);
+  }
+
+  default int getDbErrorRetries() {
+    return getIntProperty(DB_ERROR_RETRIES_PROPERTY_NAME, DB_ERROR_RETRIES_DEFAULT_VALUE);
+  }
+
+  default int getDbMaxRetryDelay() {
+    return getIntProperty(DB_MAX_RETRY_DELAY_PROPERTY_NAME, DB_MAX_RETRY_DELAY_DEFAULT_VALUE);
+  }
+
+  default int getDbInitialRetryDelay() {
+    return getIntProperty(
+        DB_INITIAL_RETRY_DELAY_PROPERTY_NAME, DB_INITIAL_RETRY_DELAY_DEFAULT_VALUE);
+  }
+
+  default String getClientName() {
+    return getProperty(DB_CLIENT_NAME_PROPERTY_NAME, DB_CLIENT_NAME_DEFAULT_VALUE);
+  }
+
+  default boolean isRewriteBatchedInserts() {
+    return getBooleanProperty(
+        REWRITE_BATCHED_INSERTS_PROPERTY_NAME, REWRITE_BATCHED_INSERTS_DEFAULT_VALUE);
+  }
+
+  default int getDbLoginTimeout() {
+    return getIntProperty(DB_LOGIN_TIMEOUT_PROPERTY_NAME, DB_LOGIN_TIMEOUT_DEFAULT_VALUE);
+  }
+
+  default int getDbSocketTimeout() {
+    return getIntProperty(DB_SOCKET_TIMEOUT_PROPERTY_NAME, DB_SOCKET_TIMEOUT_DEFAULT_VALUE);
+  }
+
+  default boolean isFollowerReadsEnabled() {
+    return getBooleanProperty(
+        FOLLOWER_READS_ENABLED_PROPERTY_NAME, FOLLOWER_READS_ENABLED_DEFAULT_VALUE);
+  }
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/CockroachDBDataSourceProvider.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/CockroachDBDataSourceProvider.java
new file mode 100644
index 0000000..1173ca6
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/CockroachDBDataSourceProvider.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import java.util.concurrent.ThreadFactory;
+import javax.sql.DataSource;
+import org.flywaydb.core.Flyway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** CockroachDB data source provider. */
+public class CockroachDBDataSourceProvider {
+  private static final Logger logger = LoggerFactory.getLogger(CockroachDBDataSourceProvider.class);
+
+  private final CockroachDBConfiguration configuration;
+
+  public CockroachDBDataSourceProvider(CockroachDBConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  public DataSource get() {
+    HikariDataSource dataSource = null;
+    try {
+      dataSource = new HikariDataSource(createConfiguration());
+      flywayMigrate(dataSource);
+      return dataSource;
+    } catch (final Throwable t) {
+      if (null != dataSource && !dataSource.isClosed()) {
+        dataSource.close();
+      }
+      logger.error("Error migrating DB", t);
+      throw t;
+    }
+  }
+
+  private HikariConfig createConfiguration() {
+    HikariConfig cfg = new HikariConfig();
+    cfg.setJdbcUrl(configuration.getJdbcUrl());
+    cfg.setUsername(configuration.getJdbcUserName());
+    cfg.setPassword(configuration.getJdbcPassword());
+    cfg.setMaximumPoolSize(configuration.getConnectionPoolMaxSize());
+    cfg.setMinimumIdle(configuration.getConnectionPoolMinIdle());
+    cfg.setMaxLifetime(configuration.getConnectionMaxLifetime());
+    cfg.setIdleTimeout(configuration.getConnectionIdleTimeout());
+    cfg.setConnectionTimeout(configuration.getConnectionTimeout());
+    cfg.setAutoCommit(configuration.isAutoCommit());
+    cfg.addDataSourceProperty("ApplicationName", configuration.getClientName());
+    cfg.addDataSourceProperty("rewriteBatchedInserts", configuration.isRewriteBatchedInserts());
+    cfg.addDataSourceProperty("loginTimeout", configuration.getDbLoginTimeout());
+    cfg.addDataSourceProperty("socketTimeout", configuration.getDbSocketTimeout());
+
+    ThreadFactory tf =
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("hikari-cockroachdb-%d").build();
+
+    cfg.setThreadFactory(tf);
+    return cfg;
+  }
+
+  // TODO: add complete lifecycle for the connection, i.e. startup and shutdown.
+  private void flywayMigrate(DataSource dataSource) {
+    boolean enabled = configuration.isFlywayEnabled();
+    if (!enabled) {
+      logger.debug("Flyway migrations are disabled");
+      return;
+    }
+
+    Flyway flyway =
+        Flyway.configure()
+            .dataSource(dataSource)
+            .placeholderReplacement(false)
+            .baselineOnMigrate(configuration.isFlywayBaseLineMigrationEnabled())
+            .load();
+
+    flyway.migrate();
+  }
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBBaseDAO.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBBaseDAO.java
new file mode 100644
index 0000000..01c93a0
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBBaseDAO.java
@@ -0,0 +1,424 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.dao;
+
+import static com.netflix.conductor.core.execution.ApplicationException.Code.BACKEND_ERROR;
+import static com.netflix.conductor.core.execution.ApplicationException.Code.INTERNAL_ERROR;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.netflix.conductor.cockroachdb.CockroachDBConfiguration;
+import com.netflix.conductor.cockroachdb.util.ResultProcessor;
+import com.netflix.conductor.cockroachdb.util.StatementFunction;
+import com.netflix.conductor.cockroachdb.util.StatementPreparer;
+import com.netflix.conductor.cockroachdb.util.TransactionalFunction;
+import com.netflix.conductor.common.metadata.tasks.Task;
+import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.core.execution.ApplicationException;
+import com.netflix.conductor.metrics.Monitors;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Savepoint;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CockroachDB base DAO to support basic DB operations.
+ *
+ * @author jun-he
+ */
+public abstract class CockroachDBBaseDAO {
+  private static final Logger LOG = LoggerFactory.getLogger(CockroachDBBaseDAO.class);
+
+  private static final String SAVEPOINT_NAME = "cockroach_restart";
+  private static final String RETRY_SQL_STATE = "40001";
+
+  private static final String VERSION_COLUMN = "version";
+  protected static final String PAYLOAD_COLUMN = "payload";
+  protected static final String ID_COLUMN = "id";
+  protected static final String STATUS_COLUMN = "status";
+  protected static final int SUCCESS_WRITE_SIZE = 1;
+  protected static final String ARRAY_TYPE_NAME = "TEXT";
+  static final String SET_FOLLOWER_READS_MODE =
+      "SET TRANSACTION AS OF SYSTEM TIME experimental_follower_read_timestamp()";
+
+  private final ObjectMapper objectMapper;
+  private final DataSource dataSource;
+  private final int maxRetries;
+  private final boolean isFollowerReadsEnabled;
+  private final int maxRetryDelay;
+  private final int initialRetryDelay;
+
+  public CockroachDBBaseDAO(
+      DataSource dataSource, ObjectMapper objectMapper, CockroachDBConfiguration config) {
+    this.dataSource = dataSource;
+    this.objectMapper = objectMapper;
+    this.maxRetries = config.getDbErrorRetries();
+    this.isFollowerReadsEnabled = config.isFollowerReadsEnabled();
+    this.maxRetryDelay = config.getDbMaxRetryDelay();
+    this.initialRetryDelay = config.getDbInitialRetryDelay();
+  }
+
+  @VisibleForTesting
+  void validateTasks(Collection<Task> tasks) {
+    Preconditions.checkNotNull(tasks, "Tasks object cannot be null");
+    Preconditions.checkArgument(!tasks.isEmpty(), "Tasks object cannot be empty");
+    tasks.forEach(
+        task -> {
+          Preconditions.checkNotNull(task, "task object cannot be null");
+          Preconditions.checkNotNull(task.getTaskId(), "Task id cannot be null");
+          Preconditions.checkNotNull(
+              task.getWorkflowInstanceId(), "Workflow instance id cannot be null");
+          Preconditions.checkNotNull(
+              task.getReferenceTaskName(), "Task reference name cannot be null");
+        });
+
+    Preconditions.checkArgument(
+        tasks.stream().map(Task::getWorkflowInstanceId).distinct().count() <= 1,
+        "Tasks of multiple workflows cannot be created/updated simultaneously");
+  }
+
+  @VisibleForTesting
+  void validateWorkflow(Workflow workflow) {
+    Preconditions.checkNotNull(workflow.getWorkflowId());
+  }
+
+  <T> T getPayload(String stmt, StatementPreparer preparer, Class<T> clazz) {
+    return withRetryableQuery(stmt, preparer, r -> payloadFromResult(r, clazz));
+  }
+
+  <T> T getReadOnlyPayload(String stmt, StatementPreparer preparer, Class<T> clazz) {
+    return withReadOnlyQuery(stmt, preparer, r -> payloadFromResult(r, clazz));
+  }
+
+  protected <T> List<T> getPayloads(String stmt, StatementPreparer preparer, Class<T> clazz) {
+    return withRetryableQuery(stmt, preparer, r -> payloadsFromResult(r, clazz));
+  }
+
+  List<String> idsFromResult(ResultSet result, int version) throws SQLException {
+    List<String> ids = new ArrayList<>();
+    while (result.next()) {
+      if (version < 0 || version == result.getInt(VERSION_COLUMN)) {
+        ids.add(result.getString(ID_COLUMN));
+      }
+    }
+    return ids;
+  }
+
+  private <T> T payloadFromResult(ResultSet result, Class<T> clazz) throws SQLException {
+    if (result.next()) {
+      String payload = result.getString(PAYLOAD_COLUMN);
+      if (payload != null && !payload.isEmpty()) {
+        return fromJson(payload, clazz);
+      }
+    }
+    return null;
+  }
+
+  private <T> List<T> payloadsFromResult(ResultSet result, Class<T> clazz) throws SQLException {
+    List<T> results = new ArrayList<>();
+    while (result.next()) {
+      String payload = result.getString(PAYLOAD_COLUMN);
+      if (payload != null && !payload.isEmpty()) {
+        results.add(fromJson(payload, clazz));
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Convert the object to the JSON string.
+   *
+   * @param value the object value
+   * @return serialized string
+   */
+  protected String toJson(Object value) {
+    try {
+      return objectMapper.writeValueAsString(value);
+    } catch (JsonProcessingException e) {
+      throw new ApplicationException(INTERNAL_ERROR, e);
+    }
+  }
+
+  /**
+   * Parse a JSON string to an instance of the given class.
+   *
+   * @param json json string
+   * @return deserialized object
+   */
+  protected <T> T fromJson(String json, Class<T> clazz) {
+    try {
+      return objectMapper.readValue(json, clazz);
+    } catch (JsonProcessingException e) {
+      throw new ApplicationException(INTERNAL_ERROR, e);
+    }
+  }
+
+  /**
+   * Parse a JSON string to an instance of the given TypeReference.
+   *
+   * @param json json string
+   * @return deserialized object
+   */
+  protected <T> T fromJson(String json, TypeReference<T> valueTypeRef) {
+    try {
+      return objectMapper.readValue(json, valueTypeRef);
+    } catch (JsonProcessingException e) {
+      throw new ApplicationException(INTERNAL_ERROR, e);
+    }
+  }
+
+  /**
+   * A wrapper to share the common try...catch logic.
+   *
+   * @param supplier a provided wrapped function, it may throw runtime exceptions
+   * @param methodName a provided method name to log metrics
+   * @param log a provided log string with argument support
+   * @param args an array of arguments for logging
+   * @return the result from supplier
+   */
+  protected <T> T withMetricLogError(
+      Supplier<T> supplier, String methodName, String log, Object... args) {
+    try {
+      return supplier.get();
+    } catch (Exception e) {
+      Object[] combinedArgs;
+      if (args != null) {
+        combinedArgs = new Object[args.length + 1];
+        System.arraycopy(args, 0, combinedArgs, 0, args.length);
+      } else {
+        combinedArgs = new Object[1];
+      }
+      combinedArgs[combinedArgs.length - 1] = e.getMessage();
+      Monitors.error(this.getClass().getName(), methodName);
+      LOG.error(log + " due to {}", combinedArgs);
+      throw e;
+    }
+  }
+
+  /**
+   * Initialize a new transactional {@link Connection} and {@link PreparedStatement} from
+   * {@link #dataSource} and pass the statement to the {@literal statement preparer}. After
+   * querying the results from DB, it then passes them to a processor. It uses follower read mode
+   * if supported and does not retry.
+   *
+   * @param statement sql statement
+   * @param preparer function to set parameters of the prepared statement
+   * @param processor processor to process the result
+   * @return the result
+   * @throws ApplicationException If any errors occur.
+   */
+  protected <T> T withReadOnlyQuery(
+      final String statement, StatementPreparer preparer, ResultProcessor<T> processor) {
+    try (Connection conn = dataSource.getConnection();
+        PreparedStatement stmt = conn.prepareStatement(statement)) {
+      if (isFollowerReadsEnabled) {
+        try (PreparedStatement stmt1 = conn.prepareStatement(SET_FOLLOWER_READS_MODE)) {
+          stmt1.executeUpdate();
+        }
+      }
+      preparer.prepare(stmt);
+      try (ResultSet result = stmt.executeQuery()) {
+        return processor.process(result);
+      }
+    } catch (SQLException e) {
+      LOG.error(
+          "with readOnlyQuery, non-retryable exception occurred: sql state = [{}], message = [{}]",
+          e.getSQLState(),
+          e.getMessage());
+      throw new ApplicationException(BACKEND_ERROR, e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Initialize a new transactional {@link Connection} and {@link PreparedStatement} from
+   * {@link #dataSource} and pass the statement to the {@literal statement preparer}. After
After querying the + * results from DB, it then passes them to a processor. It includes a retry mechanism as + * CockroachDB recommend to implement client side retry + * + * @param stmt sql statement + * @param preparer function to set parameters of the prepared statement + * @param processor processor to process the result + * @return result for the query + */ + protected T withRetryableQuery( + String stmt, StatementPreparer preparer, ResultProcessor processor) { + return withRetryableStatement( + stmt, + statement -> { + preparer.prepare(statement); + try (ResultSet result = statement.executeQuery()) { + return processor.process(result); + } + }); + } + + /** + * Initialize a new transactional {@link Connection} PreparedStatement {@link PreparedStatement} + * from {@link #dataSource} and pass it to {@literal statement preparer}. Then it updates the DB + * records accordingly with retry support. + * + * @param stmt sql statement + * @param preparer function to set parameters of the prepared statement + * @return result for the update + */ + protected int withRetryableUpdate(String stmt, StatementPreparer preparer) { + return withRetryableStatement( + stmt, + statement -> { + preparer.prepare(statement); + return statement.executeUpdate(); + }); + } + + /** + * Initialize a new transactional {@link Connection} from {@link #dataSource} and pass it to + * {@literal function}. It includes a retry mechanism as CockroachDB recommend to implement client + * side retry + * + *

+   * Successful executions of {@literal function} will result in a commit and return of {@link
+   * TransactionalFunction#apply}.
+   *
+   * <p>

+   * If a SQL exception has a retryable SQL state and the retry attempts are not exhausted, the
+   * transaction rolls back to the savepoint and retries. Otherwise, the error will be thrown.
+   *
+   * <p>
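+   * For example, with the default initial retry delay of 100 ms and max retry delay of 1000 ms,
+   * the exponential backoff below sleeps roughly 200 ms, 400 ms, then 800 ms (each plus up to
+   * 100 ms of random jitter, with the total capped at 1000 ms) before the successive retries.
+   *
+   * <p>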

+   * The retry only handles transaction contention errors. It does not handle other error cases,
+   * e.g. disconnection or timeout.
+   *
+   * <p>
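+   * For illustration, a hypothetical subclass method deleting a row and reporting success might
+   * look like the following (the table name, SQL, and local variables are made up for the
+   * example):
+   *
+   * <pre>{@code
+   * boolean deleted =
+   *     withRetryableTransaction(
+   *         conn -> {
+   *           try (PreparedStatement stmt =
+   *               conn.prepareStatement("DELETE FROM example_table WHERE id = ?")) {
+   *             stmt.setString(1, id);
+   *             return stmt.executeUpdate() == 1;
+   *           }
+   *         });
+   * }</pre>
+   *
+   * <p>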

+   * Generally this is used to wrap multiple CockroachDB statements producing some expected
+   * return value.
+   *
+   * @param function The function to apply with a new transactional {@link Connection}
+   * @param <R> The return type.
+   * @return The result of {@code TransactionalFunction#apply(Connection)}
+   * @throws ApplicationException If any errors occur.
+   */
+  protected <R> R withRetryableTransaction(final TransactionalFunction<R> function) {
+    try (Connection connection = dataSource.getConnection()) {
+      connection.setAutoCommit(false); // manually manage the commit lifecycle
+      int retryCount = 0;
+      while (true) {
+        Savepoint sp = connection.setSavepoint(SAVEPOINT_NAME);
+        try {
+          R result = function.apply(connection);
+          connection.releaseSavepoint(sp);
+          connection.commit();
+          return result;
+        } catch (SQLException e) {
+          if (retryCount < maxRetries && RETRY_SQL_STATE.equals(e.getSQLState())) {
+            LOG.warn(
+                "retryable exception occurred: sql state = [{}], message = [{}], retry counter = {}",
+                e.getSQLState(),
+                e.getMessage(),
+                retryCount);
+            connection.rollback(sp);
+            retryCount++;
+            int sleepMillis =
+                Math.min(
+                    maxRetryDelay,
+                    (int) (Math.pow(2, retryCount) * initialRetryDelay)
+                        + ThreadLocalRandom.current().nextInt(initialRetryDelay));
+            TimeUnit.MILLISECONDS.sleep(sleepMillis);
+          } else {
+            LOG.warn(
+                "non-retryable exception occurred: sql state = [{}], message = [{}], retry counter = {}",
+                e.getSQLState(),
+                e.getMessage(),
+                retryCount);
+            connection.rollback();
+            throw e;
+          }
+        }
+      }
+    } catch (SQLException e) {
+      LOG.warn(
+          "non-retryable exception occurred: sql state = [{}], message = [{}]",
+          e.getSQLState(),
+          e.getMessage());
+      if (e.getMessage() != null
+          && e.getMessage().startsWith("ERROR: split failed while applying backpressure to")) {
+        Monitors.error(this.getClass().getName(), "nonretryable_mvcc_error");
+      } else {
+        Monitors.error(this.getClass().getName(), "retryable_transaction_error_" + e.getSQLState());
+      }
+      throw new ApplicationException(BACKEND_ERROR, e.getMessage(), e);
+    } catch (InterruptedException e) {
+      LOG.warn("InterruptedException occurred: message = [{}]", e.getMessage());
+      Thread.currentThread().interrupt();
+      throw new ApplicationException(INTERNAL_ERROR, e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Initialize a new transactional {@link Connection} from {@link #dataSource}, prepare a new SQL
+   * statement, and then pass it to {@literal function}. It includes a retry mechanism, as
+   * CockroachDB recommends implementing client-side retries.
+   *
+   * <p>

+   * Successful executions of {@literal function} will result in a commit and return of {@link
+   * StatementFunction#apply}.
+   *
+   * <p>

+   * If a SQL exception has a retryable SQL state and the retry attempts are not exhausted, the
+   * transaction rolls back to the savepoint and retries with a new prepared statement. Otherwise,
+   * the error will be thrown.
+   *
+   * <p>

+   * The retry only handles transaction contention errors. It does not handle other error cases,
+   * e.g. disconnection or timeout.
+   *
+   * <p>
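+   * For illustration, a hypothetical single-statement caller might look like the following (the
+   * table name, SQL, and local variables are made up for the example):
+   *
+   * <pre>{@code
+   * int updated =
+   *     withRetryableStatement(
+   *         "UPDATE example_table SET payload = ? WHERE id = ?",
+   *         stmt -> {
+   *           stmt.setString(1, payload);
+   *           stmt.setString(2, id);
+   *           return stmt.executeUpdate();
+   *         });
+   * }</pre>
+   *
+   * <p>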

+   * Generally this is used to wrap a {@link PreparedStatement} producing some expected return
+   * value.
+   *
+   * @param statement sql statement
+   * @param function The function to apply with a new {@link PreparedStatement} in a transactional
+   *     {@link Connection}
+   * @param <R> The return type.
+   * @return The result of {@code StatementFunction#apply()}
+   * @throws ApplicationException If any errors occur.
+   */
+  protected <R> R withRetryableStatement(
+      final String statement, final StatementFunction<R> function) {
+    return withRetryableTransaction(
+        conn -> {
+          try (PreparedStatement stmt = conn.prepareStatement(statement)) {
+            return function.apply(stmt);
+          }
+        });
+  }
+
+  /**
+   * Get time from the timestamp field if present.
+   *
+   * @param rs The result set.
+   * @param field Timestamp field.
+   * @return The timestamp value in long if not null.
+   * @throws SQLException sql exception
+   */
+  protected Long getTimestampIfPresent(ResultSet rs, String field) throws SQLException {
+    Timestamp timestamp = rs.getTimestamp(field);
+    if (timestamp != null) {
+      return timestamp.getTime();
+    }
+    return null;
+  }
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBEventHandlerDAO.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBEventHandlerDAO.java
new file mode 100644
index 0000000..0f11e77
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBEventHandlerDAO.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.dao;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.conductor.cockroachdb.CockroachDBConfiguration;
+import com.netflix.conductor.cockroachdb.util.StatementPreparer;
+import com.netflix.conductor.common.metadata.events.EventHandler;
+import com.netflix.conductor.dao.EventHandlerDAO;
+import java.util.List;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CockroachDB implementation of EventHandlerDAO.
+ *
+ * <p>

+ * It manages event handler definition metadata.
+ *
+ * @author jun-he
+ */
+public class CockroachDBEventHandlerDAO extends CockroachDBBaseDAO implements EventHandlerDAO {
+  private static final Logger LOG = LoggerFactory.getLogger(CockroachDBEventHandlerDAO.class);
+
+  private static final String UPSERT_EVENT_HANDLER_STATEMENT =
+      "UPSERT INTO event_handler (handler_name,payload) VALUES (?,?)";
+  private static final String REMOVE_EVENT_HANDLER_STATEMENT =
+      "DELETE FROM event_handler WHERE handler_name = ?";
+  private static final String GET_EVENT_HANDLERS_STATEMENT = "SELECT payload FROM event_handler";
+  private static final String GET_EVENT_HANDLERS_FOR_EVENT_STATEMENT =
+      "SELECT payload FROM event_handler WHERE event = ? AND active = ?";
+
+  public CockroachDBEventHandlerDAO(
+      DataSource dataSource, ObjectMapper objectMapper, CockroachDBConfiguration config) {
+    super(dataSource, objectMapper, config);
+  }
+
+  @Override
+  public void addEventHandler(EventHandler eventHandler) {
+    LOG.info("Creating an event handler with name: {}", eventHandler.getName());
+    upsertEventHandler(eventHandler, "addEventHandler");
+  }
+
+  @Override
+  public void updateEventHandler(EventHandler eventHandler) {
+    LOG.info("Updating an event handler with name: {}", eventHandler.getName());
+    upsertEventHandler(eventHandler, "updateEventHandler");
+  }
+
+  private void upsertEventHandler(EventHandler eventHandler, String methodName) {
+    withMetricLogError(
+        () ->
+            withRetryableUpdate(
+                UPSERT_EVENT_HANDLER_STATEMENT,
+                statement -> {
+                  statement.setString(1, eventHandler.getName());
+                  statement.setString(2, toJson(eventHandler));
+                }),
+        methodName,
+        "Failed {} for handler: {}",
+        methodName,
+        eventHandler);
+  }
+
+  @Override
+  public void removeEventHandler(String name) {
+    withMetricLogError(
+        () ->
+            withRetryableUpdate(
+                REMOVE_EVENT_HANDLER_STATEMENT, statement -> statement.setString(1, name)),
+        "removeEventHandler",
+        "Failed removing an event handler with name {}",
+        name);
+  }
+
+  @Override
+  public List<EventHandler> getAllEventHandlers() {
+    return withMetricLogError(
+        () ->
+            getPayloads(GET_EVENT_HANDLERS_STATEMENT, StatementPreparer.NO_OP, EventHandler.class),
+        "getAllEventHandlers",
+        "Failed getting all event handlers");
+  }
+
+  @Override
+  public List<EventHandler> getEventHandlersForEvent(String event, boolean activeOnly) {
+    return withMetricLogError(
+        () ->
+            getPayloads(
+                GET_EVENT_HANDLERS_FOR_EVENT_STATEMENT,
+                statement -> {
+                  statement.setString(1, event);
+                  statement.setBoolean(2, activeOnly);
+                },
+                EventHandler.class),
+        "getEventHandlersForEvent",
+        "Failed getting all event handlers for [active {}] event {}",
+        activeOnly,
+        event);
+  }
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBExecutionDAO.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBExecutionDAO.java
new file mode 100644
index 0000000..ceab7bf
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBExecutionDAO.java
@@ -0,0 +1,648 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.dao;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.conductor.cockroachdb.CockroachDBConfiguration;
+import com.netflix.conductor.common.metadata.events.EventExecution;
+import com.netflix.conductor.common.metadata.tasks.Task;
+import com.netflix.conductor.common.metadata.tasks.TaskDef;
+import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.dao.ExecutionDAO;
+import com.netflix.conductor.dao.IndexDAO;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CockroachDB implementation of Execution DAO.
+ *
+ * @author jun-he
+ */
+public class CockroachDBExecutionDAO extends CockroachDBBaseDAO implements ExecutionDAO {
+  private static final Logger LOG = LoggerFactory.getLogger(CockroachDBExecutionDAO.class);
+
+  private static final String WORKFLOW_FLAG_COLUMN = "flag";
+
+  // TODO change it back to INSERT after fixing conductor issue
+  private static final String CREATE_TASK_STATEMENT =
+      "UPSERT INTO task (workflow_instance_id,task_id,payload) VALUES (?,?,?)";
+  private static final String UPSERT_TASK_STATEMENT =
+      "UPSERT INTO task (workflow_instance_id,task_id,payload) VALUES (?,?,?)";
+  private static final String GET_TASK_BY_TASK_ID_STATEMENT =
+      "SELECT payload FROM task WHERE task_id = ?";
+  private static final String GET_TASKS_BY_WORKFLOW_INSTANCE_ID_STATEMENT =
+      "SELECT payload FROM task WHERE workflow_instance_id = ?";
+  private static final String REMOVE_TASK_STATEMENT = "DELETE FROM task WHERE task_id = ?";
+  private static final String REMOVE_TASKS_STATEMENT =
+      "DELETE FROM task WHERE workflow_instance_id = ?";
+  private static final String GET_RUNNING_TASK_IDS_BY_NAME_STATEMENT =
+      "SELECT task_id AS id FROM task WHERE task_name = ? AND status = 'IN_PROGRESS' ";
+
+  private static final String CREATE_WORKFLOW_INSTANCE_STATEMENT =
+      "INSERT INTO workflow_instance (workflow_instance_id,payload) VALUES (?,?)";
+  private static final String UPSERT_WORKFLOW_INSTANCE_STATEMENT =
+      "UPSERT INTO workflow_instance (workflow_instance_id,payload) VALUES (?,?)";
+  private static final String REMOVE_WORKFLOW_INSTANCE_STATEMENT =
+      "DELETE FROM workflow_instance WHERE workflow_instance_id = ?";
+  private static final String GET_WORKFLOW_INSTANCE_ONLY_STATEMENT =
+      "SELECT TRUE AS flag, payload FROM workflow_instance WHERE workflow_instance_id = ? AND workflow_instance_id = ?";
+  private static final String GET_WORKFLOW_INSTANCE_WITH_TASKS_STATEMENT =
+      "SELECT TRUE AS flag, payload FROM workflow_instance WHERE workflow_instance_id = ? "
" + + "UNION ALL SELECT FALSE AS flag, payload FROM task WHERE workflow_instance_id = ?"; + + private static final String CREATE_EVENT_EXECUTION_STATEMENT = + "INSERT INTO event_execution (event,handler_name,message_id,execution_id,payload) VALUES (?,?,?,?,?)"; + private static final String UPSERT_EVENT_EXECUTION_STATEMENT = + "UPSERT INTO event_execution (event,handler_name,message_id,execution_id,payload) VALUES (?,?,?,?,?)"; + private static final String REMOVE_EVENT_EXECUTION_STATEMENT = + "DELETE FROM event_execution WHERE event = ? AND handler_name = ? AND message_id = ? AND execution_id = ?"; + + private final IndexDAO indexDAO; + private final int insertBatchSize; + private final int maxSearchSize; + + public CockroachDBExecutionDAO( + DataSource dataSource, + IndexDAO indexDAO, + ObjectMapper objectMapper, + CockroachDBConfiguration config) { + super(dataSource, objectMapper, config); + this.indexDAO = indexDAO; + this.insertBatchSize = config.getDbInsertBatchSize(); + this.maxSearchSize = config.getDbMaxSearchSize(); + } + + /** + * Get in processing tasks within a workflow with the given workflow instance id + * + * @param taskName: related task's TaskType + * @param workflowId: related workflow instance id + * @return a list of tasks with TaskType = taskName for a given workflow with WorkflowId = + * workflowId + */ + @Override + public List getPendingTasksByWorkflow(String taskName, String workflowId) { + List tasks = getTasksForWorkflow(workflowId); + return tasks.stream() + .filter(task -> taskName.equals(task.getTaskDefName())) + .filter(task -> Task.Status.IN_PROGRESS.equals(task.getStatus())) + .collect(Collectors.toList()); + } + + /** + * It is used in paginated get in-progress tasks HTTP or GRPC endpoint. + * + *

+   * First, it gets the taskIds from {@link IndexDAO} using the secondary index with follower
+   * read mode.
+   *
+   * <p>

+   * Second, it gets tasks one by one (there is no transactional guarantee) with follower read
+   * mode.
+   */
+  @Override
+  public List<Task> getTasks(String taskName, String startKey, int count) {
+    List<String> taskIds = searchInProgressTaskIdsByName(taskName);
+    int startIdx = startKey == null ? 0 : taskIds.indexOf(startKey);
+    if (startIdx < 0) {
+      return Collections.emptyList();
+    }
+    return taskIds.stream()
+        .skip(startIdx)
+        .limit(count)
+        .map(this::getReadOnlyTask)
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
+  private List<String> searchInProgressTaskIdsByName(String taskDefName) {
+    return indexDAO.searchTasks(taskDefName, "IN_PROGRESS", 0, maxSearchSize, null).getResults();
+  }
+
+  /**
+   * Inserts new tasks for a single workflow into CockroachDB.
+   *
+   * @param tasks tasks to be inserted to DB
+   * @return created tasks
+   */
+  @Override
+  public List<Task> createTasks(List<Task> tasks) {
+    validateTasks(tasks);
+    Collection<Task> pendingTasks =
+        tasks.stream()
+            .collect(
+                Collectors.toMap(Task::getTaskId, Function.identity(), (u, v) -> v, TreeMap::new))
+            .values();
+
+    return withMetricLogError(
+        () -> {
+          int cnt =
+              withRetryableStatement(
+                  CREATE_TASK_STATEMENT,
+                  statement -> {
+                    int count = 0;
+                    int inserted = 0;
+                    for (Task task : pendingTasks) {
+                      addTask(statement, task);
+                      statement.addBatch();
+                      count++;
+                      // batch INSERT with a batch size
+                      if (count % insertBatchSize == 0 || count == pendingTasks.size()) {
+                        int[] res = statement.executeBatch();
+                        inserted += res.length;
+                      }
+                    }
+                    return inserted;
+                  });
+          LOG.debug(
+              "Created {}/{}/{} task instances for workflow {}",
+              cnt,
+              pendingTasks.size(),
+              tasks.size(),
+              tasks.get(0).getWorkflowInstanceId());
+          return new ArrayList<>(pendingTasks);
+        },
+        "createTasks",
+        "Failed creating {} task instances for workflow: {}",
+        tasks.size(),
+        tasks.get(0).getWorkflowInstanceId());
+  }
+
+  private void addTask(PreparedStatement statement, Task task) throws SQLException {
+    statement.setString(1, task.getWorkflowInstanceId());
+    statement.setString(2, task.getTaskId());
+    statement.setString(3, toJson(task));
+  }
+
+  @Override
+  public void updateTask(Task task) {
+    withMetricLogError(
+        () -> {
+          int cnt =
+              withRetryableTransaction(
+                  conn -> {
+                    int updated;
+                    try (PreparedStatement statement =
+                        conn.prepareStatement(UPSERT_TASK_STATEMENT)) {
+                      addTask(statement, task);
+                      updated = statement.executeUpdate();
+                    }
+                    return updated;
+                  });
+          LOG.debug(
+              "updated {} task {} in a workflow instance {}",
+              cnt,
+              task.getTaskId(),
+              task.getWorkflowInstanceId());
+          return cnt;
+        },
+        "updateTask",
+        "Failed updating a task with id {} in a workflow instance {}",
+        task.getTaskId(),
+        task.getWorkflowInstanceId());
+  }
+
+  @Override
+  public boolean exceedsInProgressLimit(Task task) {
+    Optional<TaskDef> taskDefinition = task.getTaskDefinition();
+    if (!taskDefinition.isPresent()) {
+      return false;
+    }
+    int limit = taskDefinition.get().concurrencyLimit();
+    if (limit <= 0) {
+      return false;
+    }
+
+    try {
+      List<String> taskIds =
+          withRetryableQuery(
+              GET_RUNNING_TASK_IDS_BY_NAME_STATEMENT,
+              statement -> statement.setString(1, task.getTaskDefName()),
+              result -> idsFromResult(result, -1));
+      if (!taskIds.contains(task.getTaskId()) && taskIds.size() >= limit) {
+        LOG.info(
+            "Task execution count is limited. task - {}:{}, limit: {}, current: {}",
+            task.getTaskId(),
+            task.getTaskDefName(),
+            limit,
+            taskIds.size());
+        return true;
+      } else {
+        LOG.debug(
task - {}:{}, limit: {}, current: {}", + task.getTaskId(), + task.getTaskDefName(), + limit, + taskIds.size()); + return false; + } + } catch (Exception e) { + LOG.warn( + "Failed checking in progress limit for task - {}:{} due to {}", + task.getTaskId(), + task.getTaskDefName(), + e.getMessage()); + return true; + } + } + + @Override + public boolean removeTask(String taskId) { + return withRetryableStatement( + REMOVE_TASK_STATEMENT, + statement -> { + statement.setString(1, taskId); + if (statement.executeUpdate() == 1) { + return true; + } else { + LOG.warn("cannot remove a notfound task with id {}", taskId); + return false; + } + }); + } + + @Override + public Task getTask(String taskId) { + return withMetricLogError( + () -> + getPayload( + GET_TASK_BY_TASK_ID_STATEMENT, + statement -> statement.setString(1, taskId), + Task.class), + "getTask", + "Failed getting a task by id {}", + taskId); + } + + private Task getReadOnlyTask(String taskId) { + try { + return getReadOnlyPayload( + GET_TASK_BY_TASK_ID_STATEMENT, statement -> statement.setString(1, taskId), Task.class); + } catch (Exception e) { + LOG.warn("Failed getting a task by id {} due to {}", taskId, e.getMessage()); + return null; + } + } + + /** Best effort to get all Tasks for the given task ids. */ + @Override + public List getTasks(List taskIds) { + return taskIds.stream() + .map(this::getTask) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + /** + * It is used by admin and requeue API First, it gets the taskIds from {@link IndexDAO} using the + * secondary index with follower read mode. Second, it gets tasks one by one (there is no + * transactional guarantee) with follower read mode. + */ + @Override + public List getPendingTasksForTaskType(String taskType) { + return searchInProgressTaskIdsByName(taskType).stream() + .map(this::getReadOnlyTask) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + @Override + public List getTasksForWorkflow(String workflowId) { + List tasks = + withMetricLogError( + () -> + getPayloads( + GET_TASKS_BY_WORKFLOW_INSTANCE_ID_STATEMENT, + statement -> statement.setString(1, workflowId), + Task.class), + "getTasksForWorkflow", + "Failed getting tasks for workflow with id {}", + workflowId); + tasks.sort(Comparator.comparingInt(Task::getSeq)); + return tasks; + } + + @Override + public String createWorkflow(Workflow workflow) { + LOG.debug( + "Creating a workflow instance with the name {} and id: {}", + workflow.getWorkflowName(), + workflow.getWorkflowId()); + return createOrUpdateWorkflow(workflow, CREATE_WORKFLOW_INSTANCE_STATEMENT, "createWorkflow"); + } + + @Override + public String updateWorkflow(Workflow workflow) { + LOG.debug( + "Updating a workflow instance with the name {} and id: {}", + workflow.getWorkflowName(), + workflow.getWorkflowId()); + return createOrUpdateWorkflow(workflow, UPSERT_WORKFLOW_INSTANCE_STATEMENT, "updateWorkflow"); + } + + private String createOrUpdateWorkflow(Workflow workflow, String stmt, String methodName) { + validateWorkflow(workflow); + List tasks = workflow.getTasks(); + workflow.setTasks(Collections.emptyList()); + + withMetricLogError( + () -> + withRetryableUpdate( + stmt, + statement -> { + statement.setString(1, workflow.getWorkflowId()); + statement.setString(2, toJson(workflow)); + }), + methodName, + "Failed {} with workflow instance id {}", + methodName, + workflow.getWorkflowId()); + + workflow.setTasks(tasks); + return workflow.getWorkflowId(); + } + + @Override + public boolean removeWorkflow(String 
+    LOG.debug("Removing a workflow instance with an id {}", workflowId);
+    return withMetricLogError(
+        () ->
+            withRetryableTransaction(
+                conn -> {
+                  try (PreparedStatement wiStmt =
+                          conn.prepareStatement(REMOVE_WORKFLOW_INSTANCE_STATEMENT);
+                      PreparedStatement siStmt = conn.prepareStatement(REMOVE_TASKS_STATEMENT)) {
+                    // remove associated tasks first
+                    siStmt.setString(1, workflowId);
+                    siStmt.executeUpdate();
+                    wiStmt.setString(1, workflowId);
+                    if (wiStmt.executeUpdate() == 1) {
+                      LOG.info("Removed a workflow instance with id {}", workflowId);
+                      return true;
+                    } else {
+                      LOG.warn("Cannot remove a non-existent workflow with id {}", workflowId);
+                      return false;
+                    }
+                  }
+                }),
+        "removeWorkflow",
+        "Failed removing a workflow instance {}",
+        workflowId);
+  }
+
+  /**
+   * Currently, this feature is not implemented for CockroachDBExecutionDAO. It is not used by
+   * Conductor core; it is only used by the archiving module.
+   */
+  @Override
+  public boolean removeWorkflowWithExpiry(String workflowId, int ttlSeconds) {
+    throw new UnsupportedOperationException(
+        "This method is not implemented in CockroachDBExecutionDAO.");
+  }
+
+  @Override
+  public void removeFromPendingWorkflow(String workflowType, String workflowId) {
+    // no-op
+  }
+
+  @Override
+  public Workflow getWorkflow(String workflowId) {
+    return getWorkflow(workflowId, true);
+  }
+
+  @Override
+  public Workflow getWorkflow(String workflowId, boolean includeTasks) {
+    return getWorkflow(
+        workflowId,
+        includeTasks
+            ? GET_WORKFLOW_INSTANCE_WITH_TASKS_STATEMENT
+            : GET_WORKFLOW_INSTANCE_ONLY_STATEMENT);
+  }
+
+  private Workflow getWorkflow(String workflowId, String getStatement) {
+    return withMetricLogError(
+        () ->
+            withRetryableQuery(
+                getStatement,
+                statement -> {
+                  statement.setString(1, workflowId);
+                  statement.setString(2, workflowId);
+                },
+                this::workflowFromResult),
+        "getWorkflow",
+        "Failed getting a workflow instance {}",
+        workflowId);
+  }
+
+  private Workflow workflowFromResult(ResultSet result) throws SQLException {
+    Workflow workflow = null;
+    List<Task> tasks = new ArrayList<>();
+    while (result.next()) {
+      String payload = result.getString(PAYLOAD_COLUMN);
+      if (payload != null && !payload.isEmpty()) {
+        if (result.getBoolean(WORKFLOW_FLAG_COLUMN)) {
+          workflow = fromJson(payload, Workflow.class);
+        } else {
+          tasks.add(fromJson(payload, Task.class));
+        }
+      }
+    }
+    if (workflow != null && !tasks.isEmpty()) {
+      tasks.sort(Comparator.comparingInt(Task::getSeq));
+      workflow.setTasks(tasks);
+    }
+    return workflow;
+  }
+
+  /**
+   * It is used by the get-all-running-workflows HTTP or GRPC endpoint. It gets the workflowIds
+   * from {@link IndexDAO} using the secondary index with follower read mode.
+   */
+  @Override
+  public List<String> getRunningWorkflowIds(String workflowName, int version) {
+    return searchRunningWorkflowIdsByName(workflowName, version);
+  }
+
+  private List<String> searchRunningWorkflowIdsByName(String workflowName, int version) {
+    return indexDAO
+        .searchWorkflows(
+            workflowName,
+            "RUNNING",
+            0,
+            maxSearchSize,
+            Arrays.asList("DESC", String.valueOf(version)))
+        .getResults();
+  }
+
+  /**
+   * It is used to requeue pending tasks for all the running workflows.
+   *
+   * <p>

+   * First, it gets the workflowIds from {@link IndexDAO} using the secondary index with follower
+   * read mode. Second, it gets workflows one by one (there is no transactional guarantee) with
+   * follower read mode.
+   */
+  @Override
+  public List<Workflow> getPendingWorkflowsByType(String workflowName, int version) {
+    return searchRunningWorkflowIdsByName(workflowName, version).stream()
+        .map(
+            workflowId ->
+                withMetricLogError(
+                    () ->
+                        withReadOnlyQuery(
+                            GET_WORKFLOW_INSTANCE_WITH_TASKS_STATEMENT,
+                            statement -> {
+                              statement.setString(1, workflowId);
+                              statement.setString(2, workflowId);
+                            },
+                            this::workflowFromResult),
+                    "getPendingWorkflowsByType",
+                    "Failed getting a workflow instance with id {}",
+                    workflowId))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * It is used to get the pending workflow count metric for WorkflowMonitor. It gets the count
+   * from {@link IndexDAO} using the secondary index with follower read mode.
+   */
+  @Override
+  public long getPendingWorkflowCount(String workflowName) {
+    return searchRunningWorkflowIdsByName(workflowName, -1).size();
+  }
+
+  /**
+   * It is used to get the pending task count metric for WorkflowMonitor. It gets the count from
+   * {@link IndexDAO} using the secondary index with follower read mode.
+   */
+  @Override
+  public long getInProgressTaskCount(String taskDefName) {
+    return searchInProgressTaskIdsByName(taskDefName).size();
+  }
+
+  /**
+   * It is used by the get-all-running-workflows HTTP or GRPC endpoint.
+   *
+   * <p>

+   * Assuming that the number of running workflow instances is reasonable, the startTime filter is
+   * applied after getting all running workflow instances for a given workflow name.
+   */
+  @Override
+  public List<Workflow> getWorkflowsByType(String workflowName, Long startTime, Long endTime) {
+    return getPendingWorkflowsByType(workflowName, -1).stream()
+        .filter(
+            workflow -> workflow.getStartTime() >= startTime && workflow.getStartTime() < endTime)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Currently, this feature is not implemented for CockroachDBExecutionDAO. It is used to list
+   * workflows for the given correlation id in the HTTP or GRPC endpoint. If needed, add the
+   * correlation id to the CockroachDB table schema and implement it.
+   */
+  @Override
+  public List<Workflow> getWorkflowsByCorrelationId(
+      String workflowName, String correlationId, boolean includeTasks) {
+    throw new UnsupportedOperationException(
+        "This method is not implemented in CockroachDBExecutionDAO.");
+  }
+
+  /** Returns true if getWorkflowsByCorrelationId is implemented. */
+  @Override
+  public boolean canSearchAcrossWorkflows() {
+    return false;
+  }
+
+  @Override
+  public boolean addEventExecution(EventExecution eventExecution) {
+    LOG.debug(
+        "Creating an event execution for event {} with handler name {} and execution id {}",
+        eventExecution.getEvent(),
+        eventExecution.getName(),
+        eventExecution.getId());
+    return addOrUpdateEventExecution(
+        CREATE_EVENT_EXECUTION_STATEMENT, eventExecution, "addEventExecution");
+  }
+
+  @Override
+  public void updateEventExecution(EventExecution eventExecution) {
+    LOG.debug(
+        "Updating an event execution for event {} with handler name {} and execution id {}",
+        eventExecution.getEvent(),
+        eventExecution.getName(),
+        eventExecution.getId());
+    addOrUpdateEventExecution(
+        UPSERT_EVENT_EXECUTION_STATEMENT, eventExecution, "updateEventExecution");
+  }
+
+  private boolean addOrUpdateEventExecution(
+      String stmt, EventExecution eventExecution, String methodName) {
+    try {
+      int cnt =
+          withRetryableUpdate(
+              stmt,
+              statement -> {
+                statement.setString(1, eventExecution.getEvent());
+                statement.setString(2, eventExecution.getName());
+                statement.setString(3, eventExecution.getMessageId());
+                statement.setString(4, eventExecution.getId());
+                statement.setString(5, toJson(eventExecution));
+              });
+      return cnt == 1;
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed {} for event {} with handler name {} due to {}",
+          methodName,
+          eventExecution.getEvent(),
+          eventExecution.getName(),
+          e.getMessage());
+      // Best effort to store the execution event
+      return false;
+    }
+  }
+
+  @Override
+  public void removeEventExecution(EventExecution eventExecution) {
+    LOG.debug(
+        "Removing an event execution for event {} with handler name {} and execution id {}",
+        eventExecution.getEvent(),
+        eventExecution.getName(),
+        eventExecution.getId());
+    withMetricLogError(
+        () ->
+            withRetryableUpdate(
+                REMOVE_EVENT_EXECUTION_STATEMENT,
+                statement -> {
+                  statement.setString(1, eventExecution.getEvent());
+                  statement.setString(2, eventExecution.getName());
+                  statement.setString(3, eventExecution.getMessageId());
+                  statement.setString(4, eventExecution.getId());
+                }),
+        "removeEventExecution",
+        "Failed removing an event execution for event {} with handler name {}",
+        eventExecution.getEvent(),
+        eventExecution.getName());
+  }
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBIndexDAO.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBIndexDAO.java
new file mode 100644
index 0000000..b429702
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBIndexDAO.java
@@ -0,0 +1,342 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.dao;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.conductor.cockroachdb.CockroachDBConfiguration;
+import com.netflix.conductor.common.metadata.events.EventExecution;
+import com.netflix.conductor.common.metadata.tasks.Task;
+import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
+import com.netflix.conductor.common.run.SearchResult;
+import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.core.events.queue.Message;
+import com.netflix.conductor.dao.IndexDAO;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CockroachDB implementation of Index DAO to index the workflow and task details for searching.
+ *
+ * <p>

+ * It directly searches data in the CockroachDB workflow and task tables using the secondary
+ * indexes. It uses the CockroachDB follower-read mechanism to reduce contention, because the
+ * search results are allowed to be slightly stale.
+ *
+ * @author jun-he
+ */
+public class CockroachDBIndexDAO extends CockroachDBBaseDAO implements IndexDAO {
+  private static final Logger LOG = LoggerFactory.getLogger(CockroachDBIndexDAO.class);
+
+  private static final String CREATED_TIME_COLUMN = "created_time";
+  private static final String LOG_COLUMN = "log";
+
+  private static final String GET_WORKFLOW_INSTANCE_IDS_STATEMENT_TEMPLATE =
+      "SELECT workflow_instance_id AS id, workflow_version AS version FROM workflow_instance "
+          + "WHERE workflow_name = ? AND status = ? ORDER BY start_time %s LIMIT %s OFFSET %s"; // offset is not good
+
+  private static final String GET_TASK_IDS_STATEMENT =
+      "SELECT task_id AS id FROM task "
+          + "WHERE task_name = ? AND status = ? ORDER BY start_time %s LIMIT %s OFFSET %s";
+
+  private static final String CREATE_TASK_EXECUTION_LOGS_STATEMENT =
+      "INSERT INTO task_execution_log (task_id,created_time,log) VALUES (?,?,?)";
+  private static final String GET_TASK_EXECUTION_LOGS_STATEMENT =
+      "SELECT task_id AS id, created_time, log FROM task_execution_log "
+          + "WHERE task_id = ? ORDER BY created_time";
+
+  private static final String GET_EVENT_EXECUTIONS_STATEMENT =
+      "SELECT payload FROM event_execution WHERE event = ?";
+
+  public CockroachDBIndexDAO(
+      DataSource dataSource, ObjectMapper objectMapper, CockroachDBConfiguration config) {
+    super(dataSource, objectMapper, config);
+  }
+
+  @Override
+  public void setup() {
+    // no-op
+  }
+
+  @Override
+  public void indexWorkflow(Workflow workflow) {
+    // no-op
+  }
+
+  @Override
+  public CompletableFuture<Void> asyncIndexWorkflow(Workflow workflow) {
+    // no-op
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
+  public void indexTask(Task task) {
+    // no-op
+  }
+
+  @Override
+  public CompletableFuture<Void> asyncIndexTask(Task task) {
+    // no-op
+    return CompletableFuture.completedFuture(null);
+  }
+
+  /**
+   * It searches workflow instance ids in the CockroachDB workflow_instance table using the
+   * secondary index. Note that it does not support free text search and redefines the meaning of
+   * the parameters.
+   *
+   * @param workflowName workflow name / workflow type
+   * @param status workflow status
+   * @param start select offset
+   * @param count selection limit
+   * @param options other search options, [0] is startTime sorting order, [1] is version, null
+   *     means using default
+   * @return a list of workflow instance ids wrapped in SearchResult
+   */
+  @Override
+  public SearchResult<String> searchWorkflows(
+      String workflowName, String status, int start, int count, List<String> options) {
+    return withMetricLogError(
+        () ->
+            getSearchIds(
+                GET_WORKFLOW_INSTANCE_IDS_STATEMENT_TEMPLATE,
+                workflowName,
+                status,
+                start,
+                count,
+                options),
+        "searchWorkflows",
+        "Failed searching workflows by workflow name {} with status {}",
+        workflowName,
+        status);
+  }
+
+  private SearchResult<String> getSearchIds(
+      String stmtTemplate, String name, String status, int start, int count, List<String> options) {
+    final String sorted = options == null || options.isEmpty() ? "DESC" : options.get(0);
+    final String getIdsStatement = String.format(stmtTemplate, sorted, count, start);
+    final int version =
-1 : Integer.parseInt(options.get(1));
+ List<String> ids =
+ withReadOnlyQuery(
+ getIdsStatement,
+ statement -> {
+ statement.setString(1, name);
+ statement.setString(2, status);
+ },
+ rs -> idsFromResult(rs, version));
+ return new SearchResult<>(ids.size(), ids);
+ }
+
+ /**
+ * Searches task ids in the CockroachDB task table using the secondary index. Note that it does
+ * not support free-text search and redefines the meaning of some parameters.
+ *
+ * @param taskName task definition name
+ * @param status task status
+ * @param start select offset
+ * @param count selection limit
+ * @param options other search options; [0] is the startTime sort order; null means use the
+ * default
+ * @return a list of task ids wrapped in SearchResult
+ */
+ @Override
+ public SearchResult<String> searchTasks(
+ String taskName, String status, int start, int count, List<String> options) {
+ return withMetricLogError(
+ () -> getSearchIds(GET_TASK_IDS_STATEMENT, taskName, status, start, count, options),
+ "searchTasks",
+ "Failed searching tasks by task name {} with status {}",
+ taskName,
+ status);
+ }
+
+ @Override
+ public void removeWorkflow(String workflowId) {
+ // no-op
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
+ // no-op
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {
+ // no-op
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncUpdateWorkflow(
+ String workflowInstanceId, String[] keys, Object[] values) {
+ // no-op
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /** Always returns null. */
+ @Override
+ public String get(String workflowInstanceId, String key) {
+ return null;
+ }
+
+ /** Assumes the number of logs is reasonable and fits into a single batch. */
+ @Override
+ public void addTaskExecutionLogs(List<TaskExecLog> logs) {
+ if (logs == null || logs.isEmpty()) {
+ return;
+ }
+
+ Set<TaskExecLog> taskLogs = new HashSet<>(logs);
+ try {
+ int cnt =
+ withRetryableStatement(
+ CREATE_TASK_EXECUTION_LOGS_STATEMENT,
+ statement -> {
+ for (TaskExecLog taskLog : taskLogs) {
+ statement.setString(1, taskLog.getTaskId());
+ statement.setLong(2, taskLog.getCreatedTime());
+ statement.setString(3, taskLog.getLog());
+ statement.addBatch();
+ }
+ int[] res = statement.executeBatch();
+ return res.length;
+ });
+ LOG.debug(
+ "Created {}/{} task execution logs for a task with id {}",
+ cnt,
+ logs.size(),
+ logs.get(0).getTaskId());
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed creating {} task execution logs for a task with id {} due to {}",
+ logs.size(),
+ logs.get(0).getTaskId(),
+ e.getMessage());
+ // ignore error as the execution log is not in the critical path.
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> logs) {
+ return CompletableFuture.runAsync(() -> addTaskExecutionLogs(logs));
+ }
+
+ @Override
+ public List<TaskExecLog> getTaskExecutionLogs(String taskId) {
+ try {
+ return withReadOnlyQuery(
+ GET_TASK_EXECUTION_LOGS_STATEMENT,
+ statement -> statement.setString(1, taskId),
+ this::taskExecLogsFromResult);
+ } catch (Exception e) {
+ LOG.warn(
+ "Cannot get task execution logs for task {} due to {}",
+ taskId,
+ e.getMessage());
+ // ignore error as the execution log is not in the critical path.
+ return Collections.emptyList();
+ }
+ }
+
+ private List<TaskExecLog> taskExecLogsFromResult(ResultSet result) throws SQLException {
+ List<TaskExecLog> logs = new ArrayList<>();
+ while (result.next()) {
+ String id = result.getString(ID_COLUMN);
+ long createdTime = result.getLong(CREATED_TIME_COLUMN);
+ String log = result.getString(LOG_COLUMN);
+ TaskExecLog taskLog = new TaskExecLog();
+ taskLog.setTaskId(id);
+ taskLog.setCreatedTime(createdTime);
+ taskLog.setLog(log);
+ logs.add(taskLog);
+ }
+ return logs;
+ }
+
+ @Override
+ public void addEventExecution(EventExecution eventExecution) {
+ // no-op
+ }
+
+ @Override
+ public List<EventExecution> getEventExecutions(String event) {
+ return withMetricLogError(
+ () ->
+ getPayloads(
+ GET_EVENT_EXECUTIONS_STATEMENT,
+ statement -> statement.setString(1, event),
+ EventExecution.class),
+ "getEventExecutions",
+ "Failed getting event executions with event {}",
+ event);
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
+ // no-op
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * Messages are never used or retrieved and exist only for debugging, so just output the message
+ * to the log. Persist it if needed.
+ */
+ @Override
+ public void addMessage(String queue, Message msg) {
+ LOG.info("For queue {}, adding a message: {}", queue, toJson(msg));
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncAddMessage(String queue, Message message) {
+ addMessage(queue, message);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * Currently, this feature is not implemented for CockroachDBIndexDAO. It is used for debugging
+ * purposes only.
+ */
+ @Override
+ public List<Message> getMessages(String queue) {
+ throw new UnsupportedOperationException(
+ "This method is not implemented in CockroachDBIndexDAO.");
+ }
+
+ /**
+ * Currently, this feature is not implemented for CockroachDBIndexDAO. It is used for debugging
+ * purposes or by a workflow archival service.
+ */
+ @Override
+ public List<String> searchArchivableWorkflows(String indexName, long archiveTtlDays) {
+ throw new UnsupportedOperationException(
+ "This method is not implemented in CockroachDBIndexDAO.");
+ }
+
+ /**
+ * Currently, this feature is not implemented for CockroachDBIndexDAO. It is used for debugging
+ * purposes or by a stale-workflow completion service.
+ */
+ @Override
+ public List<String> searchRecentRunningWorkflows(
+ int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) {
+ throw new UnsupportedOperationException(
+ "This method is not implemented in CockroachDBIndexDAO.");
+ }
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBMetadataDAO.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBMetadataDAO.java
new file mode 100644
index 0000000..0dd2955
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBMetadataDAO.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.cockroachdb.dao; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.cockroachdb.CockroachDBConfiguration; +import com.netflix.conductor.cockroachdb.util.StatementPreparer; +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.core.execution.ApplicationException; +import com.netflix.conductor.dao.MetadataDAO; +import java.util.List; +import java.util.Optional; +import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CockroachDB implementation of MetadataDAO. + * + *

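<p>Task definitions and workflow definitions are stored as JSON payloads keyed by name (plus
+ * version for workflows). Updates go through CockroachDB {@code UPSERT}, so re-registering a
+ * definition is idempotent; for example (illustrative), calling {@code updateTaskDef(taskDef)}
+ * twice leaves exactly one row for that task name.
+ *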
+ * <p>It manages workflow and task definition metadata. TODO: add a local cache if needed.
+ *
+ * @author jun-he
+ */
+public class CockroachDBMetadataDAO extends CockroachDBBaseDAO implements MetadataDAO {
+ private static final Logger LOG = LoggerFactory.getLogger(CockroachDBMetadataDAO.class);
+
+ private static final String UPSERT_TASK_DEFINITION_STATEMENT =
+ "UPSERT INTO task_definition (task_name,payload) VALUES (?,?)";
+ private static final String GET_TASK_DEFINITION_STATEMENT =
+ "SELECT payload FROM task_definition WHERE task_name = ?";
+ private static final String GET_TASK_DEFINITIONS_STATEMENT =
+ "SELECT payload FROM task_definition";
+ private static final String REMOVE_TASK_DEFINITIONS_STATEMENT =
+ "DELETE FROM task_definition WHERE task_name = ?";
+
+ private static final String CREATE_WORKFLOW_DEFINITION_STATEMENT =
+ "INSERT INTO workflow_definition (workflow_name,version,payload) VALUES (?,?,?)";
+ private static final String UPSERT_WORKFLOW_DEFINITION_STATEMENT =
+ "UPSERT INTO workflow_definition (workflow_name,version,payload) VALUES (?,?,?)";
+ private static final String GET_WORKFLOW_DEFINITION_STATEMENT =
+ "SELECT payload FROM workflow_definition WHERE workflow_name = ? AND version = ?";
+ private static final String GET_LATEST_WORKFLOW_DEFINITION_STATEMENT =
+ "SELECT payload FROM workflow_definition WHERE workflow_name = ? ORDER BY version DESC LIMIT 1";
+ private static final String GET_WORKFLOW_DEFINITIONS_STATEMENT =
+ "SELECT payload FROM workflow_definition";
+ private static final String REMOVE_WORKFLOW_DEFINITIONS_STATEMENT =
+ "DELETE FROM workflow_definition WHERE workflow_name = ? AND version = ?";
+
+ public CockroachDBMetadataDAO(
+ DataSource dataSource, ObjectMapper objectMapper, CockroachDBConfiguration config) {
+ super(dataSource, objectMapper, config);
+ }
+
+ @Override
+ public void createTaskDef(TaskDef taskDef) {
+ LOG.info("Creating a task definition: {}", taskDef);
+ createOrUpdateTaskDefinition(taskDef, "createTaskDef");
+ }
+
+ @Override
+ public String updateTaskDef(TaskDef taskDef) {
+ LOG.info("Updating a task definition with the name: {}", taskDef.getName());
+ return createOrUpdateTaskDefinition(taskDef, "updateTaskDef");
+ }
+
+ private String createOrUpdateTaskDefinition(TaskDef taskDef, String methodName) {
+ withMetricLogError(
+ () ->
+ withRetryableUpdate(
+ UPSERT_TASK_DEFINITION_STATEMENT,
+ statement -> {
+ statement.setString(1, taskDef.getName());
+ statement.setString(2, toJson(taskDef));
+ }),
+ methodName,
+ "Failed {} with task definition name {}",
+ methodName,
+ taskDef.getName());
+ return taskDef.getName();
+ }
+
+ @Override
+ public TaskDef getTaskDef(String name) {
+ return withMetricLogError(
+ () ->
+ getPayload(
+ GET_TASK_DEFINITION_STATEMENT,
+ statement -> statement.setString(1, name),
+ TaskDef.class),
+ "getTaskDef",
+ "Failed getting a task definition with name {}",
+ name);
+ }
+
+ @Override
+ public List<TaskDef> getAllTaskDefs() {
+ return withMetricLogError(
+ () -> getPayloads(GET_TASK_DEFINITIONS_STATEMENT, StatementPreparer.NO_OP, TaskDef.class),
+ "getAllTaskDefs",
+ "Failed getting all task definitions");
+ }
+
+ @Override
+ public void removeTaskDef(String name) {
+ LOG.info("Removing a task definition with name {}", name);
+ withMetricLogError(
+ () -> {
+ int cnt =
+ withRetryableUpdate(
+ REMOVE_TASK_DEFINITIONS_STATEMENT, statement -> statement.setString(1, name));
+ if (cnt != 1) {
+ throw new ApplicationException(
+ ApplicationException.Code.NOT_FOUND,
+ "Cannot remove the task - no such task
definition with name " + name); + } + return cnt; + }, + "removeTaskDef", + "Failed removing a task definition with name {}", + name); + } + + @Override + public void createWorkflowDef(WorkflowDef def) { + LOG.info("Creating a workflow definition: {}", def); + createOrUpdateWorkflowDefinition( + CREATE_WORKFLOW_DEFINITION_STATEMENT, def, "createWorkflowDef"); + } + + @Override + public void updateWorkflowDef(WorkflowDef def) { + LOG.info("Updating a workflow definition with the name: {}", def.getName()); + createOrUpdateWorkflowDefinition( + UPSERT_WORKFLOW_DEFINITION_STATEMENT, def, "updateWorkflowDef"); + } + + private void createOrUpdateWorkflowDefinition( + String stmt, WorkflowDef workflowDef, String methodName) { + withMetricLogError( + () -> + withRetryableUpdate( + stmt, + statement -> { + statement.setString(1, workflowDef.getName()); + statement.setInt(2, workflowDef.getVersion()); + statement.setString(3, toJson(workflowDef)); + }), + methodName, + "Failed {} with workflow definition name {} and version {}", + methodName, + workflowDef.getName(), + workflowDef.getVersion()); + } + + @Override + public Optional getLatestWorkflowDef(String name) { + return Optional.ofNullable( + withMetricLogError( + () -> + getPayload( + GET_LATEST_WORKFLOW_DEFINITION_STATEMENT, + statement -> statement.setString(1, name), + WorkflowDef.class), + "getLatestWorkflowDef", + "Failed getting the latest version of workflow definition with name {}", + name)); + } + + @Override + public Optional getWorkflowDef(String name, int version) { + return Optional.ofNullable( + withMetricLogError( + () -> + getPayload( + GET_WORKFLOW_DEFINITION_STATEMENT, + statement -> { + statement.setString(1, name); + statement.setInt(2, version); + }, + WorkflowDef.class), + "getWorkflowDef", + "Failed getting a workflow definition with name {} and version {}", + name, + version)); + } + + @Override + public void removeWorkflowDef(String name, Integer version) { + LOG.info("Removing a workflow definition with name {} and version {}", name, version); + withMetricLogError( + () -> { + int cnt = + withRetryableUpdate( + REMOVE_WORKFLOW_DEFINITIONS_STATEMENT, + statement -> { + statement.setString(1, name); + statement.setInt(2, version); + }); + if (cnt != 1) { + throw new ApplicationException( + ApplicationException.Code.NOT_FOUND, + String.format( + "Cannot remove the workflow - no such workflow definition with name %s and version %s", + name, version)); + } + return cnt; + }, + "removeWorkflowDef", + "Failed removing a workflow definition with name {} and version {}", + name, + version); + } + + @Override + public List getAllWorkflowDefs() { + return withMetricLogError( + () -> + getPayloads( + GET_WORKFLOW_DEFINITIONS_STATEMENT, StatementPreparer.NO_OP, WorkflowDef.class), + "getAllWorkflowDefs", + "Failed getting all workflow definitions"); + } +} diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBPollDataDAO.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBPollDataDAO.java new file mode 100644 index 0000000..82f4aef --- /dev/null +++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBPollDataDAO.java @@ -0,0 +1,99 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.cockroachdb.dao; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.netflix.conductor.cockroachdb.CockroachDBConfiguration; +import com.netflix.conductor.common.metadata.tasks.PollData; +import com.netflix.conductor.dao.PollDataDAO; +import java.util.List; +import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CockroachDB implementation of PollDataDAO. + * + * @author jun-he + */ +public class CockroachDBPollDataDAO extends CockroachDBBaseDAO implements PollDataDAO { + private static final Logger LOG = LoggerFactory.getLogger(CockroachDBPollDataDAO.class); + private static final String DEFAULT_DOMAIN = "DEFAULT"; + + private static final String UPSERT_POLL_DATA_STATEMENT = + "UPSERT INTO poll_data (queue_name,domain,payload) VALUES (?,?,?)"; + private static final String GET_POLL_DATA_WITH_DOMAIN_STATEMENT = + "SELECT payload FROM poll_data WHERE queue_name = ? AND domain = ?"; + private static final String GET_POLL_DATA_FOR_QUEUE_STATEMENT = + "SELECT payload FROM poll_data WHERE queue_name = ?"; + + public CockroachDBPollDataDAO( + DataSource dataSource, ObjectMapper objectMapper, CockroachDBConfiguration config) { + super(dataSource, objectMapper, config); + } + + @Override + public void updateLastPollData(String taskDefName, String domain, String workerId) { + Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null"); + PollData pollData = new PollData(taskDefName, domain, workerId, System.currentTimeMillis()); + String actualDomain = (domain == null) ? DEFAULT_DOMAIN : domain; + Integer cnt = + withMetricLogError( + () -> + withRetryableUpdate( + UPSERT_POLL_DATA_STATEMENT, + statement -> { + statement.setString(1, pollData.getQueueName()); + statement.setString(2, actualDomain); + statement.setString(3, toJson(pollData)); + }), + "updateLastPollData", + "Failed updating last poll data {}", + pollData); + LOG.debug("Updated {} last poll data: {}", cnt, pollData); + } + + @Override + public PollData getPollData(String taskDefName, String domain) { + Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null"); + String actualDomain = (domain == null) ? 
DEFAULT_DOMAIN : domain; + return withMetricLogError( + () -> + getPayload( + GET_POLL_DATA_WITH_DOMAIN_STATEMENT, + statement -> { + statement.setString(1, taskDefName); + statement.setString(2, actualDomain); + }, + PollData.class), + "getPollData", + "Failed getting last poll data with queue {} and domain {}", + taskDefName, + actualDomain); + } + + @Override + public List getPollData(String taskDefName) { + Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null"); + return withMetricLogError( + () -> + getPayloads( + GET_POLL_DATA_FOR_QUEUE_STATEMENT, + statement -> statement.setString(1, taskDefName), + PollData.class), + "getPollData", + "Failed getting last poll data with queue {}", + taskDefName); + } +} diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBQueueDAO.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBQueueDAO.java new file mode 100644 index 0000000..015d850 --- /dev/null +++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBQueueDAO.java @@ -0,0 +1,119 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.cockroachdb.dao; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.cockroachdb.CockroachDBConfiguration; +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.dao.QueueDAO; +import java.util.List; +import java.util.Map; +import javax.sql.DataSource; + +/** + * Dummy CockroachDB implementation of QueueDAO. 
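+ *
+ * <p>Every method is a no-op or returns a default value ({@code false}, {@code 0}, {@code null},
+ * or nothing); for example (illustrative call), {@code queueDao.pushIfNotExists("q", "id", 0)}
+ * always returns {@code false} and enqueues nothing. Real queueing must be provided by another
+ * module.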
+ *
+ * @author jun-he
+ */
+public class CockroachDBQueueDAO extends CockroachDBBaseDAO implements QueueDAO {
+ public CockroachDBQueueDAO(
+ DataSource dataSource, ObjectMapper objectMapper, CockroachDBConfiguration config) {
+ super(dataSource, objectMapper, config);
+ }
+
+ @Override
+ public void push(String queueName, String id, long offsetTimeInSecond) {}
+
+ @Override
+ public void push(String queueName, String id, int priority, long offsetTimeInSecond) {}
+
+ @Override
+ public void push(String queueName, List<Message> messages) {}
+
+ @Override
+ public boolean pushIfNotExists(String queueName, String id, long offsetTimeInSecond) {
+ return false;
+ }
+
+ @Override
+ public boolean pushIfNotExists(
+ String queueName, String id, int priority, long offsetTimeInSecond) {
+ return false;
+ }
+
+ @Override
+ public List<String> pop(String queueName, int count, int timeout) {
+ return null;
+ }
+
+ @Override
+ public List<Message> pollMessages(String queueName, int count, int timeout) {
+ return null;
+ }
+
+ @Override
+ public void remove(String queueName, String messageId) {}
+
+ @Override
+ public int getSize(String queueName) {
+ return 0;
+ }
+
+ @Override
+ public boolean ack(String queueName, String messageId) {
+ return false;
+ }
+
+ @Override
+ public boolean setUnackTimeout(String queueName, String messageId, long unackTimeout) {
+ return false;
+ }
+
+ @Override
+ public void flush(String queueName) {}
+
+ @Override
+ public Map<String, Long> queuesDetail() {
+ return null;
+ }
+
+ @Override
+ public Map<String, Map<String, Map<String, Long>>> queuesDetailVerbose() {
+ return null;
+ }
+
+ @Override
+ public boolean resetOffsetTime(String queueName, String id) {
+ return false;
+ }
+
+ @Override
+ public List<String> pop(String queueName, int count, int timeout, long leaseDurationSeconds) {
+ return null;
+ }
+
+ @Override
+ public List<Message> pollMessages(
+ String queueName, int count, int timeout, long leaseDurationSeconds) {
+ return null;
+ }
+
+ @Override
+ public void processUnacks(String queueName) {}
+
+ @Override
+ public boolean postpone(
+ String queueName, String messageId, int priority, long postponeDurationInSeconds) {
+ return false;
+ }
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBRateLimitingDAO.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBRateLimitingDAO.java
new file mode 100644
index 0000000..555e7ae
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/dao/CockroachDBRateLimitingDAO.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.dao;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.conductor.cockroachdb.CockroachDBConfiguration;
+import com.netflix.conductor.common.metadata.tasks.Task;
+import com.netflix.conductor.common.metadata.tasks.TaskDef;
+import com.netflix.conductor.dao.RateLimitingDAO;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CockroachDBRateLimitingDAO extends CockroachDBBaseDAO implements RateLimitingDAO {
+ private static final Logger LOG = LoggerFactory.getLogger(CockroachDBRateLimitingDAO.class);
+
+ private static final String COUNT_COLUMN = "cnt";
+
+ private static final String GET_RUNNING_TASK_COUNT_BY_NAME_STATEMENT =
+ "SELECT count(task_name) AS cnt FROM task "
+ + "WHERE task_name = ? AND status = 'IN_PROGRESS' AND start_time >= ? ";
+
+ public CockroachDBRateLimitingDAO(
+ DataSource dataSource, ObjectMapper objectMapper, CockroachDBConfiguration config) {
+ super(dataSource, objectMapper, config);
+ }
+
+ /**
+ * Evaluates whether the {@link Task} is rate limited, based on the {@link TaskDef} or the
+ * {@link Task}.
+ *

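+ * <p>For example (illustrative numbers): with {@code rateLimitPerFrequency = 100} and
+ * {@code rateLimitFrequencyInSeconds = 60}, a task is rate limited once more than 100
+ * IN_PROGRESS instances of it started within the past 60 seconds.
+ *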
+ * <p>It searches running tasks in the CockroachDB task table using the secondary index.
+ *
+ * @param task the task to evaluate for rate limiting
+ * @param taskDef task definition with rate limit settings
+ * @return true if the {@link Task} is rate limited, otherwise false.
+ */
+ @Override
+ public boolean exceedsRateLimitPerFrequency(Task task, TaskDef taskDef) {
+ int rateLimit =
+ taskDef == null ? task.getRateLimitPerFrequency() : taskDef.getRateLimitPerFrequency();
+ if (rateLimit <= 0) {
+ return false;
+ }
+ int bucketSize =
+ taskDef == null
+ ? task.getRateLimitFrequencyInSeconds()
+ : taskDef.getRateLimitFrequencyInSeconds();
+ String taskName = task.getTaskDefName();
+ try {
+ return withRetryableQuery(
+ GET_RUNNING_TASK_COUNT_BY_NAME_STATEMENT,
+ statement -> {
+ statement.setString(1, taskName);
+ // use a long multiplication to avoid int overflow for very large bucket sizes
+ statement.setLong(2, System.currentTimeMillis() - 1000L * bucketSize);
+ },
+ result -> {
+ if (result.next()) {
+ int cnt = result.getInt(COUNT_COLUMN);
+ if (cnt > rateLimit) {
+ LOG.info(
+ "Got {} running instances for task name {} in the past {} seconds, exceeding the limit {}",
+ cnt,
+ taskName,
+ bucketSize,
+ rateLimit);
+ return true;
+ } else {
+ LOG.debug(
+ "Got {} running instances for task name {} in the past {} seconds, within the limit {}",
+ cnt,
+ taskName,
+ bucketSize,
+ rateLimit);
+ }
+ }
+ return false;
+ });
+ } catch (Exception e) {
+ LOG.warn("Failed checking rate limit for task {} due to {}", taskName, e.getMessage());
+ return true;
+ }
+ }
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/ResultProcessor.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/ResultProcessor.java
new file mode 100644
index 0000000..1f77166
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/ResultProcessor.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.util;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Functional interface for processing a result set.
+ *
+ * @author jun-he
+ */
+@FunctionalInterface
+public interface ResultProcessor<R> {
+ R process(ResultSet result) throws SQLException;
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/StatementFunction.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/StatementFunction.java
new file mode 100644
index 0000000..e6d2958
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/StatementFunction.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.util;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Functional interface for operations within a SQL statement.
+ *
+ * @author jun-he
+ */
+@FunctionalInterface
+public interface StatementFunction<R> {
+ R apply(PreparedStatement stmt) throws SQLException;
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/StatementPreparer.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/StatementPreparer.java
new file mode 100644
index 0000000..e6368b5
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/StatementPreparer.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.util;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Functional interface for binding parameters on a prepared statement.
+ *
+ * @author jun-he
+ */
+@FunctionalInterface
+public interface StatementPreparer {
+ StatementPreparer NO_OP = stmt -> {};
+
+ void prepare(PreparedStatement stmt) throws SQLException;
+}
diff --git a/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/TransactionalFunction.java b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/TransactionalFunction.java
new file mode 100644
index 0000000..6c60d2a
--- /dev/null
+++ b/cockroachdb-persistence/src/main/java/com/netflix/conductor/cockroachdb/util/TransactionalFunction.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.util;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Functional interface for operations within a transactional context.
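+ *
+ * <p>A minimal illustrative sketch (the lambda below is hypothetical, not part of this module):
+ *
+ * <pre>{@code
+ * TransactionalFunction<Integer> countPollData =
+ *     tx -> {
+ *       // the Connection is supplied by the caller, which typically manages commit/rollback
+ *       try (PreparedStatement stmt = tx.prepareStatement("SELECT count(*) FROM poll_data");
+ *           ResultSet rs = stmt.executeQuery()) {
+ *         return rs.next() ? rs.getInt(1) : 0;
+ *       }
+ *     };
+ * }</pre>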
+ * + * @author mustafa + */ +@FunctionalInterface +public interface TransactionalFunction { + R apply(Connection tx) throws SQLException; +} diff --git a/cockroachdb-persistence/src/main/resources/db/migration/V1__initial_schema.sql b/cockroachdb-persistence/src/main/resources/db/migration/V1__initial_schema.sql new file mode 100644 index 0000000..d671bd7 --- /dev/null +++ b/cockroachdb-persistence/src/main/resources/db/migration/V1__initial_schema.sql @@ -0,0 +1,75 @@ +-- -------------------------------------------------------------------------------------------------------------- +-- SCHEMA FOR EXECUTION RELATED DAOs +-- -------------------------------------------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS workflow_instance ( + workflow_instance_id STRING NOT NULL, + payload JSONB NOT NULL, + PRIMARY KEY (workflow_instance_id), + workflow_name STRING AS (payload->>'workflowName') STORED, + status STRING AS (payload->>'status') STORED, + start_time INT8 AS ((payload->>'startTime')::INT8) STORED, + workflow_version INT4 AS ((payload->>'workflowVersion')::INT4) STORED, + INDEX name_status_index (workflow_name,status,start_time) +); + +CREATE TABLE IF NOT EXISTS task ( + workflow_instance_id STRING NOT NULL, + task_id STRING NOT NULL, + payload JSONB NOT NULL, + PRIMARY KEY (workflow_instance_id, task_id), + INDEX id_index (task_id), + task_name STRING AS (payload->>'taskDefName') STORED, + status STRING AS (payload->>'status') STORED, + start_time INT8 AS ((payload->>'startTime')::INT8) STORED, + INDEX name_status_index (task_name,status,start_time) +); + +CREATE TABLE IF NOT EXISTS event_execution ( + event STRING NOT NULL, + handler_name STRING NOT NULL, + message_id STRING NOT NULL, + execution_id STRING NOT NULL, + payload JSONB NOT NULL, + PRIMARY KEY (event,handler_name,message_id,execution_id) +); + +CREATE TABLE IF NOT EXISTS task_execution_log ( + task_id STRING NOT NULL, + created_time INT8, + hash_code INT8 AS (fnv32a(log)) STORED, + log STRING, + PRIMARY KEY (task_id,created_time,hash_code) +); + +CREATE TABLE IF NOT EXISTS poll_data ( + queue_name STRING NOT NULL, + domain STRING NOT NULL, + payload JSONB NOT NULL, + PRIMARY KEY (queue_name,domain) +); + + +-- -------------------------------------------------------------------------------------------------------------- +-- SCHEMA FOR METADATA RELATED DAOs +-- -------------------------------------------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS event_handler ( + handler_name STRING, + payload JSONB NOT NULL, + PRIMARY KEY (handler_name), + event STRING AS (payload->>'event') STORED, + active BOOL AS ((payload->>'active')::BOOL) STORED, + INDEX event_index (event,active) +); + +CREATE TABLE IF NOT EXISTS workflow_definition ( + workflow_name STRING NOT NULL, + version INT4 NOT NULL, + payload STRING NOT NULL, + PRIMARY KEY (workflow_name,version DESC) +); + +CREATE TABLE IF NOT EXISTS task_definition ( + task_name STRING NOT NULL, + payload STRING NOT NULL, + PRIMARY KEY (task_name) +); diff --git a/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/CockroachDBTestConfiguration.java b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/CockroachDBTestConfiguration.java new file mode 100644 index 0000000..301b368 --- /dev/null +++ b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/CockroachDBTestConfiguration.java @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Netflix, 
Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.cockroachdb; + +import com.netflix.conductor.core.config.SystemPropertiesConfiguration; + +public class CockroachDBTestConfiguration extends SystemPropertiesConfiguration + implements CockroachDBConfiguration { + @Override + public boolean isFollowerReadsEnabled() { + return false; + } + + @Override + public String getJdbcUrl() { + return "jdbc:tc:cockroach:///maestro"; + } + + @Override + public int getConnectionPoolMaxSize() { + return 2; + } + + @Override + public int getConnectionPoolMinIdle() { + return getConnectionPoolMaxSize(); + } +} diff --git a/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBBaseTest.java b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBBaseTest.java new file mode 100644 index 0000000..45a8504 --- /dev/null +++ b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBBaseTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package com.netflix.conductor.cockroachdb.dao; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.cockroachdb.CockroachDBConfiguration; +import com.netflix.conductor.cockroachdb.CockroachDBDataSourceProvider; +import com.netflix.conductor.cockroachdb.CockroachDBTestConfiguration; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.common.utils.JsonMapperProvider; +import javax.sql.DataSource; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public abstract class CockroachDBBaseTest { + static CockroachDBConfiguration config; + static DataSource dataSource; + static ObjectMapper objectMapper; + + static final String TEST_WORKFLOW_ID = "test-workflow"; + static final String TEST_WORKFLOW_NAME = TEST_WORKFLOW_ID + "name"; + static final String TEST_TASK_DEF_NAME = "test-task-def"; + static final String TEST_TASK_ID_1 = "test-task-1"; + static final String TEST_TASK_ID_2 = "test-task-2"; + + @BeforeClass + public static void init() { + config = new CockroachDBTestConfiguration(); + dataSource = new CockroachDBDataSourceProvider(config).get(); + objectMapper = new JsonMapperProvider().get(); + } + + @AfterClass + public static void destroy() {} + + Workflow createTestWorkflow(String workflowId) { + WorkflowDef def = new WorkflowDef(); + def.setName(workflowId + "name"); + Workflow workflow = new Workflow(); + workflow.setWorkflowId(workflowId); + workflow.setWorkflowDefinition(def); + workflow.setStartTime(System.currentTimeMillis()); + workflow.setStatus(Workflow.WorkflowStatus.RUNNING); + return workflow; + } + + TaskDef createTaskDef(int rateLimit, int concurLimit) { + TaskDef taskDef = new TaskDef(TEST_TASK_DEF_NAME); + taskDef.setRateLimitFrequencyInSeconds(60); + taskDef.setRateLimitPerFrequency(rateLimit); + taskDef.setConcurrentExecLimit(concurLimit); + return taskDef; + } + + Task createRunningTestTask(String taskId) { + Task task = new Task(); + task.setTaskId(taskId); + task.setTaskDefName(TEST_TASK_DEF_NAME); + task.setStatus(Task.Status.IN_PROGRESS); + task.setStartTime(System.currentTimeMillis()); + task.setWorkflowInstanceId(TEST_WORKFLOW_ID); + return task; + } +} diff --git a/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBEventHandlerDAOTest.java b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBEventHandlerDAOTest.java new file mode 100644 index 0000000..a6a362f --- /dev/null +++ b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBEventHandlerDAOTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package com.netflix.conductor.cockroachdb.dao; + +import static org.junit.Assert.*; + +import com.netflix.conductor.common.metadata.events.EventHandler; +import com.netflix.conductor.dao.EventHandlerDAO; +import java.util.List; +import java.util.UUID; +import org.junit.Before; +import org.junit.Test; + +public class CockroachDBEventHandlerDAOTest extends CockroachDBBaseTest { + + private EventHandlerDAO dao; + + @Before + public void setUp() { + dao = new CockroachDBEventHandlerDAO(dataSource, objectMapper, config); + } + + @Test + public void testEventHandlers() { + String event1 = "SQS::arn:account090:sqstest1"; + String event2 = "SQS::arn:account090:sqstest2"; + + EventHandler eh = new EventHandler(); + eh.setName(UUID.randomUUID().toString()); + eh.setActive(false); + EventHandler.Action action = new EventHandler.Action(); + action.setAction(EventHandler.Action.Type.start_workflow); + action.setStart_workflow(new EventHandler.StartWorkflow()); + action.getStart_workflow().setName("workflow_x"); + eh.getActions().add(action); + eh.setEvent(event1); + + dao.addEventHandler(eh); + List all = dao.getAllEventHandlers(); + assertNotNull(all); + assertEquals(1, all.size()); + assertEquals(eh.getName(), all.get(0).getName()); + assertEquals(eh.getEvent(), all.get(0).getEvent()); + + List byEvents = dao.getEventHandlersForEvent(event1, true); + assertNotNull(byEvents); + assertEquals(0, byEvents.size()); // event is marked as in-active + + eh.setActive(true); + eh.setEvent(event2); + dao.updateEventHandler(eh); + + all = dao.getAllEventHandlers(); + assertNotNull(all); + assertEquals(1, all.size()); + + byEvents = dao.getEventHandlersForEvent(event1, true); + assertNotNull(byEvents); + assertEquals(0, byEvents.size()); + + byEvents = dao.getEventHandlersForEvent(event2, true); + assertNotNull(byEvents); + assertEquals(1, byEvents.size()); + } +} diff --git a/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBExecutionDAOTest.java b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBExecutionDAOTest.java new file mode 100644 index 0000000..65f1dba --- /dev/null +++ b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBExecutionDAOTest.java @@ -0,0 +1,482 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package com.netflix.conductor.cockroachdb.dao; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.dao.ExecutionDAO; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class CockroachDBExecutionDAOTest extends CockroachDBBaseTest { + private ExecutionDAO dao; + + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp() { + dao = + new CockroachDBExecutionDAO( + dataSource, + new CockroachDBIndexDAO(dataSource, objectMapper, config), + objectMapper, + config); + dao.createWorkflow(createTestWorkflow(TEST_WORKFLOW_ID)); + } + + @After + public void tearDown() { + dao.removeWorkflow(TEST_WORKFLOW_ID); + } + + public ExecutionDAO getExecutionDAO() { + return dao; + } + + @Test + public void testTaskExceedsLimit() { + TaskDef taskDefinition = createTaskDef(0, 1); + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("task1"); + workflowTask.setTaskDefinition(taskDefinition); + + List tasks = new LinkedList<>(); + for (int i = 0; i < 15; i++) { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(i + 1); + task.setTaskId("t_" + i); + task.setWorkflowInstanceId(TEST_WORKFLOW_ID); + task.setReferenceTaskName("task_" + i); + task.setTaskDefName(taskDefinition.getName()); + tasks.add(task); + task.setStatus(Task.Status.SCHEDULED); + task.setWorkflowTask(workflowTask); + } + + getExecutionDAO().createTasks(tasks); + assertFalse(getExecutionDAO().exceedsInProgressLimit(tasks.get(0))); + tasks.get(0).setStatus(Task.Status.IN_PROGRESS); + getExecutionDAO().updateTask(tasks.get(0)); + assertFalse(getExecutionDAO().exceedsInProgressLimit(tasks.get(0))); + + tasks.remove(0); + for (Task task : tasks) { + assertTrue(getExecutionDAO().exceedsInProgressLimit(task)); + } + } + + @Test + public void testTaskExceedsLimitCrossWorkflows() { + TaskDef taskDefinition = createTaskDef(0, 1); + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("task1"); + workflowTask.setTaskDefinition(taskDefinition); + + List tasks = new ArrayList<>(); + for (int i = 0; i < 15; i++) { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(i + 1); + task.setTaskId("t_" + i); + task.setWorkflowInstanceId(TEST_WORKFLOW_ID + i); + task.setReferenceTaskName("task_" + i); + task.setTaskDefName(taskDefinition.getName()); + tasks.add(task); + task.setStatus(Task.Status.SCHEDULED); + task.setWorkflowTask(workflowTask); + dao.createWorkflow(createTestWorkflow(TEST_WORKFLOW_ID + i)); + dao.createTasks(Collections.singletonList(task)); + } + + Task firstTask = tasks.remove(0); + assertFalse(getExecutionDAO().exceedsInProgressLimit(firstTask)); + firstTask.setStatus(Task.Status.IN_PROGRESS); + 
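// persisting the IN_PROGRESS status consumes the single concurrency slot (concurrentExecLimit = 1) +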
getExecutionDAO().updateTask(firstTask); + assertFalse(getExecutionDAO().exceedsInProgressLimit(firstTask)); + + for (Task task : tasks) { + assertTrue(getExecutionDAO().exceedsInProgressLimit(task)); + dao.removeWorkflow(task.getWorkflowInstanceId()); + } + dao.removeWorkflow(firstTask.getWorkflowInstanceId()); + } + + @Test + public void testCreateTaskException() { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId(TEST_TASK_ID_1); + task.setTaskDefName(TEST_TASK_DEF_NAME); + + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("Workflow instance id cannot be null"); + getExecutionDAO().createTasks(Collections.singletonList(task)); + + task.setWorkflowInstanceId(TEST_WORKFLOW_ID); + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("Task reference name cannot be null"); + getExecutionDAO().createTasks(Collections.singletonList(task)); + } + + @Test + public void testTaskCreateDups() { + List tasks = new ArrayList<>(); + String workflowId = TEST_WORKFLOW_ID; + + for (int i = 0; i < 3; i++) { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(i + 1); + task.setTaskId(workflowId + "_t" + i); + task.setReferenceTaskName("t" + i); + task.setRetryCount(0); + task.setWorkflowInstanceId(workflowId); + task.setTaskDefName("task" + i); + task.setStatus(Task.Status.IN_PROGRESS); + tasks.add(task); + } + + // insert a retried task + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId(workflowId + "_t" + 2 + "_" + 1); // it should has a different task id + task.setRetriedTaskId(workflowId + "_t" + 2); + task.setReferenceTaskName("t" + 2); + task.setRetryCount(1); + task.setWorkflowInstanceId(workflowId); + task.setTaskDefName("task" + 2); + task.setStatus(Task.Status.IN_PROGRESS); + tasks.add(task); + + // duplicate task + task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId(workflowId + "_t" + 1); + task.setReferenceTaskName("t" + 1); + task.setRetryCount(0); + task.setWorkflowInstanceId(workflowId); + task.setTaskDefName("task" + 1); + task.setStatus(Task.Status.IN_PROGRESS); + tasks.add(task); + + List created = getExecutionDAO().createTasks(tasks); + assertEquals(tasks.size() - 1, created.size()); // 1 less + + Set srcIds = + tasks.stream() + .map(t -> t.getReferenceTaskName() + "." + t.getRetryCount()) + .collect(Collectors.toSet()); + Set createdIds = + created.stream() + .map(t -> t.getReferenceTaskName() + "." 
+ t.getRetryCount()) + .collect(Collectors.toSet()); + + assertEquals(srcIds, createdIds); + + List pending = getExecutionDAO().getPendingTasksByWorkflow("task0", workflowId); + assertNotNull(pending); + assertEquals(1, pending.size()); + assertEquals(tasks.get(0), pending.get(0)); + + List found = getExecutionDAO().getTasks(tasks.get(0).getTaskDefName(), null, 1); + assertNotNull(found); + assertEquals(1, found.size()); + assertEquals(tasks.get(0), found.get(0)); + } + + @Test + public void testTaskOps() { + List tasks = new LinkedList<>(); + String workflowId = TEST_WORKFLOW_ID; + + for (int i = 0; i < 3; i++) { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId(workflowId + "_t" + i); + task.setReferenceTaskName("testTaskOps" + i); + task.setRetryCount(0); + task.setWorkflowInstanceId(workflowId); + task.setTaskDefName("testTaskOps" + i); + task.setStatus(Task.Status.IN_PROGRESS); + tasks.add(task); + } + + dao.createWorkflow(createTestWorkflow("x" + workflowId)); + + for (int i = 0; i < 3; i++) { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId("x" + workflowId + "_t" + i); + task.setReferenceTaskName("testTaskOps" + i); + task.setRetryCount(0); + task.setWorkflowInstanceId("x" + workflowId); + task.setTaskDefName("testTaskOps" + i); + task.setStatus(Task.Status.IN_PROGRESS); + getExecutionDAO().createTasks(Collections.singletonList(task)); + } + + List created = getExecutionDAO().createTasks(tasks); + assertEquals(tasks.size(), created.size()); + + List pending = + getExecutionDAO().getPendingTasksForTaskType(tasks.get(0).getTaskDefName()); + assertNotNull(pending); + assertEquals(2, pending.size()); + // Pending list can come in any order. finding the one we are looking for and then comparing + Task matching = + pending.stream() + .filter(task -> task.getTaskId().equals(tasks.get(0).getTaskId())) + .findAny() + .get(); + assertEquals(matching, tasks.get(0)); + + for (int i = 0; i < 3; i++) { + Task found = getExecutionDAO().getTask(workflowId + "_t" + i); + assertNotNull(found); + found.getOutputData().put("updated", true); + found.setStatus(Task.Status.COMPLETED); + getExecutionDAO().updateTask(found); + } + + List taskIds = tasks.stream().map(Task::getTaskId).collect(Collectors.toList()); + List found = getExecutionDAO().getTasks(taskIds); + assertEquals(taskIds.size(), found.size()); + found.forEach( + task -> { + assertTrue(task.getOutputData().containsKey("updated")); + assertEquals(true, task.getOutputData().get("updated")); + boolean removed = getExecutionDAO().removeTask(task.getTaskId()); + assertTrue(removed); + }); + + found = getExecutionDAO().getTasks(taskIds); + assertTrue(found.isEmpty()); + dao.removeWorkflow("x" + workflowId); + } + + @Test + public void testPending() { + WorkflowDef def = new WorkflowDef(); + def.setName("pending_count_test"); + + Workflow workflow = createTestWorkflow(TEST_WORKFLOW_ID); + workflow.setWorkflowDefinition(def); + + List workflowIds = generateWorkflows(workflow, 10); + long count = getExecutionDAO().getPendingWorkflowCount(def.getName()); + assertEquals(10, count); + + for (String workflowId : workflowIds) { + getExecutionDAO().removeWorkflow(workflowId); + } + + count = getExecutionDAO().getPendingWorkflowCount(def.getName()); + assertEquals(0, count); + } + + @Test + public void complexExecutionTest() { + Workflow workflow = createComplexTestWorkflow(); + int numTasks = workflow.getTasks().size(); + + String workflowId = 
getExecutionDAO().updateWorkflow(workflow); + assertEquals(workflow.getWorkflowId(), workflowId); + + List created = getExecutionDAO().createTasks(workflow.getTasks()); + assertEquals(workflow.getTasks().size(), created.size()); + + Workflow workflowWithTasks = getExecutionDAO().getWorkflow(workflow.getWorkflowId(), true); + assertEquals(workflowId, workflowWithTasks.getWorkflowId()); + assertEquals(numTasks, workflowWithTasks.getTasks().size()); + + Workflow found = getExecutionDAO().getWorkflow(workflowId, false); + assertTrue(found.getTasks().isEmpty()); + + workflow.getTasks().clear(); + assertEquals(workflow, found); + + workflow.getInput().put("updated", true); + getExecutionDAO().updateWorkflow(workflow); + found = getExecutionDAO().getWorkflow(workflowId); + assertNotNull(found); + assertTrue(found.getInput().containsKey("updated")); + assertEquals(true, found.getInput().get("updated")); + + List running = + getExecutionDAO() + .getRunningWorkflowIds(workflow.getWorkflowName(), workflow.getWorkflowVersion()); + assertNotNull(running); + assertTrue(running.isEmpty()); + + workflow.setStatus(Workflow.WorkflowStatus.RUNNING); + getExecutionDAO().updateWorkflow(workflow); + + running = + getExecutionDAO() + .getRunningWorkflowIds(workflow.getWorkflowName(), workflow.getWorkflowVersion()); + assertNotNull(running); + assertEquals(1, running.size()); + assertEquals(workflow.getWorkflowId(), running.get(0)); + + List pending = + getExecutionDAO() + .getPendingWorkflowsByType(workflow.getWorkflowName(), workflow.getWorkflowVersion()); + assertNotNull(pending); + assertEquals(1, pending.size()); + assertEquals(3, pending.get(0).getTasks().size()); + pending.get(0).getTasks().clear(); + assertEquals(workflow, pending.get(0)); + + List bytime = + getExecutionDAO() + .getWorkflowsByType( + workflow.getWorkflowName(), + workflow.getCreateTime() - 10, + workflow.getCreateTime() + 10); + assertNotNull(bytime); + assertEquals(1, bytime.size()); + + workflow.setStatus(Workflow.WorkflowStatus.COMPLETED); + getExecutionDAO().updateWorkflow(workflow); + running = + getExecutionDAO() + .getRunningWorkflowIds(workflow.getWorkflowName(), workflow.getWorkflowVersion()); + assertNotNull(running); + assertTrue(running.isEmpty()); + + bytime = + getExecutionDAO() + .getWorkflowsByType( + workflow.getWorkflowName(), + workflow.getStartTime() - 10, + workflow.getStartTime() + 10); + assertNotNull(bytime); + assertEquals(0, bytime.size()); + bytime = + getExecutionDAO() + .getWorkflowsByType( + workflow.getWorkflowName(), + workflow.getStartTime() + 100, + workflow.getStartTime() + 200); + assertNotNull(bytime); + assertTrue(bytime.isEmpty()); + } + + protected Workflow createComplexTestWorkflow() { + WorkflowDef def = new WorkflowDef(); + def.setName(TEST_WORKFLOW_ID); + def.setVersion(3); + def.setSchemaVersion(2); + + Workflow workflow = new Workflow(); + workflow.setWorkflowDefinition(def); + workflow.setCorrelationId("correlationX"); + workflow.setCreatedBy("junit_tester"); + workflow.setEndTime(200L); + + Map input = new HashMap<>(); + input.put("param1", "param1 value"); + input.put("param2", 100); + workflow.setInput(input); + + Map output = new HashMap<>(); + output.put("ouput1", "output 1 value"); + output.put("op2", 300); + workflow.setOutput(output); + + workflow.setOwnerApp("workflow"); + workflow.setParentWorkflowId("parentWorkflowId"); + workflow.setParentWorkflowTaskId("parentWFTaskId"); + workflow.setReasonForIncompletion("missing recipe"); + workflow.setReRunFromWorkflowId("re-run from id1"); 
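+ // arbitrary fixture timestamps; the exact values are not significant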
+ workflow.setStartTime(90L); + workflow.setStatus(Workflow.WorkflowStatus.FAILED); + workflow.setWorkflowId(TEST_WORKFLOW_ID); + + List tasks = new LinkedList<>(); + + Task task1 = new Task(); + task1.setScheduledTime(1L); + task1.setSeq(1); + task1.setTaskId(UUID.randomUUID().toString()); + task1.setReferenceTaskName("t1"); + task1.setWorkflowInstanceId(workflow.getWorkflowId()); + task1.setTaskDefName("task1"); + + Task task2 = new Task(); + task2.setScheduledTime(2L); + task2.setSeq(2); + task2.setTaskId(UUID.randomUUID().toString()); + task2.setReferenceTaskName("t2"); + task2.setWorkflowInstanceId(workflow.getWorkflowId()); + task2.setTaskDefName("task2"); + + Task task3 = new Task(); + task3.setScheduledTime(2L); + task3.setSeq(3); + task3.setTaskId(UUID.randomUUID().toString()); + task3.setReferenceTaskName("t3"); + task3.setWorkflowInstanceId(workflow.getWorkflowId()); + task3.setTaskDefName("task3"); + + tasks.add(task1); + tasks.add(task2); + tasks.add(task3); + + workflow.setTasks(tasks); + + workflow.setUpdatedBy("junit_tester"); + workflow.setUpdateTime(800L); + + return workflow; + } + + protected List generateWorkflows(Workflow base, int count) { + List workflowIds = new ArrayList<>(); + for (int i = 0; i < count; i++) { + String workflowId = UUID.randomUUID().toString(); + base.setWorkflowId(workflowId); + base.setCorrelationId("corr001"); + base.setStatus(Workflow.WorkflowStatus.RUNNING); + getExecutionDAO().createWorkflow(base); + workflowIds.add(workflowId); + } + return workflowIds; + } +} diff --git a/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBIndexDAOTest.java b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBIndexDAOTest.java new file mode 100644 index 0000000..db702a2 --- /dev/null +++ b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBIndexDAOTest.java @@ -0,0 +1,172 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */
+package com.netflix.conductor.cockroachdb.dao;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableMap;
+import com.netflix.conductor.common.metadata.events.EventExecution;
+import com.netflix.conductor.common.metadata.events.EventHandler;
+import com.netflix.conductor.common.metadata.tasks.Task;
+import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
+import com.netflix.conductor.common.run.SearchResult;
+import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.dao.ExecutionDAO;
+import com.netflix.conductor.dao.IndexDAO;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CockroachDBIndexDAOTest extends CockroachDBBaseTest {
+
+    private IndexDAO dao;
+    private ExecutionDAO executionDAO;
+
+    Workflow workflow;
+
+    @Before
+    public void setUp() {
+        dao = new CockroachDBIndexDAO(dataSource, objectMapper, config);
+        executionDAO =
+                new CockroachDBExecutionDAO(
+                        dataSource,
+                        new CockroachDBIndexDAO(dataSource, objectMapper, config),
+                        objectMapper,
+                        config);
+        workflow = createTestWorkflow(TEST_WORKFLOW_ID);
+        executionDAO.createWorkflow(workflow);
+    }
+
+    @After
+    public void tearDown() {
+        executionDAO.removeWorkflow(TEST_WORKFLOW_ID);
+    }
+
+    @Test
+    public void searchWorkflowsTest() {
+        SearchResult<String> result = dao.searchWorkflows(TEST_WORKFLOW_NAME, "RUNNING", 0, 10, null);
+        assertEquals(1, result.getTotalHits());
+        assertEquals(TEST_WORKFLOW_ID, result.getResults().get(0));
+        result = dao.searchWorkflows(TEST_WORKFLOW_ID, "RUNNING", 1, 10, null);
+        assertEquals(0, result.getTotalHits());
+    }
+
+    @Test
+    public void searchTasksTest() {
+        SearchResult<String> result = dao.searchTasks(TEST_TASK_DEF_NAME, "IN_PROGRESS", 0, 10, null);
+        assertEquals(0, result.getTotalHits());
+        Task task = createRunningTestTask(TEST_TASK_ID_1);
+        executionDAO.updateTask(task);
+        result = dao.searchTasks(TEST_TASK_DEF_NAME, "IN_PROGRESS", 0, 10, null);
+        assertEquals(1, result.getTotalHits());
+        assertEquals(TEST_TASK_ID_1, result.getResults().get(0));
+        result = dao.searchTasks(TEST_TASK_DEF_NAME, "IN_PROGRESS", 1, 10, null);
+        assertEquals(0, result.getTotalHits());
+    }
+
+    @Test
+    public void addTaskExecutionLogsTest() {
+        List<TaskExecLog> logs = new ArrayList<>();
+        logs.add(createLog(TEST_TASK_ID_1, "log1"));
+        logs.add(createLog(TEST_TASK_ID_1, "log2"));
+        logs.add(createLog(TEST_TASK_ID_1, "log3"));
+
+        dao.addTaskExecutionLogs(logs);
+
+        List<TaskExecLog> indexedLogs =
+                tryFindResults(() -> dao.getTaskExecutionLogs(TEST_TASK_ID_1), 3);
+
+        assertEquals(3, indexedLogs.size());
+
+        assertTrue("Not all logs were indexed", indexedLogs.containsAll(logs));
+    }
+
+    @Test
+    public void asyncAddTaskExecutionLogsTest() throws Exception {
+        List<TaskExecLog> logs = new ArrayList<>();
+        logs.add(createLog(TEST_TASK_ID_2, "log1"));
+        logs.add(createLog(TEST_TASK_ID_2, "log2"));
+        logs.add(createLog(TEST_TASK_ID_2, "log3"));
+
+        dao.asyncAddTaskExecutionLogs(logs).get();
+
+        List<TaskExecLog> indexedLogs =
+                tryFindResults(() -> dao.getTaskExecutionLogs(TEST_TASK_ID_2), 3);
+
+        assertEquals(3, indexedLogs.size());
+
+        assertTrue("Not all logs were indexed", indexedLogs.containsAll(logs));
+    }
+
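+    // Index visibility may lag the ExecutionDAO write, so this test polls with
+    // tryFindResults(...) rather than asserting on a single immediate read.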
+    @Test
+    public void getEventExecutionsTest() {
+        String event = "event";
+        EventExecution execution1 = createEventExecution(event);
+        EventExecution execution2 = createEventExecution(event);
+
+        executionDAO.addEventExecution(execution1);
+        executionDAO.addEventExecution(execution2);
+
+        List<EventExecution> indexedExecutions =
+                tryFindResults(() -> dao.getEventExecutions(event), 2);
+
+        assertEquals(2, indexedExecutions.size());
+
+        assertTrue(
+                "Not all event executions were indexed",
+                indexedExecutions.containsAll(Arrays.asList(execution1, execution2)));
+    }
+
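+    // Bounded polling helper: retries the search up to 20 times with a 100 ms pause
+    // (about two seconds in total), returning the last result on timeout.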
+    private <T> List<T> tryFindResults(Supplier<List<T>> searchFunction, int resultsCount) {
+        List<T> result = Collections.emptyList();
+        for (int i = 0; i < 20; i++) {
+            result = searchFunction.get();
+            if (result.size() == resultsCount) {
+                return result;
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+        }
+        return result;
+    }
+
+    private TaskExecLog createLog(String taskId, String log) {
+        TaskExecLog taskExecLog = new TaskExecLog(log);
+        taskExecLog.setTaskId(taskId);
+        return taskExecLog;
+    }
+
+    private EventExecution createEventExecution(String event) {
+        EventExecution execution = new EventExecution(uuid(), uuid());
+        execution.setName("name");
+        execution.setEvent(event);
+        execution.setCreated(System.currentTimeMillis());
+        execution.setStatus(EventExecution.Status.COMPLETED);
+        execution.setAction(EventHandler.Action.Type.start_workflow);
+        execution.setOutput(ImmutableMap.of("a", 1, "b", 2, "c", 3));
+        return execution;
+    }
+
+    private String uuid() {
+        return UUID.randomUUID().toString();
+    }
+}
diff --git a/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBMetadataDAOTest.java b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBMetadataDAOTest.java
new file mode 100644
index 0000000..7190d50
--- /dev/null
+++ b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBMetadataDAOTest.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.dao;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import com.netflix.conductor.common.metadata.tasks.TaskDef;
+import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
+import com.netflix.conductor.core.execution.ApplicationException;
+import com.netflix.conductor.dao.MetadataDAO;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CockroachDBMetadataDAOTest extends CockroachDBBaseTest {
+    private MetadataDAO dao;
+
+    @Before
+    public void setUp() {
+        dao = new CockroachDBMetadataDAO(dataSource, objectMapper, config);
+    }
+
+    @Test(expected = ApplicationException.class)
+    public void testDuplicate() {
+        WorkflowDef wfd = new WorkflowDef();
+        wfd.setName("foo");
+        wfd.setVersion(1);
+
+        dao.createWorkflowDef(wfd);
+        dao.createWorkflowDef(wfd);
+    }
+
+    @Test
+    public void testWorkflowDefOperations() throws Exception {
+        WorkflowDef wfd = new WorkflowDef();
+        wfd.setName("test");
+        wfd.setVersion(1);
+        wfd.setDescription("description");
+        wfd.setCreatedBy("unit_test");
+        wfd.setCreateTime(1L);
+        wfd.setOwnerApp("ownerApp");
+        wfd.setUpdatedBy("unit_test2");
+        wfd.setUpdateTime(2L);
+
+        dao.createWorkflowDef(wfd);
+
+        List<WorkflowDef> all = dao.getAllWorkflowDefs();
+        assertNotNull(all);
+        assertEquals(1, all.size());
+        assertEquals("test", all.get(0).getName());
+        assertEquals(1, all.get(0).getVersion());
+
+        WorkflowDef found = dao.getWorkflowDef("test", 1).get();
+        assertEquals(wfd, found);
+
+        wfd.setVersion(2);
+        dao.createWorkflowDef(wfd);
+
+        all = dao.getAllWorkflowDefs();
+        assertNotNull(all);
+        assertEquals(2, all.size());
+        assertEquals("test", all.get(0).getName());
+        assertEquals("test", all.get(1).getName());
+        assertEquals(2, all.get(0).getVersion());
+        assertEquals(1, all.get(1).getVersion());
+
+        found = dao.getLatestWorkflowDef(wfd.getName()).get();
+        assertEquals(wfd.getName(), found.getName());
+        assertEquals(wfd.getVersion(), found.getVersion());
+        assertEquals(2, found.getVersion());
+
+        wfd.setDescription("updated");
+        dao.updateWorkflowDef(wfd);
+        found = dao.getWorkflowDef(wfd.getName(), wfd.getVersion()).get();
+        assertEquals(wfd.getDescription(), found.getDescription());
+
+        dao.removeWorkflowDef("test", 1);
+        Optional<WorkflowDef> deleted = dao.getWorkflowDef("test", 1);
+        assertFalse(deleted.isPresent());
+    }
+
+    @Test
+    public void testTaskDefOperations() throws Exception {
+        TaskDef def = new TaskDef("taskA");
+        def.setDescription("description");
+        def.setCreatedBy("unit_test");
+        def.setCreateTime(1L);
+        def.setInputKeys(Arrays.asList("a", "b", "c"));
+        def.setOutputKeys(Arrays.asList("o1", "o2"));
+        def.setOwnerApp("ownerApp");
+        def.setRetryCount(3);
+        def.setRetryDelaySeconds(100);
+        def.setRetryLogic(TaskDef.RetryLogic.FIXED);
+        def.setTimeoutPolicy(TaskDef.TimeoutPolicy.ALERT_ONLY);
+        def.setUpdatedBy("unit_test2");
+        def.setUpdateTime(2L);
+
+        dao.createTaskDef(def);
+
+        TaskDef found = dao.getTaskDef(def.getName());
+        assertEquals(def, found);
+
+        def.setDescription("updated description");
+        dao.updateTaskDef(def);
+        found = dao.getTaskDef(def.getName());
+        assertEquals(def, found);
+        assertEquals("updated description", found.getDescription());
+
+        for (int i = 0; i < 9; i++) {
+            TaskDef tdf = new TaskDef("taskA" + i);
+            dao.createTaskDef(tdf);
+        }
+
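+        // taskA plus taskA0..taskA8 were created above, so ten definitions are expected.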
+        List<TaskDef> all = dao.getAllTaskDefs();
+        assertNotNull(all);
+        assertEquals(10, all.size());
+        Set<String> allnames = all.stream().map(TaskDef::getName).collect(Collectors.toSet());
+        assertEquals(10, allnames.size());
+        List<String> sorted = allnames.stream().sorted().collect(Collectors.toList());
+        assertEquals(def.getName(), sorted.get(0));
+
+        for (int i = 0; i < 9; i++) {
+            assertEquals(def.getName() + i, sorted.get(i + 1));
+        }
+
+        for (int i = 0; i < 9; i++) {
+            dao.removeTaskDef(def.getName() + i);
+        }
+        all = dao.getAllTaskDefs();
+        assertNotNull(all);
+        assertEquals(1, all.size());
+        assertEquals(def.getName(), all.get(0).getName());
+    }
+
+    @Test(expected = ApplicationException.class)
+    public void testRemoveTaskDef() throws Exception {
+        dao.removeTaskDef("test" + UUID.randomUUID().toString());
+    }
+}
diff --git a/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBPollDataDAOTest.java b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBPollDataDAOTest.java
new file mode 100644
index 0000000..8141b8e
--- /dev/null
+++ b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBPollDataDAOTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.dao;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import com.netflix.conductor.common.metadata.tasks.PollData;
+import com.netflix.conductor.dao.PollDataDAO;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CockroachDBPollDataDAOTest extends CockroachDBBaseTest {
+
+    private PollDataDAO dao;
+
+    @Before
+    public void setUp() {
+        dao = new CockroachDBPollDataDAO(dataSource, objectMapper, config);
+    }
+
+    protected PollDataDAO getPollDataDAO() {
+        return dao;
+    }
+
+    @Test
+    public void testPollData() {
+        getPollDataDAO().updateLastPollData("taskDef", null, "workerId1");
+        PollData pollData = getPollDataDAO().getPollData("taskDef", null);
+        assertNotNull(pollData);
+        assertTrue(pollData.getLastPollTime() > 0);
+        assertEquals("taskDef", pollData.getQueueName());
+        assertNull(pollData.getDomain());
+        assertEquals("workerId1", pollData.getWorkerId());
+
+        getPollDataDAO().updateLastPollData("taskDef", "domain1", "workerId1");
+        pollData = getPollDataDAO().getPollData("taskDef", "domain1");
+        assertNotNull(pollData);
+        assertTrue(pollData.getLastPollTime() > 0);
+        assertEquals("taskDef", pollData.getQueueName());
+        assertEquals("domain1", pollData.getDomain());
+        assertEquals("workerId1", pollData.getWorkerId());
+
+        List<PollData> pData = getPollDataDAO().getPollData("taskDef");
+        assertEquals(2, pData.size());
+
+        pollData = getPollDataDAO().getPollData("taskDef", "domain2");
+        assertNull(pollData);
+    }
+}
diff --git a/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBRateLimitingDAOTest.java b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBRateLimitingDAOTest.java
new file mode 100644
index 0000000..69b5bee
--- /dev/null
+++ b/cockroachdb-persistence/src/test/java/com/netflix/conductor/cockroachdb/dao/CockroachDBRateLimitingDAOTest.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.cockroachdb.dao;
+
+import static org.junit.Assert.*;
+
+import com.netflix.conductor.common.metadata.tasks.Task;
+import com.netflix.conductor.common.metadata.tasks.TaskDef;
+import com.netflix.conductor.dao.ExecutionDAO;
+import com.netflix.conductor.dao.RateLimitingDAO;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CockroachDBRateLimitingDAOTest extends CockroachDBBaseTest {
+    private RateLimitingDAO dao;
+    private ExecutionDAO executionDAO;
+
+    @Before
+    public void setUp() {
+        dao = new CockroachDBRateLimitingDAO(dataSource, objectMapper, config);
+        executionDAO =
+                new CockroachDBExecutionDAO(
+                        dataSource,
+                        new CockroachDBIndexDAO(dataSource, objectMapper, config),
+                        objectMapper,
+                        config);
+        executionDAO.createWorkflow(createTestWorkflow(TEST_WORKFLOW_ID));
+    }
+
+    @After
+    public void tearDown() {
+        executionDAO.removeWorkflow(TEST_WORKFLOW_ID);
+    }
+
+    @Test
+    public void testExceedsRateLimitWhenNoRateLimitSet() {
+        TaskDef taskDef = createTaskDef(0, 0);
+        Task task = createRunningTestTask(TEST_TASK_ID_1);
+        assertFalse(dao.exceedsRateLimitPerFrequency(task, taskDef));
+    }
+
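+    // createTaskDef(...) and createRunningTestTask(...) are CockroachDBBaseTest helpers
+    // (not part of this diff); the two createTaskDef arguments presumably map to
+    // rateLimitPerFrequency and rateLimitFrequencyInSeconds on the TaskDef.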
+    @Test
+    public void testExceedsRateLimitWithinLimit() {
+        TaskDef taskDef = createTaskDef(10, 0);
+        Task task = createRunningTestTask(TEST_TASK_ID_1);
+        executionDAO.updateTask(task);
+        task = createRunningTestTask(TEST_TASK_ID_2);
+        executionDAO.updateTask(task);
+        assertFalse(dao.exceedsRateLimitPerFrequency(task, taskDef));
+    }
+
+    @Test
+    public void testExceedsRateLimitOutOfLimit() {
+        TaskDef taskDef = createTaskDef(1, 0);
+        Task task = createRunningTestTask(TEST_TASK_ID_1);
+        executionDAO.updateTask(task);
+        assertFalse(dao.exceedsRateLimitPerFrequency(task, taskDef));
+        task = createRunningTestTask(TEST_TASK_ID_2);
+        executionDAO.updateTask(task);
+        assertTrue(dao.exceedsRateLimitPerFrequency(task, taskDef));
+    }
+}
diff --git a/cockroachdb-persistence/src/test/resources/log4j.properties b/cockroachdb-persistence/src/test/resources/log4j.properties
new file mode 100644
index 0000000..39e84a0
--- /dev/null
+++ b/cockroachdb-persistence/src/test/resources/log4j.properties
@@ -0,0 +1,25 @@
+#
+# Copyright 2020 Netflix, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to INFO and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n