diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index e1471d52..cb4374d6 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -16,6 +16,26 @@ on: - 'docs/**' jobs: + artifacts: + runs-on: ubuntu-latest + name: Artifacts + steps: + - uses: actions/checkout@v3 + - name: Set up JDK 17 + uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'adopt' + architecture: x64 + - name: Build + run: | + ./gradlew clean createConfluentArchive + - name: Save Jars + uses: actions/upload-artifact@v4 + with: + name: build_artifacts + path: build/libs/*.jar + retention-days: 5 build: runs-on: ubuntu-latest strategy: @@ -53,4 +73,4 @@ jobs: CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }} CLIENT_VERSION: ${{ matrix.client }} with: - arguments: test + arguments: test \ No newline at end of file diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java index 5fbc9d7f..4e59c04a 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java @@ -56,11 +56,23 @@ public void put(Collection records) { Utils.handleException(e, errorTolerance, records); if (errorTolerance && errorReporter != null) { LOGGER.warn("Sending [{}] records to DLQ for exception: {}", records.size(), e.getLocalizedMessage()); - records.forEach(r -> Utils.sendTODlq(errorReporter, r, e)); + final Exception actualException = getActualException(e); + records.forEach(r -> Utils.sendTODlq(errorReporter, r, actualException)); } } } + private static Exception getActualException(Exception e) { + Throwable cause = e.getCause(); + try { + if (cause instanceof Exception) { + return (Exception) cause; + } + } catch (ClassCastException cce) { + // ignore + } + return e; + } // TODO: can be removed ss @Override diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java b/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java index 113c9c56..cf0a016b 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java @@ -57,6 +57,7 @@ private void doInsert(List records, RangeContainer rangeContainer) { LOGGER.debug("doInsert - Records: [{}] - {}", records.size(), queryId); dbWriter.doInsert(records, queryId, errorReporter); } catch (Exception e) { + LOGGER.error("doInsert - Error inserting records", e); throw new RuntimeException(queryId.toString(), e);//This way the queryId will propagate } }