Skip to content

Commit 7829ff7

Browse files
committed
added DuckDB + (embedded) Postgres + Arrow + DF test. Required temp fix for #1256 and #1257
1 parent f5707a8 commit 7829ff7

File tree

5 files changed

+582
-9
lines changed

5 files changed

+582
-9
lines changed

dataframe-arrow/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ dependencies {
3030
}
3131
testImplementation(libs.arrow.c.data)
3232
testImplementation(libs.duckdb.jdbc)
33+
34+
testImplementation(libs.arrow.driver.jdbc)
35+
testImplementation(libs.h2db)
36+
testImplementation(libs.embedded.postgresql)
3337
}
3438

3539
kotlinPublications {

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt

Lines changed: 83 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@ import org.apache.arrow.vector.TimeMicroVector
2525
import org.apache.arrow.vector.TimeMilliVector
2626
import org.apache.arrow.vector.TimeNanoVector
2727
import org.apache.arrow.vector.TimeSecVector
28+
import org.apache.arrow.vector.TimeStampMicroTZVector
2829
import org.apache.arrow.vector.TimeStampMicroVector
30+
import org.apache.arrow.vector.TimeStampMilliTZVector
2931
import org.apache.arrow.vector.TimeStampMilliVector
32+
import org.apache.arrow.vector.TimeStampNanoTZVector
3033
import org.apache.arrow.vector.TimeStampNanoVector
34+
import org.apache.arrow.vector.TimeStampSecTZVector
3135
import org.apache.arrow.vector.TimeStampSecVector
3236
import org.apache.arrow.vector.TinyIntVector
3337
import org.apache.arrow.vector.UInt1Vector
@@ -39,12 +43,16 @@ import org.apache.arrow.vector.VarCharVector
3943
import org.apache.arrow.vector.VectorSchemaRoot
4044
import org.apache.arrow.vector.ViewVarBinaryVector
4145
import org.apache.arrow.vector.ViewVarCharVector
46+
import org.apache.arrow.vector.complex.ListVector
4247
import org.apache.arrow.vector.complex.StructVector
4348
import org.apache.arrow.vector.ipc.ArrowFileReader
4449
import org.apache.arrow.vector.ipc.ArrowReader
4550
import org.apache.arrow.vector.ipc.ArrowStreamReader
4651
import org.apache.arrow.vector.types.pojo.Field
4752
import org.apache.arrow.vector.util.DateUtility
53+
import org.apache.arrow.vector.util.DateUtility.getLocalDateTimeFromEpochMicro
54+
import org.apache.arrow.vector.util.DateUtility.getLocalDateTimeFromEpochMilli
55+
import org.apache.arrow.vector.util.DateUtility.getLocalDateTimeFromEpochNano
4856
import org.jetbrains.kotlinx.dataframe.AnyBaseCol
4957
import org.jetbrains.kotlinx.dataframe.AnyFrame
5058
import org.jetbrains.kotlinx.dataframe.DataColumn
@@ -63,6 +71,7 @@ import java.math.BigDecimal
6371
import java.math.BigInteger
6472
import java.nio.channels.ReadableByteChannel
6573
import java.nio.channels.SeekableByteChannel
74+
import java.util.concurrent.TimeUnit
6675
import kotlin.reflect.KType
6776
import kotlin.reflect.full.withNullability
6877
import kotlin.reflect.typeOf
@@ -197,11 +206,58 @@ private fun TimeStampSecVector.values(range: IntRange): List<LocalDateTime?> =
197206
}
198207
}
199208

209+
private fun TimeStampNanoTZVector.values(range: IntRange): List<LocalDateTime?> =
210+
range.mapIndexed { i, it ->
211+
if (isNull(i)) {
212+
null
213+
} else {
214+
getLocalDateTimeFromEpochNano(getObject(it), timeZone).toKotlinLocalDateTime()
215+
}
216+
}
217+
218+
private fun TimeStampMicroTZVector.values(range: IntRange): List<LocalDateTime?> =
219+
range.mapIndexed { i, it ->
220+
if (isNull(i)) {
221+
null
222+
} else {
223+
getLocalDateTimeFromEpochMicro(getObject(it), timeZone).toKotlinLocalDateTime()
224+
}
225+
}
226+
227+
private fun TimeStampMilliTZVector.values(range: IntRange): List<LocalDateTime?> =
228+
range.mapIndexed { i, it ->
229+
if (isNull(i)) {
230+
null
231+
} else {
232+
getLocalDateTimeFromEpochMilli(getObject(it), timeZone).toKotlinLocalDateTime()
233+
}
234+
}
235+
236+
private fun TimeStampSecTZVector.values(range: IntRange): List<LocalDateTime?> =
237+
range.mapIndexed { i, it ->
238+
if (isNull(i)) {
239+
null
240+
} else {
241+
val seconds = getObject(it)
242+
val millis = TimeUnit.SECONDS.toMillis(seconds)
243+
getLocalDateTimeFromEpochMilli(millis, timeZone).toKotlinLocalDateTime()
244+
}
245+
}
246+
200247
private fun StructVector.values(range: IntRange): List<Map<String, Any?>?> =
201248
range.map {
202249
getObject(it)
203250
}
204251

252+
private fun ListVector.values(range: IntRange): List<List<Any?>?> =
253+
range.map {
254+
if (isNull(it)) {
255+
null
256+
} else {
257+
getObject(it)
258+
}
259+
}
260+
205261
private fun NullVector.values(range: IntRange): List<Nothing?> =
206262
range.map {
207263
getObject(it) as Nothing?
@@ -287,7 +343,14 @@ private fun List<Nothing?>.withTypeNullable(
287343

288344
private fun readField(root: VectorSchemaRoot, field: Field, nullability: NullabilityOptions): AnyBaseCol {
289345
try {
290-
val range = 0 until root.rowCount
346+
val range = 0..<root.rowCount
347+
348+
// TODO
349+
// most types can be read directly from Arrow
350+
// some nested types need a recursive type map which we don't support yet
351+
// so we just rely on DataFrame runtime inference instead
352+
var infer = Infer.None
353+
291354
val (list, type) = when (val vector = root.getVector(field)) {
292355
is VarCharVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
293356

@@ -349,16 +412,29 @@ private fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullabi
349412

350413
is TimeStampSecVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
351414

352-
is StructVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
415+
is TimeStampNanoTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
416+
417+
is TimeStampMicroTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
418+
419+
is TimeStampMilliTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
420+
421+
is TimeStampSecTZVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
353422

354423
is NullVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
355424

356-
else -> {
357-
throw NotImplementedError("reading from ${vector.javaClass.canonicalName} is not implemented")
358-
}
425+
is StructVector -> vector.values(range)
426+
.withTypeNullable(field.isNullable, nullability)
427+
.also { infer = Infer.Type }
428+
429+
is ListVector -> vector.values(range)
430+
.withTypeNullable(field.isNullable, nullability)
431+
.also { infer = Infer.Type }
432+
433+
else -> throw NotImplementedError("reading from ${vector.javaClass.canonicalName} is not implemented")
359434
}
360-
return DataColumn.createValueColumn(field.name, list, type, Infer.None)
361-
} catch (unexpectedNull: NullabilityException) {
435+
436+
return DataColumn.createValueColumn(name = field.name, values = list, type = type, infer = infer)
437+
} catch (_: NullabilityException) {
362438
throw IllegalArgumentException("Column `${field.name}` should be not nullable but has nulls")
363439
}
364440
}

0 commit comments

Comments
 (0)