Skip to content

Commit b03db2f

Browse files
Add write mapping
Co-authored-by: pratyakshsharma <[email protected]>
1 parent 1fc5687 commit b03db2f

File tree

29 files changed

+1121
-569
lines changed

29 files changed

+1121
-569
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ test-reports/
1818
.DS_Store
1919
.classpath
2020
.settings
21+
.java-version
2122
.project
2223
temp-testng-customsuite.xml
2324
test-output

presto-base-jdbc/src/main/java/com/facebook/presto/plugin/jdbc/BaseJdbcClient.java

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@
1717
import com.facebook.presto.common.predicate.TupleDomain;
1818
import com.facebook.presto.common.type.CharType;
1919
import com.facebook.presto.common.type.DecimalType;
20+
import com.facebook.presto.common.type.TimestampType;
2021
import com.facebook.presto.common.type.Type;
2122
import com.facebook.presto.common.type.UuidType;
2223
import com.facebook.presto.common.type.VarcharType;
24+
import com.facebook.presto.plugin.jdbc.mapping.ColumnMapping;
25+
import com.facebook.presto.plugin.jdbc.mapping.WriteMapping;
26+
import com.facebook.presto.plugin.jdbc.mapping.functions.BooleanWriteFunction;
27+
import com.facebook.presto.plugin.jdbc.mapping.functions.DoubleWriteFunction;
28+
import com.facebook.presto.plugin.jdbc.mapping.functions.LongWriteFunction;
29+
import com.facebook.presto.plugin.jdbc.mapping.functions.SliceWriteFunction;
2330
import com.facebook.presto.spi.ColumnHandle;
2431
import com.facebook.presto.spi.ColumnMetadata;
2532
import com.facebook.presto.spi.ConnectorSession;
@@ -64,13 +71,32 @@
6471
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
6572
import static com.facebook.presto.common.type.TimeType.TIME;
6673
import static com.facebook.presto.common.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
67-
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
6874
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
6975
import static com.facebook.presto.common.type.TinyintType.TINYINT;
7076
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
7177
import static com.facebook.presto.common.type.Varchars.isVarcharType;
7278
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
73-
import static com.facebook.presto.plugin.jdbc.StandardReadMappings.jdbcTypeToPrestoType;
79+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.bigintColumnMapping;
80+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.booleanColumnMapping;
81+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.charColumnMapping;
82+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.dateColumnMapping;
83+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.decimalColumnMapping;
84+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.doubleColumnMapping;
85+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.integerColumnMapping;
86+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.jdbcTypeToPrestoType;
87+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.realColumnMapping;
88+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.smallintColumnMapping;
89+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.timeColumnMapping;
90+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.timeWithTimeZoneColumnMapping;
91+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.timestampColumnMapping;
92+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.timestampWithTimeZoneColumnMapping;
93+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.tinyintColumnMapping;
94+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.uuidColumnMapping;
95+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.varbinaryColumnMapping;
96+
import static com.facebook.presto.plugin.jdbc.mapping.StandardColumnMappings.varcharColumnMapping;
97+
import static com.facebook.presto.plugin.jdbc.mapping.WriteMapping.booleanMapping;
98+
import static com.facebook.presto.plugin.jdbc.mapping.WriteMapping.longMapping;
99+
import static com.facebook.presto.plugin.jdbc.mapping.WriteMapping.sliceMapping;
74100
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
75101
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
76102
import static com.google.common.base.MoreObjects.firstNonNull;
@@ -94,21 +120,21 @@ public class BaseJdbcClient
94120
{
95121
private static final Logger log = Logger.get(BaseJdbcClient.class);
96122

97-
private static final Map<Type, String> SQL_TYPES = ImmutableMap.<Type, String>builder()
98-
.put(BOOLEAN, "boolean")
99-
.put(BIGINT, "bigint")
100-
.put(INTEGER, "integer")
101-
.put(SMALLINT, "smallint")
102-
.put(TINYINT, "tinyint")
103-
.put(DOUBLE, "double precision")
104-
.put(REAL, "real")
105-
.put(VARBINARY, "varbinary")
106-
.put(DATE, "date")
107-
.put(TIME, "time")
108-
.put(TIME_WITH_TIME_ZONE, "time with timezone")
109-
.put(TIMESTAMP, "timestamp")
110-
.put(TIMESTAMP_WITH_TIME_ZONE, "timestamp with timezone")
111-
.put(UuidType.UUID, "uuid")
123+
private static final Map<Type, WriteMapping> TYPE_MAPPINGS = ImmutableMap.<Type, WriteMapping>builder()
124+
.put(BOOLEAN, booleanMapping("boolean", (BooleanWriteFunction) booleanColumnMapping().getWriteFunction()))
125+
.put(BIGINT, longMapping("bigint", (LongWriteFunction) bigintColumnMapping().getWriteFunction()))
126+
.put(INTEGER, longMapping("integer", (LongWriteFunction) integerColumnMapping().getWriteFunction()))
127+
.put(SMALLINT, longMapping("smallint", (LongWriteFunction) smallintColumnMapping().getWriteFunction()))
128+
.put(TINYINT, longMapping("tinyint", (LongWriteFunction) tinyintColumnMapping().getWriteFunction()))
129+
.put(DOUBLE, WriteMapping.doubleMapping("double precision", (DoubleWriteFunction) doubleColumnMapping().getWriteFunction()))
130+
.put(REAL, longMapping("real", (LongWriteFunction) realColumnMapping().getWriteFunction()))
131+
.put(VARBINARY, sliceMapping("varbinary", (SliceWriteFunction) varbinaryColumnMapping().getWriteFunction()))
132+
.put(DATE, longMapping("date", (LongWriteFunction) dateColumnMapping().getWriteFunction()))
133+
.put(TIME, longMapping("time", (LongWriteFunction) timeColumnMapping().getWriteFunction()))
134+
.put(UuidType.UUID, sliceMapping("uuid", (SliceWriteFunction) uuidColumnMapping().getWriteFunction()))
135+
136+
.put(TIME_WITH_TIME_ZONE, longMapping("time with timezone", (LongWriteFunction) timeWithTimeZoneColumnMapping().getWriteFunction()))
137+
.put(TIMESTAMP_WITH_TIME_ZONE, longMapping("timestamp with timezone", (LongWriteFunction) timestampWithTimeZoneColumnMapping().getWriteFunction()))
112138
.build();
113139

114140
protected final String connectorId;
@@ -241,7 +267,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
241267
resultSet.getString("TYPE_NAME"),
242268
resultSet.getInt("COLUMN_SIZE"),
243269
resultSet.getInt("DECIMAL_DIGITS"));
244-
Optional<ReadMapping> columnMapping = toPrestoType(session, typeHandle);
270+
Optional<ColumnMapping> columnMapping = toPrestoType(session, typeHandle);
245271
// skip unsupported column types
246272
if (columnMapping.isPresent()) {
247273
String columnName = resultSet.getString("COLUMN_NAME");
@@ -267,7 +293,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
267293
}
268294

