Skip to content

Commit

Permalink
Merge pull request #1939 from cloudsufi/revert/RowDenormalizerAggregator
Browse files Browse the repository at this point in the history
Revert "[PLUGIN-1861] Error management for excel plugin | RowDenormaliser"
  • Loading branch information
itsankit-google authored Feb 12, 2025
2 parents 63e92b1 + f3f39c8 commit 673c94b
Showing 1 changed file with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -92,18 +95,24 @@ public void initialize(BatchRuntimeContext context) throws Exception {
public void groupBy(StructuredRecord record, Emitter<String> emitter) throws Exception {
if (record.get(keyField) == null) {
if (record.getSchema().getField(keyField) == null) {
throw new IllegalArgumentException(
String.format("Keyfield '%s' does not exist in input schema %s", keyField, record.getSchema()));
String error = String.format("Keyfield '%s' does not exist in input schema %s",
keyField, record.getSchema());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
return;
}
if (record.get(nameField) == null && record.getSchema().getField(nameField) == null) {
throw new IllegalArgumentException(
String.format("Namefield '%s' does not exist in input schema %s", nameField, record.getSchema()));
String error = String.format("Namefield '%s' does not exist in input schema %s",
nameField, record.getSchema());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
if (record.get(valueField) == null && record.getSchema().getField(valueField) == null) {
throw new IllegalArgumentException(
String.format("Valuefield '%s' does not exist in input schema %s", valueField, record.getSchema()));
String error = String.format("Valuefield '%s' does not exist in input schema %s",
valueField, record.getSchema());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}
emitter.emit((String) record.get(keyField));
}
Expand Down

0 comments on commit 673c94b

Please sign in to comment.