Skip to content

Commit

Permalink
Merge pull request #1935 from cloudsufi/fem/excel
Browse files Browse the repository at this point in the history
[PLUGIN-1861] Error management for excel plugin
  • Loading branch information
psainics authored Feb 12, 2025
2 parents 8dae8dd + 4967ee2 commit 63e92b1
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -95,24 +92,18 @@ public void initialize(BatchRuntimeContext context) throws Exception {
public void groupBy(StructuredRecord record, Emitter<String> emitter) throws Exception {
if (record.get(keyField) == null) {
if (record.getSchema().getField(keyField) == null) {
String error = String.format("Keyfield '%s' does not exist in input schema %s",
keyField, record.getSchema());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
throw new IllegalArgumentException(
String.format("Keyfield '%s' does not exist in input schema %s", keyField, record.getSchema()));
}
return;
}
if (record.get(nameField) == null && record.getSchema().getField(nameField) == null) {
String error = String.format("Namefield '%s' does not exist in input schema %s",
nameField, record.getSchema());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
throw new IllegalArgumentException(
String.format("Namefield '%s' does not exist in input schema %s", nameField, record.getSchema()));
}
if (record.get(valueField) == null && record.getSchema().getField(valueField) == null) {
String error = String.format("Valuefield '%s' does not exist in input schema %s",
valueField, record.getSchema());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
throw new IllegalArgumentException(
String.format("Valuefield '%s' does not exist in input schema %s", valueField, record.getSchema()));
}
emitter.emit((String) record.get(keyField));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.batch.source;

import com.github.pjfanning.xlsx.exceptions.MissingSheetException;
import com.github.pjfanning.xlsx.exceptions.ReadException;
import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
import org.apache.poi.EmptyFileException;

import java.util.List;
import javax.annotation.Nullable;

/**
* ExcelErrorDetailsProvider provider
*/
public class ExcelErrorDetailsProvider implements ErrorDetailsProvider {

private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. Error message: %s";
private static final String SUBCATEGORY_CONFIGURATION = "Configuration";
private static final String SUBCATEGORY_DATA_MISSING = "Data Integrity";
private static final String SUBCATEGORY_FILE_READ_ERROR = "File Read";

@Nullable
@Override
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
List<Throwable> causalChain = Throwables.getCausalChain(e);
for (Throwable t : causalChain) {
if (t instanceof ProgramFailureException) {
// if causal chain already has program failure exception, return null to avoid double wrap.
return null;
}
if (t instanceof MissingSheetException) {
return getProgramFailureException((MissingSheetException) t, errorContext,
ErrorType.USER, SUBCATEGORY_DATA_MISSING);
}
if (t instanceof ReadException) {
return getProgramFailureException((ReadException) t, errorContext,
ErrorType.USER, SUBCATEGORY_FILE_READ_ERROR);
}
if (t instanceof EmptyFileException) {
return getProgramFailureException((EmptyFileException) t, errorContext,
ErrorType.USER, SUBCATEGORY_DATA_MISSING);
}
if (t instanceof IllegalArgumentException) {
return getProgramFailureException((IllegalArgumentException) t, errorContext,
ErrorType.USER, SUBCATEGORY_CONFIGURATION);
}
}
return null;
}

/**
* Get a ProgramFailureException with the given error information from {@link Exception}.
*
* @param exception The Exception to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(Exception exception, ErrorContext errorContext,
ErrorType errorType, String subCategory) {
String errorMessage = exception.getMessage();
return ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN, subCategory), errorMessage,
String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(), errorMessage), errorType,
false, exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import com.github.pjfanning.xlsx.StreamingReader;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -193,7 +196,9 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
workSheet = workbook.getSheetAt(Integer.parseInt(sheetValue));
}
} catch (Exception e) {
throw new IllegalArgumentException("Exception while reading excel sheet. " + e.getMessage(), e);
String error = String.format("Exception while reading excel sheet: %s", e.getMessage());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, e);
}

// As we cannot get the number of rows in a sheet while streaming.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.dataset.lib.KeyValueTable;
import io.cdap.cdap.api.dataset.table.Table;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.Properties;
import io.cdap.plugin.common.ReferencePluginConfig;
Expand Down Expand Up @@ -192,10 +196,11 @@ public void transform(KeyValue<LongWritable, Object> input, Emitter<StructuredRe

int currentRowNum = Integer.parseInt(excelRecord[0]);
if (currentRowNum - prevRowNum > 1 && excelInputreaderConfig.terminateIfEmptyRow.equalsIgnoreCase("true")) {
throw new ExecutionException("Encountered empty row while reading Excel file :" + fileName +
" . Terminating processing", new Throwable());
String error = String.format("Encountered empty row while reading Excel file :%s." +
" Terminating processing", fileName);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
prevRowNum = currentRowNum;

Map<String, String> excelColumnValueMap = new HashMap<>();

Expand All @@ -212,6 +217,9 @@ public void transform(KeyValue<LongWritable, Object> input, Emitter<StructuredRe
}
}
}
if (!excelColumnValueMap.isEmpty()) {
prevRowNum = currentRowNum;
}

try {
for (Schema.Field field : outputSchema.getFields()) {
Expand Down Expand Up @@ -332,6 +340,9 @@ public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
// Sets the input path(s).
ExcelInputFormat.addInputPaths(job, excelInputreaderConfig.filePath);

batchSourceContext.setErrorDetailsProvider(new ErrorDetailsProviderSpec
(ExcelErrorDetailsProvider.class.getName()));

// Sets the filter based on extended class implementation.
ExcelInputFormat.setInputPathFilter(job, ExcelReaderRegexFilter.class);
SourceInputFormatProvider inputFormatProvider = new SourceInputFormatProvider(ExcelInputFormat.class,
Expand Down Expand Up @@ -442,8 +453,10 @@ private void getOutputSchema() {
}
}
} catch (Exception e) {
throw new IllegalArgumentException("Exception while creating output schema for Excel input reader. " +
"Invalid output " + "schema: " + e.getMessage(), e);
String error = String.format("Exception while creating output schema for Excel input reader. " +
"Invalid output " + "schema: %s", e.getMessage());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, e);
}
outputSchema = Schema.recordOf("outputSchema", outputFields);
}
Expand Down

0 comments on commit 63e92b1

Please sign in to comment.