269295
@Override
270-
public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
296+
public Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
271297
{
272298
return jdbcTypeToPrestoType(typeHandle);
273299
}
@@ -403,7 +429,7 @@ private String getColumnString(ColumnMetadata column, String columnName)
403429
StringBuilder sb = new StringBuilder()
404430
.append(quoted(columnName))
405431
.append(" ")
406-
.append(toSqlType(column.getType()));
432+
.append(toWriteMapping(column.getType()).getDataType());
407433
if (!column.isNullable()) {
408434
sb.append(" NOT NULL");
409435
}
@@ -737,28 +763,45 @@ protected void execute(Connection connection, String query)
737763
}
738764
}
739765

740-
protected String toSqlType(Type type)
766+
public WriteMapping toWriteMapping(Type type)
741767
{
768+
String dataType;
742769
if (isVarcharType(type)) {
743770
VarcharType varcharType = (VarcharType) type;
744771
if (varcharType.isUnbounded()) {
745-
return "varchar";
772+
dataType = "varchar";
746773
}
747-
return "varchar(" + varcharType.getLengthSafe() + ")";
774+
else {
775+
dataType = "varchar(" + varcharType.getLengthSafe() + ")";
776+
}
777+
return sliceMapping(dataType, (SliceWriteFunction) varcharColumnMapping(varcharType).getWriteFunction());
778+
}
779+
else if (type instanceof CharType) {
780+
CharType charType = (CharType) type;
781+
if (charType.getLength() == CharType.MAX_LENGTH) {
782+
dataType = "char";
783+
}
784+
else {
785+
dataType = "char(" + ((CharType) type).getLength() + ")";
786+
}
787+
return sliceMapping(dataType, (SliceWriteFunction) charColumnMapping(charType).getWriteFunction());
748788
}
749-
if (type instanceof CharType) {
750-
if (((CharType) type).getLength() == CharType.MAX_LENGTH) {
751-
return "char";
789+
else if (type instanceof DecimalType) {
790+
DecimalType decimalType = (DecimalType) type;
791+
dataType = format("decimal(%s, %s)", ((DecimalType) type).getPrecision(), ((DecimalType) type).getScale());
792+
if (decimalType.isShort()) {
793+
return longMapping(dataType, (LongWriteFunction) decimalColumnMapping(decimalType).getWriteFunction());
794+
}
795+
else {
796+
return sliceMapping(dataType, (SliceWriteFunction) decimalColumnMapping(decimalType).getWriteFunction());
752797
}
753-
return "char(" + ((CharType) type).getLength() + ")";
754798
}
755-
if (type instanceof DecimalType) {
756-
return format("decimal(%s, %s)", ((DecimalType) type).getPrecision(), ((DecimalType) type).getScale());
799+
else if (type instanceof TimestampType) {
800+
return longMapping("timestamp", (LongWriteFunction) timestampColumnMapping((TimestampType) type).getWriteFunction());
757801
}
758-
759-
String sqlType = SQL_TYPES.get(type);
760-
if (sqlType != null) {
761-
return sqlType;
802+
WriteMapping writeMapping = TYPE_MAPPINGS.get(type);
803+
if (writeMapping != null) {
804+
return writeMapping;
762805
}
763806
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
764807
}

presto-base-jdbc/src/main/java/com/facebook/presto/plugin/jdbc/JdbcClient.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
package com.facebook.presto.plugin.jdbc;
1515

1616
import com.facebook.presto.common.predicate.TupleDomain;
17+
import com.facebook.presto.common.type.Type;
18+
import com.facebook.presto.plugin.jdbc.mapping.ColumnMapping;
19+
import com.facebook.presto.plugin.jdbc.mapping.WriteMapping;
1720
import com.facebook.presto.spi.ColumnHandle;
1821
import com.facebook.presto.spi.ColumnMetadata;
1922
import com.facebook.presto.spi.ConnectorSession;
@@ -49,7 +52,9 @@ default boolean schemaExists(ConnectorSession session, JdbcIdentity identity, St
4952

5053
List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle);
5154

52-
Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);
55+
Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);
56+
57+
WriteMapping toWriteMapping(Type type);
5358

5459
ConnectorSplitSource getSplits(ConnectorSession session, JdbcIdentity identity, JdbcTableLayoutHandle layoutHandle);
5560

presto-base-jdbc/src/main/java/com/facebook/presto/plugin/jdbc/JdbcPageSink.java

Lines changed: 27 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -16,51 +16,33 @@
1616
import com.facebook.airlift.log.Logger;
1717
import com.facebook.presto.common.Page;
1818
import com.facebook.presto.common.block.Block;
19-
import com.facebook.presto.common.type.DecimalType;
20-
import com.facebook.presto.common.type.TimestampType;
2119
import com.facebook.presto.common.type.Type;
22-
import com.facebook.presto.common.type.UuidType;
20+
import com.facebook.presto.plugin.jdbc.mapping.WriteFunction;
21+
import com.facebook.presto.plugin.jdbc.mapping.functions.BooleanWriteFunction;
22+
import com.facebook.presto.plugin.jdbc.mapping.functions.DoubleWriteFunction;
23+
import com.facebook.presto.plugin.jdbc.mapping.functions.LongWriteFunction;
24+
import com.facebook.presto.plugin.jdbc.mapping.functions.ObjectWriteFunction;
25+
import com.facebook.presto.plugin.jdbc.mapping.functions.SliceWriteFunction;
2326
import com.facebook.presto.spi.ConnectorPageSink;
2427
import com.facebook.presto.spi.ConnectorSession;
2528
import com.facebook.presto.spi.PrestoException;
2629
import com.google.common.collect.ImmutableList;
27-
import com.google.common.primitives.Shorts;
28-
import com.google.common.primitives.SignedBytes;
2930
import io.airlift.slice.Slice;
30-
import org.joda.time.DateTimeZone;
3131

3232
import java.sql.Connection;
33-
import java.sql.Date;
3433
import java.sql.PreparedStatement;
3534
import java.sql.SQLException;
3635
import java.sql.SQLNonTransientException;
37-
import java.sql.Timestamp;
38-
import java.time.Instant;
3936
import java.util.Collection;
4037
import java.util.List;
4138
import java.util.concurrent.CompletableFuture;
4239

43-
import static com.facebook.presto.common.type.BigintType.BIGINT;
44-
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
45-
import static com.facebook.presto.common.type.Chars.isCharType;
46-
import static com.facebook.presto.common.type.DateType.DATE;
47-
import static com.facebook.presto.common.type.Decimals.readBigDecimal;
48-
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
49-
import static com.facebook.presto.common.type.IntegerType.INTEGER;
50-
import static com.facebook.presto.common.type.RealType.REAL;
51-
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
52-
import static com.facebook.presto.common.type.TinyintType.TINYINT;
53-
import static com.facebook.presto.common.type.UuidType.prestoUuidToJavaUuid;
54-
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
55-
import static com.facebook.presto.common.type.Varchars.isVarcharType;
5640
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
5741
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_NON_TRANSIENT_ERROR;
58-
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
59-
import static java.lang.Float.intBitsToFloat;
60-
import static java.lang.Math.toIntExact;
42+
import static com.google.common.base.Verify.verify;
43+
import static com.google.common.collect.ImmutableList.toImmutableList;
44+
import static java.lang.String.format;
6145
import static java.util.concurrent.CompletableFuture.completedFuture;
62-
import static java.util.concurrent.TimeUnit.DAYS;
63-
import static org.joda.time.chrono.ISOChronology.getInstanceUTC;
6446

6547
public class JdbcPageSink
6648
implements ConnectorPageSink
@@ -71,6 +53,7 @@ public class JdbcPageSink
7153
private final PreparedStatement statement;
7254

7355
private final List<Type> columnTypes;
56+
private final List<WriteFunction> columnWriters;
7457
private int batchSize;
7558

7659
public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, JdbcClient jdbcClient)
@@ -92,6 +75,12 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
9275
}
9376

