16
16
import com .facebook .airlift .log .Logger ;
17
17
import com .facebook .presto .common .Page ;
18
18
import com .facebook .presto .common .block .Block ;
19
- import com .facebook .presto .common .type .DecimalType ;
20
- import com .facebook .presto .common .type .TimestampType ;
21
19
import com .facebook .presto .common .type .Type ;
22
- import com .facebook .presto .common .type .UuidType ;
20
+ import com .facebook .presto .plugin .jdbc .mapping .WriteFunction ;
21
+ import com .facebook .presto .plugin .jdbc .mapping .functions .BooleanWriteFunction ;
22
+ import com .facebook .presto .plugin .jdbc .mapping .functions .DoubleWriteFunction ;
23
+ import com .facebook .presto .plugin .jdbc .mapping .functions .LongWriteFunction ;
24
+ import com .facebook .presto .plugin .jdbc .mapping .functions .ObjectWriteFunction ;
25
+ import com .facebook .presto .plugin .jdbc .mapping .functions .SliceWriteFunction ;
23
26
import com .facebook .presto .spi .ConnectorPageSink ;
24
27
import com .facebook .presto .spi .ConnectorSession ;
25
28
import com .facebook .presto .spi .PrestoException ;
26
29
import com .google .common .collect .ImmutableList ;
27
- import com .google .common .primitives .Shorts ;
28
- import com .google .common .primitives .SignedBytes ;
29
30
import io .airlift .slice .Slice ;
30
- import org .joda .time .DateTimeZone ;
31
31
32
32
import java .sql .Connection ;
33
- import java .sql .Date ;
34
33
import java .sql .PreparedStatement ;
35
34
import java .sql .SQLException ;
36
35
import java .sql .SQLNonTransientException ;
37
- import java .sql .Timestamp ;
38
- import java .time .Instant ;
39
36
import java .util .Collection ;
40
37
import java .util .List ;
41
38
import java .util .concurrent .CompletableFuture ;
42
39
43
- import static com .facebook .presto .common .type .BigintType .BIGINT ;
44
- import static com .facebook .presto .common .type .BooleanType .BOOLEAN ;
45
- import static com .facebook .presto .common .type .Chars .isCharType ;
46
- import static com .facebook .presto .common .type .DateType .DATE ;
47
- import static com .facebook .presto .common .type .Decimals .readBigDecimal ;
48
- import static com .facebook .presto .common .type .DoubleType .DOUBLE ;
49
- import static com .facebook .presto .common .type .IntegerType .INTEGER ;
50
- import static com .facebook .presto .common .type .RealType .REAL ;
51
- import static com .facebook .presto .common .type .SmallintType .SMALLINT ;
52
- import static com .facebook .presto .common .type .TinyintType .TINYINT ;
53
- import static com .facebook .presto .common .type .UuidType .prestoUuidToJavaUuid ;
54
- import static com .facebook .presto .common .type .VarbinaryType .VARBINARY ;
55
- import static com .facebook .presto .common .type .Varchars .isVarcharType ;
56
40
import static com .facebook .presto .plugin .jdbc .JdbcErrorCode .JDBC_ERROR ;
57
41
import static com .facebook .presto .plugin .jdbc .JdbcErrorCode .JDBC_NON_TRANSIENT_ERROR ;
58
- import static com .facebook . presto . spi . StandardErrorCode . NOT_SUPPORTED ;
59
- import static java . lang . Float . intBitsToFloat ;
60
- import static java .lang .Math . toIntExact ;
42
+ import static com .google . common . base . Verify . verify ;
43
+ import static com . google . common . collect . ImmutableList . toImmutableList ;
44
+ import static java .lang .String . format ;
61
45
import static java .util .concurrent .CompletableFuture .completedFuture ;
62
- import static java .util .concurrent .TimeUnit .DAYS ;
63
- import static org .joda .time .chrono .ISOChronology .getInstanceUTC ;
64
46
65
47
public class JdbcPageSink
66
48
implements ConnectorPageSink
@@ -71,6 +53,7 @@ public class JdbcPageSink
71
53
private final PreparedStatement statement ;
72
54
73
55
private final List <Type > columnTypes ;
56
+ private final List <WriteFunction > columnWriters ;
74
57
private int batchSize ;
75
58
76
59
public JdbcPageSink (ConnectorSession session , JdbcOutputTableHandle handle , JdbcClient jdbcClient )
@@ -92,6 +75,12 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
92
75
}
93
76
94
77
columnTypes = handle .getColumnTypes ();
78
+ columnWriters = columnTypes .stream ().map (type -> {
79
+ WriteFunction writeFunction = jdbcClient .toWriteMapping (type ).getWriteFunction ();
80
+ verify (type .getJavaType () == writeFunction .getJavaType (),
81
+ format ("Presto type %s is not compatible with write function %s accepting %s" , type , writeFunction , writeFunction .getJavaType ()));
82
+ return writeFunction ;
83
+ }).collect (toImmutableList ());
95
84
}
96
85
97
86
@ Override
@@ -132,55 +121,22 @@ private void appendColumn(Page page, int position, int channel)
132
121
}
133
122
134
123
Type type = columnTypes .get (channel );
135
- if (BOOLEAN .equals (type )) {
136
- statement .setBoolean (parameter , type .getBoolean (block , position ));
124
+ Class <?> javaType = type .getJavaType ();
125
+ WriteFunction writeFunction = columnWriters .get (channel );
126
+ if (javaType == boolean .class ) {
127
+ ((BooleanWriteFunction ) writeFunction ).set (statement , parameter , type .getBoolean (block , position ));
137
128
}
138
- else if (BIGINT . equals ( type ) ) {
139
- statement . setLong ( parameter , type .getLong (block , position ));
129
+ else if (javaType == long . class ) {
130
+ (( LongWriteFunction ) writeFunction ). set ( statement , parameter , type .getLong (block , position ));
140
131
}
141
- else if (INTEGER . equals ( type ) ) {
142
- statement . setInt ( parameter , toIntExact ( type .getLong (block , position ) ));
132
+ else if (javaType == double . class ) {
133
+ (( DoubleWriteFunction ) writeFunction ). set ( statement , parameter , type .getDouble (block , position ));
143
134
}
144
- else if (SMALLINT .equals (type )) {
145
- statement .setShort (parameter , Shorts .checkedCast (type .getLong (block , position )));
146
- }
147
- else if (TINYINT .equals (type )) {
148
- statement .setByte (parameter , SignedBytes .checkedCast (type .getLong (block , position )));
149
- }
150
- else if (DOUBLE .equals (type )) {
151
- statement .setDouble (parameter , type .getDouble (block , position ));
152
- }
153
- else if (REAL .equals (type )) {
154
- statement .setFloat (parameter , intBitsToFloat (toIntExact (type .getLong (block , position ))));
155
- }
156
- else if (type instanceof DecimalType ) {
157
- statement .setBigDecimal (parameter , readBigDecimal ((DecimalType ) type , block , position ));
158
- }
159
- else if (isVarcharType (type ) || isCharType (type )) {
160
- statement .setString (parameter , type .getSlice (block , position ).toStringUtf8 ());
161
- }
162
- else if (VARBINARY .equals (type )) {
163
- statement .setBytes (parameter , type .getSlice (block , position ).getBytes ());
164
- }
165
- else if (DATE .equals (type )) {
166
- // convert to midnight in default time zone
167
- long utcMillis = DAYS .toMillis (type .getLong (block , position ));
168
- long localMillis = getInstanceUTC ().getZone ().getMillisKeepLocal (DateTimeZone .getDefault (), utcMillis );
169
- statement .setDate (parameter , new Date (localMillis ));
170
- }
171
- else if (type instanceof TimestampType ) {
172
- long timestampValue = type .getLong (block , position );
173
- statement .setTimestamp (parameter ,
174
- Timestamp .from (Instant .ofEpochSecond (
175
- ((TimestampType ) type ).getEpochSecond (timestampValue ),
176
- ((TimestampType ) type ).getNanos (timestampValue ))));
177
- }
178
- else if (UuidType .UUID .equals (type )) {
179
- Slice slice = type .getSlice (block , position );
180
- statement .setObject (parameter , prestoUuidToJavaUuid (slice ));
135
+ else if (javaType == Slice .class ) {
136
+ ((SliceWriteFunction ) writeFunction ).set (statement , parameter , type .getSlice (block , position ));
181
137
}
182
138
else {
183
- throw new PrestoException ( NOT_SUPPORTED , "Unsupported column type: " + type . getDisplayName ( ));
139
+ (( ObjectWriteFunction ) writeFunction ). set ( statement , parameter , type . getObject ( block , position ));
184
140
}
185
141
}
186
142
0 commit comments