Skip to content

Commit

Permalink
repo-sync-2024-10-11T15:57:26+0800 (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
SuperMarz authored Oct 11, 2024
1 parent ee91bc6 commit 3b8bcc3
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 61 deletions.
1 change: 0 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,3 @@ workflows:
.bazelrc sdk-build-and-run true
WORKSPACE sdk-build-and-run true
- lint

Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,15 @@ public enum DataproxyErrorCode {

// odps 异常
ODPS_CREATE_TABLE_FAILED(ErrorLevels.ERROR, ErrorTypes.BIZ, "600", "Create ODPS table failed"),
ODPS_ERROR(ErrorLevels.ERROR, ErrorTypes.BIZ, "601", "ODPS error"),
ODPS_CREATE_PARTITION_FAILED(ErrorLevels.ERROR, ErrorTypes.BIZ, "601", "Create ODPS table failed"),
ODPS_ERROR(ErrorLevels.ERROR, ErrorTypes.BIZ, "602", "ODPS error"),
ODPS_TABLE_ALREADY_EXISTS(ErrorLevels.ERROR, ErrorTypes.BIZ, "603", "odps table already exists"),
ODPS_TABLE_NOT_EXISTS(ErrorLevels.ERROR, ErrorTypes.BIZ, "604", "odps table not exists"),
ODPS_PARTITION_ALREADY_EXISTS(ErrorLevels.ERROR, ErrorTypes.BIZ, "605", "odps partition already exists"),
ODPS_PARTITION_NOT_EXISTS(ErrorLevels.ERROR, ErrorTypes.BIZ, "606", "odps partition not exists"),
ODPS_TABLE_NOT_EMPTY(ErrorLevels.ERROR, ErrorTypes.BIZ, "607", "odps table not empty"),
ODPS_TABLE_NOT_SUPPORT_PARTITION(ErrorLevels.ERROR, ErrorTypes.BIZ, "608", "odps table not support partition"),


//============================= 第三方错误【900-999】==================================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package org.secretflow.dataproxy.manager.connector.odps;

import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.OdpsException;
import org.apache.arrow.memory.BufferAllocator;
import org.secretflow.dataproxy.common.model.InferSchemaResult;
import org.secretflow.dataproxy.common.model.command.DatasetReadCommand;
Expand Down Expand Up @@ -89,7 +89,7 @@ public DataWriter buildWriter(DatasetWriteCommand writeCommand) {
if (Objects.equals(DatasetFormatTypeEnum.TABLE, writeCommand.getFormatConfig().getType())) {
try {
return new OdpsDataWriter(config, locationConfig, writeCommand.getSchema());
} catch (TunnelException | IOException e) {
} catch (IOException | OdpsException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,25 @@

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.Table;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.TypeInfoFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
Expand Down Expand Up @@ -67,12 +70,12 @@ public class OdpsDataWriter implements DataWriter {

private final boolean overwrite = true;

private boolean isTemporarilyCreatedTable = false;
private boolean isPartitioned = false;

private TableTunnel.UploadSession uploadSession = null;
private RecordWriter recordWriter = null;

public OdpsDataWriter(OdpsConnConfig connConfig, OdpsTableInfo tableInfo, Schema schema) throws TunnelException, IOException {
public OdpsDataWriter(OdpsConnConfig connConfig, OdpsTableInfo tableInfo, Schema schema) throws OdpsException, IOException {
this.connConfig = connConfig;
this.tableInfo = tableInfo;
this.schema = schema;
Expand All @@ -96,7 +99,8 @@ record = uploadSession.newRecord();

for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
log.debug("column: {}, type: {}", columnIndex, root.getFieldVectors().get(columnIndex).getField().getType());
columnName = root.getVector(columnIndex).getField().getName();
// odps column name is lower case
columnName = root.getVector(columnIndex).getField().getName().toLowerCase();

if (tableSchema.containsColumn(columnName)) {
this.setRecordValue(record, tableSchema.getColumnIndex(columnName), this.getValue(root.getFieldVectors().get(columnIndex), rowIndex));
Expand Down Expand Up @@ -144,21 +148,25 @@ private Odps initOdpsClient(OdpsConnConfig odpsConnConfig) {
return OdpsUtil.buildOdps(odpsConnConfig);
}

private void initOdps() throws TunnelException, IOException {
private void initOdps() throws OdpsException, IOException {
// init odps client
Odps odps = initOdpsClient(this.connConfig);
// Pre-processing
preProcessing(odps, connConfig.getProjectName(), tableInfo.tableName());
preProcessing(odps, connConfig.getProjectName(), tableInfo.tableName(), this.convertToPartitionSpec(tableInfo.partitionSpec()));
// init upload session
TableTunnel tunnel = new TableTunnel(odps);
if (tableInfo.partitionSpec() != null && !tableInfo.partitionSpec().isEmpty() && !isTemporarilyCreatedTable) {

if (isPartitioned) {
if (tableInfo.partitionSpec() == null || tableInfo.partitionSpec().isEmpty()) {
throw DataproxyException.of(DataproxyErrorCode.INVALID_PARTITION_SPEC, "partitionSpec is empty");
}
PartitionSpec partitionSpec = new PartitionSpec(tableInfo.partitionSpec());
uploadSession = tunnel.createUploadSession(connConfig.getProjectName(), tableInfo.tableName(), partitionSpec, overwrite);
} else {
uploadSession = tunnel.createUploadSession(connConfig.getProjectName(), tableInfo.tableName(), overwrite);
}

recordWriter = uploadSession.openRecordWriter(0);
recordWriter = uploadSession.openRecordWriter(0, true);
}

/**
Expand All @@ -173,6 +181,7 @@ private void initOdps() throws TunnelException, IOException {
private void setRecordValue(Record record, int columnIndex, Object value) {
if (value == null) {
record.set(columnIndex, null);
log.warn("table name: {} record set null value. index: {}", tableInfo.tableName(), columnIndex);
return;
}

Expand All @@ -184,8 +193,11 @@ private void setRecordValue(Record record, int columnIndex, Object value) {
case STRING -> record.setString(columnIndex, String.valueOf(value));
case FLOAT -> record.set(columnIndex, Float.parseFloat(String.valueOf(value)));
case DOUBLE -> record.set(columnIndex, Double.parseDouble(String.valueOf(value)));
case TINYINT -> record.set(columnIndex, Byte.parseByte(String.valueOf(value)));
case SMALLINT -> record.set(columnIndex, Short.parseShort(String.valueOf(value)));
case BIGINT -> record.set(columnIndex, Long.parseLong(String.valueOf(value)));
case INT -> record.set(columnIndex, Integer.parseInt(String.valueOf(value)));
case BOOLEAN -> record.setBoolean(columnIndex, (Boolean) value);
default -> record.set(columnIndex, value);
}
}
Expand All @@ -205,23 +217,32 @@ private Object getValue(FieldVector fieldVector, int index) {

switch (arrowTypeID) {
case Int -> {
if (fieldVector instanceof IntVector || fieldVector instanceof BigIntVector || fieldVector instanceof SmallIntVector) {
if (fieldVector instanceof IntVector || fieldVector instanceof BigIntVector || fieldVector instanceof SmallIntVector || fieldVector instanceof TinyIntVector) {
return fieldVector.getObject(index);
}
log.warn("Type INT is not IntVector or BigIntVector or SmallIntVector or TinyIntVector, value is: {}", fieldVector.getObject(index).toString());
}
case FloatingPoint -> {
if (fieldVector instanceof Float4Vector | fieldVector instanceof Float8Vector) {
return fieldVector.getObject(index);
}
log.warn("Type FloatingPoint is not Float4Vector or Float8Vector, value is: {}", fieldVector.getObject(index).toString());
}
case Utf8 -> {
if (fieldVector instanceof VarCharVector vector) {
return new String(vector.get(index), StandardCharsets.UTF_8);
}
log.warn("Type Utf8 is not VarCharVector, value is: {}", fieldVector.getObject(index).toString());
}
case Null -> {
return null;
}
case Bool -> {
if (fieldVector instanceof BitVector vector) {
return vector.get(index) == 1;
}
log.warn("Type BOOL is not BitVector, value is: {}", fieldVector.getObject(index).toString());
}
default -> {
log.warn("Not implemented type: {}, will use default function", arrowTypeID);
return fieldVector.getObject(index);
Expand All @@ -239,16 +260,35 @@ private Object getValue(FieldVector fieldVector, int index) {
* @param projectName project name
* @param tableName table name
*/
private void preProcessing(Odps odps, String projectName, String tableName) {
private void preProcessing(Odps odps, String projectName, String tableName, PartitionSpec partitionSpec) throws OdpsException {

if (!isExistsTable(odps, projectName, tableName)) {
boolean odpsTable = createOdpsTable(odps, projectName, tableName, schema);
boolean odpsTable = createOdpsTable(odps, projectName, tableName, schema, partitionSpec);
if (!odpsTable) {
throw DataproxyException.of(DataproxyErrorCode.ODPS_CREATE_TABLE_FAILED);
}
isTemporarilyCreatedTable = true;
log.info("odps table is not exists, create table successful, project: {}, table name: {}", projectName, tableName);
} else {
log.info("odps table is exists, project: {}, table name: {}", projectName, tableName);
}
isPartitioned = odps.tables().get(projectName, tableName).isPartitioned();

if (isPartitioned) {
if (partitionSpec == null || partitionSpec.isEmpty()) {
throw DataproxyException.of(DataproxyErrorCode.INVALID_PARTITION_SPEC, "partitionSpec is empty");
}

if (!isExistsPartition(odps, projectName, tableName, partitionSpec)) {
boolean odpsPartition = createOdpsPartition(odps, projectName, tableName, partitionSpec);
if (!odpsPartition) {
throw DataproxyException.of(DataproxyErrorCode.ODPS_CREATE_PARTITION_FAILED);
}
log.info("odps partition is not exists, create partition successful, project: {}, table name: {}, PartitionSpec: {}", projectName, tableName, partitionSpec);
} else {
log.info("odps partition is exists, project: {}, table name: {}, PartitionSpec: {}", projectName, tableName, partitionSpec);
}

}
log.info("odps table is exists or create table successful, project: {}, table name: {}", projectName, tableName);
}

/**
Expand All @@ -268,21 +308,72 @@ private boolean isExistsTable(Odps odps, String projectName, String tableName) {
return false;
}

private boolean createOdpsTable(Odps odps, String projectName, String tableName, Schema schema) {
private boolean isExistsPartition(Odps odps, String projectName, String tableName, PartitionSpec partitionSpec) throws OdpsException {
Table table = odps.tables().get(projectName, tableName);

if (table == null) {
log.warn("table is null, projectName:{}, tableName:{}", projectName, tableName);
throw DataproxyException.of(DataproxyErrorCode.ODPS_TABLE_NOT_EXISTS);
}

return table.hasPartition(partitionSpec);
}

/**
* create odps table
*
* @param odps odps client
* @param projectName project name
* @param tableName table name
* @param schema schema
* @param partitionSpec partition spec
* @return true or false
*/
private boolean createOdpsTable(Odps odps, String projectName, String tableName, Schema schema, PartitionSpec partitionSpec) {
try {
odps.tables().create(projectName, tableName, convertToTableSchema(schema), true);
TableSchema tableSchema = convertToTableSchema(schema);
if (partitionSpec != null) {
// Infer partitioning field type as string.
partitionSpec.keys().forEach(key -> tableSchema.addPartitionColumn(Column.newBuilder(key, TypeInfoFactory.STRING).build()));
}
odps.tables().create(projectName, tableName, tableSchema, "", true, null, OdpsUtil.getSqlFlag(), null);
return true;
} catch (Exception e) {
log.error("create odps table error, projectName:{}, tableName:{}", projectName, tableName, e);
}
return false;
}

private boolean createOdpsPartition(Odps odps, String projectName, String tableName, PartitionSpec partitionSpec) {
try {
Table table = odps.tables().get(projectName, tableName);
table.createPartition(partitionSpec, true);
return true;
} catch (Exception e) {
log.error("create odps partition error, projectName:{}, tableName:{}", projectName, tableName, e);
}
return false;
}

private TableSchema convertToTableSchema(Schema schema) {
List<Column> columns = schema.getFields().stream().map(this::convertToColumn).toList();
return TableSchema.builder().withColumns(columns).build();
}

/**
* convert partition spec
*
* @param partitionSpec partition spec
* @return partition spec
* @throws IllegalArgumentException if partitionSpec is invalid
*/
private PartitionSpec convertToPartitionSpec(String partitionSpec) {
if (partitionSpec == null || partitionSpec.isEmpty()) {
return null;
}
return new PartitionSpec(partitionSpec);
}

private Column convertToColumn(Field field) {
return Column.newBuilder(field.getName(), convertToType(field.getType())).build();
}
Expand All @@ -304,14 +395,23 @@ private TypeInfo convertToType(ArrowType type) {
};
}
case Int -> {
return TypeInfoFactory.INT;
return switch (((ArrowType.Int) type).getBitWidth()) {
case 8 -> TypeInfoFactory.TINYINT;
case 16 -> TypeInfoFactory.SMALLINT;
case 32 -> TypeInfoFactory.INT;
case 64 -> TypeInfoFactory.BIGINT;
default -> TypeInfoFactory.UNKNOWN;
};
}
case Time -> {
return TypeInfoFactory.TIMESTAMP;
}
case Date -> {
return TypeInfoFactory.DATE;
}
case Bool -> {
return TypeInfoFactory.BOOLEAN;
}
default -> {
log.warn("Not implemented type: {}", arrowTypeID);
return TypeInfoFactory.UNKNOWN;
Expand Down
Loading

0 comments on commit 3b8bcc3

Please sign in to comment.