9477
columnTypes = handle.getColumnTypes();
78+
columnWriters = columnTypes.stream().map(type -> {
79+
WriteFunction writeFunction = jdbcClient.toWriteMapping(type).getWriteFunction();
80+
verify(type.getJavaType() == writeFunction.getJavaType(),
81+
format("Presto type %s is not compatible with write function %s accepting %s", type, writeFunction, writeFunction.getJavaType()));
82+
return writeFunction;
83+
}).collect(toImmutableList());
9584
}
9685

9786
@Override
@@ -132,55 +121,22 @@ private void appendColumn(Page page, int position, int channel)
132121
}
133122

134123
Type type = columnTypes.get(channel);
135-
if (BOOLEAN.equals(type)) {
136-
statement.setBoolean(parameter, type.getBoolean(block, position));
124+
Class<?> javaType = type.getJavaType();
125+
WriteFunction writeFunction = columnWriters.get(channel);
126+
if (javaType == boolean.class) {
127+
((BooleanWriteFunction) writeFunction).set(statement, parameter, type.getBoolean(block, position));
137128
}
138-
else if (BIGINT.equals(type)) {
139-
statement.setLong(parameter, type.getLong(block, position));
129+
else if (javaType == long.class) {
130+
((LongWriteFunction) writeFunction).set(statement, parameter, type.getLong(block, position));
140131
}
141-
else if (INTEGER.equals(type)) {
142-
statement.setInt(parameter, toIntExact(type.getLong(block, position)));
132+
else if (javaType == double.class) {
133+
((DoubleWriteFunction) writeFunction).set(statement, parameter, type.getDouble(block, position));
143134
}
144-
else if (SMALLINT.equals(type)) {
145-
statement.setShort(parameter, Shorts.checkedCast(type.getLong(block, position)));
146-
}
147-
else if (TINYINT.equals(type)) {
148-
statement.setByte(parameter, SignedBytes.checkedCast(type.getLong(block, position)));
149-
}
150-
else if (DOUBLE.equals(type)) {
151-
statement.setDouble(parameter, type.getDouble(block, position));
152-
}
153-
else if (REAL.equals(type)) {
154-
statement.setFloat(parameter, intBitsToFloat(toIntExact(type.getLong(block, position))));
155-
}
156-
else if (type instanceof DecimalType) {
157-
statement.setBigDecimal(parameter, readBigDecimal((DecimalType) type, block, position));
158-
}
159-
else if (isVarcharType(type) || isCharType(type)) {
160-
statement.setString(parameter, type.getSlice(block, position).toStringUtf8());
161-
}
162-
else if (VARBINARY.equals(type)) {
163-
statement.setBytes(parameter, type.getSlice(block, position).getBytes());
164-
}
165-
else if (DATE.equals(type)) {
166-
// convert to midnight in default time zone
167-
long utcMillis = DAYS.toMillis(type.getLong(block, position));
168-
long localMillis = getInstanceUTC().getZone().getMillisKeepLocal(DateTimeZone.getDefault(), utcMillis);
169-
statement.setDate(parameter, new Date(localMillis));
170-
}
171-
else if (type instanceof TimestampType) {
172-
long timestampValue = type.getLong(block, position);
173-
statement.setTimestamp(parameter,
174-
Timestamp.from(Instant.ofEpochSecond(
175-
((TimestampType) type).getEpochSecond(timestampValue),
176-
((TimestampType) type).getNanos(timestampValue))));
177-
}
178-
else if (UuidType.UUID.equals(type)) {
179-
Slice slice = type.getSlice(block, position);
180-
statement.setObject(parameter, prestoUuidToJavaUuid(slice));
135+
else if (javaType == Slice.class) {
136+
((SliceWriteFunction) writeFunction).set(statement, parameter, type.getSlice(block, position));
181137
}
182138
else {
183-
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
139+
((ObjectWriteFunction) writeFunction).set(statement, parameter, type.getObject(block, position));
184140
}
185141
}
186142

0 commit comments

Comments
 (0)