repo-sync-2024-10-11T15:57:26+0800 #11

Merged: 1 commit, merged on Oct 11, 2024
.circleci/config.yml (1 change: 0 additions & 1 deletion)
@@ -78,4 +78,3 @@ workflows:
       .bazelrc sdk-build-and-run true
       WORKSPACE sdk-build-and-run true
-      - lint

@@ -84,7 +84,15 @@ public enum DataproxyErrorCode {

     // odps exceptions
     ODPS_CREATE_TABLE_FAILED(ErrorLevels.ERROR, ErrorTypes.BIZ, "600", "Create ODPS table failed"),
-    ODPS_ERROR(ErrorLevels.ERROR, ErrorTypes.BIZ, "601", "ODPS error"),
+    ODPS_CREATE_PARTITION_FAILED(ErrorLevels.ERROR, ErrorTypes.BIZ, "601", "Create ODPS partition failed"),
+    ODPS_ERROR(ErrorLevels.ERROR, ErrorTypes.BIZ, "602", "ODPS error"),
+    ODPS_TABLE_ALREADY_EXISTS(ErrorLevels.ERROR, ErrorTypes.BIZ, "603", "odps table already exists"),
+    ODPS_TABLE_NOT_EXISTS(ErrorLevels.ERROR, ErrorTypes.BIZ, "604", "odps table does not exist"),
+    ODPS_PARTITION_ALREADY_EXISTS(ErrorLevels.ERROR, ErrorTypes.BIZ, "605", "odps partition already exists"),
+    ODPS_PARTITION_NOT_EXISTS(ErrorLevels.ERROR, ErrorTypes.BIZ, "606", "odps partition does not exist"),
+    ODPS_TABLE_NOT_EMPTY(ErrorLevels.ERROR, ErrorTypes.BIZ, "607", "odps table is not empty"),
+    ODPS_TABLE_NOT_SUPPORT_PARTITION(ErrorLevels.ERROR, ErrorTypes.BIZ, "608", "odps table does not support partitions"),


    //============================= Third-party errors [900-999] ==================================

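These codes surface through the project's DataproxyException wrapper; the later hunks in this PR raise them like so:

    // pattern used in OdpsDataWriter.preProcessing below
    throw DataproxyException.of(DataproxyErrorCode.ODPS_CREATE_PARTITION_FAILED);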
@@ -15,7 +15,7 @@
  */
 package org.secretflow.dataproxy.manager.connector.odps;

-import com.aliyun.odps.tunnel.TunnelException;
+import com.aliyun.odps.OdpsException;
 import org.apache.arrow.memory.BufferAllocator;
 import org.secretflow.dataproxy.common.model.InferSchemaResult;
 import org.secretflow.dataproxy.common.model.command.DatasetReadCommand;
@@ -89,7 +89,7 @@ public DataWriter buildWriter(DatasetWriteCommand writeCommand) {
         if (Objects.equals(DatasetFormatTypeEnum.TABLE, writeCommand.getFormatConfig().getType())) {
             try {
                 return new OdpsDataWriter(config, locationConfig, writeCommand.getSchema());
-            } catch (TunnelException | IOException e) {
+            } catch (IOException | OdpsException e) {
                 throw new RuntimeException(e);
             }
         }
@@ -18,22 +18,25 @@

 import com.aliyun.odps.Column;
 import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
 import com.aliyun.odps.OdpsType;
 import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Table;
 import com.aliyun.odps.TableSchema;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.data.RecordWriter;
 import com.aliyun.odps.tunnel.TableTunnel;
 import com.aliyun.odps.tunnel.TunnelException;
 import com.aliyun.odps.type.TypeInfo;
 import com.aliyun.odps.type.TypeInfoFactory;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -67,12 +70,12 @@ public class OdpsDataWriter implements DataWriter {

     private final boolean overwrite = true;

-    private boolean isTemporarilyCreatedTable = false;
+    private boolean isPartitioned = false;

     private TableTunnel.UploadSession uploadSession = null;
     private RecordWriter recordWriter = null;

-    public OdpsDataWriter(OdpsConnConfig connConfig, OdpsTableInfo tableInfo, Schema schema) throws TunnelException, IOException {
+    public OdpsDataWriter(OdpsConnConfig connConfig, OdpsTableInfo tableInfo, Schema schema) throws OdpsException, IOException {
         this.connConfig = connConfig;
         this.tableInfo = tableInfo;
         this.schema = schema;
@@ -96,7 +99,8 @@ record = uploadSession.newRecord();

         for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
             log.debug("column: {}, type: {}", columnIndex, root.getFieldVectors().get(columnIndex).getField().getType());
-            columnName = root.getVector(columnIndex).getField().getName();
+            // odps column names are lower case
+            columnName = root.getVector(columnIndex).getField().getName().toLowerCase();

             if (tableSchema.containsColumn(columnName)) {
                 this.setRecordValue(record, tableSchema.getColumnIndex(columnName), this.getValue(root.getFieldVectors().get(columnIndex), rowIndex));
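A note on the lowercasing added above: MaxCompute (ODPS) treats identifiers as case-insensitive and its SDK reports column names in lower case, so a mixed-case Arrow field name would otherwise fail the containsColumn lookup. A minimal illustration (the field name "UserId" is a made-up example):

    String arrowName = "UserId";               // hypothetical Arrow field name
    String odpsName = arrowName.toLowerCase(); // "userid", the form the ODPS TableSchema reports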
@@ -144,21 +148,25 @@ private Odps initOdpsClient(OdpsConnConfig odpsConnConfig) {
         return OdpsUtil.buildOdps(odpsConnConfig);
     }

-    private void initOdps() throws TunnelException, IOException {
+    private void initOdps() throws OdpsException, IOException {
         // init odps client
         Odps odps = initOdpsClient(this.connConfig);
         // Pre-processing
-        preProcessing(odps, connConfig.getProjectName(), tableInfo.tableName());
+        preProcessing(odps, connConfig.getProjectName(), tableInfo.tableName(), this.convertToPartitionSpec(tableInfo.partitionSpec()));
         // init upload session
         TableTunnel tunnel = new TableTunnel(odps);
-        if (tableInfo.partitionSpec() != null && !tableInfo.partitionSpec().isEmpty() && !isTemporarilyCreatedTable) {
+
+        if (isPartitioned) {
+            if (tableInfo.partitionSpec() == null || tableInfo.partitionSpec().isEmpty()) {
+                throw DataproxyException.of(DataproxyErrorCode.INVALID_PARTITION_SPEC, "partitionSpec is empty");
+            }
             PartitionSpec partitionSpec = new PartitionSpec(tableInfo.partitionSpec());
             uploadSession = tunnel.createUploadSession(connConfig.getProjectName(), tableInfo.tableName(), partitionSpec, overwrite);
         } else {
             uploadSession = tunnel.createUploadSession(connConfig.getProjectName(), tableInfo.tableName(), overwrite);
         }

-        recordWriter = uploadSession.openRecordWriter(0);
+        recordWriter = uploadSession.openRecordWriter(0, true);
     }
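For context on the tunnel calls above: in the ODPS Java SDK an upload session is block-based, and written data only becomes visible once the session is committed, which this writer presumably does when it is closed (that part of the file is outside this diff). A minimal sketch of the lifecycle, with project and table names made up:

    TableTunnel tunnel = new TableTunnel(odps);
    TableTunnel.UploadSession session =
            tunnel.createUploadSession("my_project", "my_table", /* overwrite */ true);
    RecordWriter writer = session.openRecordWriter(0, true); // block id 0, compressed transfer
    Record record = session.newRecord();
    record.setString(0, "value");
    writer.write(record);
    writer.close();
    session.commit(new Long[]{0L}); // publish the written block ids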

/**
@@ -173,6 +181,7 @@ private void initOdps() throws TunnelException, IOException {
     private void setRecordValue(Record record, int columnIndex, Object value) {
         if (value == null) {
             record.set(columnIndex, null);
+            log.warn("table name: {} record set null value. index: {}", tableInfo.tableName(), columnIndex);
             return;
         }

@@ -184,8 +193,11 @@
             case STRING -> record.setString(columnIndex, String.valueOf(value));
             case FLOAT -> record.set(columnIndex, Float.parseFloat(String.valueOf(value)));
             case DOUBLE -> record.set(columnIndex, Double.parseDouble(String.valueOf(value)));
+            case TINYINT -> record.set(columnIndex, Byte.parseByte(String.valueOf(value)));
+            case SMALLINT -> record.set(columnIndex, Short.parseShort(String.valueOf(value)));
             case BIGINT -> record.set(columnIndex, Long.parseLong(String.valueOf(value)));
             case INT -> record.set(columnIndex, Integer.parseInt(String.valueOf(value)));
+            case BOOLEAN -> record.setBoolean(columnIndex, (Boolean) value);
             default -> record.set(columnIndex, value);
         }
     }
@@ -205,23 +217,32 @@ private Object getValue(FieldVector fieldVector, int index) {

         switch (arrowTypeID) {
             case Int -> {
-                if (fieldVector instanceof IntVector || fieldVector instanceof BigIntVector || fieldVector instanceof SmallIntVector) {
+                if (fieldVector instanceof IntVector || fieldVector instanceof BigIntVector || fieldVector instanceof SmallIntVector || fieldVector instanceof TinyIntVector) {
                     return fieldVector.getObject(index);
                 }
+                log.warn("Type INT is not IntVector or BigIntVector or SmallIntVector or TinyIntVector, value is: {}", fieldVector.getObject(index).toString());
             }
             case FloatingPoint -> {
                 if (fieldVector instanceof Float4Vector || fieldVector instanceof Float8Vector) {
                     return fieldVector.getObject(index);
                 }
+                log.warn("Type FloatingPoint is not Float4Vector or Float8Vector, value is: {}", fieldVector.getObject(index).toString());
             }
             case Utf8 -> {
                 if (fieldVector instanceof VarCharVector vector) {
                     return new String(vector.get(index), StandardCharsets.UTF_8);
                 }
+                log.warn("Type Utf8 is not VarCharVector, value is: {}", fieldVector.getObject(index).toString());
             }
             case Null -> {
                 return null;
             }
+            case Bool -> {
+                if (fieldVector instanceof BitVector vector) {
+                    return vector.get(index) == 1;
+                }
+                log.warn("Type BOOL is not BitVector, value is: {}", fieldVector.getObject(index).toString());
+            }
             default -> {
                 log.warn("Not implemented type: {}, will use default function", arrowTypeID);
                 return fieldVector.getObject(index);
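One detail worth noting in the new Bool branch: Arrow's BitVector exposes bit values as 0/1 integers, hence the comparison against 1 to produce a Boolean (variable name illustrative):

    boolean value = bitVector.get(index) == 1; // BitVector.get returns 0 or 1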
@@ -239,16 +260,35 @@ private Object getValue(FieldVector fieldVector, int index) {
      * @param projectName project name
      * @param tableName   table name
      */
-    private void preProcessing(Odps odps, String projectName, String tableName) {
+    private void preProcessing(Odps odps, String projectName, String tableName, PartitionSpec partitionSpec) throws OdpsException {

         if (!isExistsTable(odps, projectName, tableName)) {
-            boolean odpsTable = createOdpsTable(odps, projectName, tableName, schema);
+            boolean odpsTable = createOdpsTable(odps, projectName, tableName, schema, partitionSpec);
             if (!odpsTable) {
                 throw DataproxyException.of(DataproxyErrorCode.ODPS_CREATE_TABLE_FAILED);
             }
-            isTemporarilyCreatedTable = true;
+            log.info("odps table does not exist, table created successfully, project: {}, table name: {}", projectName, tableName);
+        } else {
+            log.info("odps table exists, project: {}, table name: {}", projectName, tableName);
         }
+        isPartitioned = odps.tables().get(projectName, tableName).isPartitioned();
+
+        if (isPartitioned) {
+            if (partitionSpec == null || partitionSpec.isEmpty()) {
+                throw DataproxyException.of(DataproxyErrorCode.INVALID_PARTITION_SPEC, "partitionSpec is empty");
+            }
+
+            if (!isExistsPartition(odps, projectName, tableName, partitionSpec)) {
+                boolean odpsPartition = createOdpsPartition(odps, projectName, tableName, partitionSpec);
+                if (!odpsPartition) {
+                    throw DataproxyException.of(DataproxyErrorCode.ODPS_CREATE_PARTITION_FAILED);
+                }
+                log.info("odps partition does not exist, partition created successfully, project: {}, table name: {}, PartitionSpec: {}", projectName, tableName, partitionSpec);
+            } else {
+                log.info("odps partition exists, project: {}, table name: {}, PartitionSpec: {}", projectName, tableName, partitionSpec);
+            }
+        }
-        log.info("odps table is exists or create table successful, project: {}, table name: {}", projectName, tableName);
     }

/**
@@ -268,21 +308,72 @@ private boolean isExistsTable(Odps odps, String projectName, String tableName) {
         return false;
     }

-    private boolean createOdpsTable(Odps odps, String projectName, String tableName, Schema schema) {
+    private boolean isExistsPartition(Odps odps, String projectName, String tableName, PartitionSpec partitionSpec) throws OdpsException {
+        Table table = odps.tables().get(projectName, tableName);
+
+        if (table == null) {
+            log.warn("table is null, projectName:{}, tableName:{}", projectName, tableName);
+            throw DataproxyException.of(DataproxyErrorCode.ODPS_TABLE_NOT_EXISTS);
+        }
+
+        return table.hasPartition(partitionSpec);
+    }

+    /**
+     * create odps table
+     *
+     * @param odps          odps client
+     * @param projectName   project name
+     * @param tableName     table name
+     * @param schema        schema
+     * @param partitionSpec partition spec
+     * @return true if the table was created, false otherwise
+     */
+    private boolean createOdpsTable(Odps odps, String projectName, String tableName, Schema schema, PartitionSpec partitionSpec) {
         try {
-            odps.tables().create(projectName, tableName, convertToTableSchema(schema), true);
+            TableSchema tableSchema = convertToTableSchema(schema);
+            if (partitionSpec != null) {
+                // Infer partitioning field type as string.
+                partitionSpec.keys().forEach(key -> tableSchema.addPartitionColumn(Column.newBuilder(key, TypeInfoFactory.STRING).build()));
+            }
+            odps.tables().create(projectName, tableName, tableSchema, "", true, null, OdpsUtil.getSqlFlag(), null);
             return true;
         } catch (Exception e) {
             log.error("create odps table error, projectName:{}, tableName:{}", projectName, tableName, e);
         }
         return false;
     }

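The create call above now routes through the eight-argument overload, passing "" as the table comment, true for if-not-exists, and OdpsUtil.getSqlFlag() as the SQL hints. getSqlFlag() itself is not shown in this diff; a plausible sketch, assuming it enables the MaxCompute 2.0 type system that the TINYINT/SMALLINT/BOOLEAN mappings added below depend on (the flag name is a standard MaxCompute setting, but its use here is an assumption):

    // hypothetical reconstruction; the real OdpsUtil.getSqlFlag() is not shown in this PR
    // (uses java.util.Map / java.util.HashMap)
    Map<String, String> hints = new HashMap<>();
    hints.put("odps.sql.type.system.odps2", "true"); // allow TINYINT/SMALLINT/BOOLEAN column types
    return hints;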
+    private boolean createOdpsPartition(Odps odps, String projectName, String tableName, PartitionSpec partitionSpec) {
+        try {
+            Table table = odps.tables().get(projectName, tableName);
+            table.createPartition(partitionSpec, true);
+            return true;
+        } catch (Exception e) {
+            log.error("create odps partition error, projectName:{}, tableName:{}", projectName, tableName, e);
+        }
+        return false;
+    }

     private TableSchema convertToTableSchema(Schema schema) {
         List<Column> columns = schema.getFields().stream().map(this::convertToColumn).toList();
         return TableSchema.builder().withColumns(columns).build();
     }

+    /**
+     * Parse a partition spec string into a PartitionSpec.
+     *
+     * @param partitionSpec partition spec string
+     * @return parsed PartitionSpec, or null if the input is null or empty
+     * @throws IllegalArgumentException if partitionSpec is invalid
+     */
+    private PartitionSpec convertToPartitionSpec(String partitionSpec) {
+        if (partitionSpec == null || partitionSpec.isEmpty()) {
+            return null;
+        }
+        return new PartitionSpec(partitionSpec);
+    }
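For reference, com.aliyun.odps.PartitionSpec parses a comma-separated list of key=value pairs, so the string carried in OdpsTableInfo can look like the following (column names and values are invented examples):

    PartitionSpec spec = new PartitionSpec("dt='20241011',region='cn'");
    spec.keys();     // {"dt", "region"}, used above to add partition columns
    spec.isEmpty();  // false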

     private Column convertToColumn(Field field) {
         return Column.newBuilder(field.getName(), convertToType(field.getType())).build();
     }
@@ -304,14 +395,23 @@ private TypeInfo convertToType(ArrowType type) {
             };
         }
         case Int -> {
-            return TypeInfoFactory.INT;
+            return switch (((ArrowType.Int) type).getBitWidth()) {
+                case 8 -> TypeInfoFactory.TINYINT;
+                case 16 -> TypeInfoFactory.SMALLINT;
+                case 32 -> TypeInfoFactory.INT;
+                case 64 -> TypeInfoFactory.BIGINT;
+                default -> TypeInfoFactory.UNKNOWN;
+            };
         }
         case Time -> {
             return TypeInfoFactory.TIMESTAMP;
         }
         case Date -> {
             return TypeInfoFactory.DATE;
         }
+        case Bool -> {
+            return TypeInfoFactory.BOOLEAN;
+        }
         default -> {
             log.warn("Not implemented type: {}", arrowTypeID);
             return TypeInfoFactory.UNKNOWN;