Skip to content

Commit

Permalink
PLUGIN-296 Add integration test for SpannerSource getSchema from Impo…
Browse files Browse the repository at this point in the history
…rtQuery
  • Loading branch information
ardian4 committed Jan 8, 2021
1 parent 8ef64e0 commit a604f3e
Showing 1 changed file with 86 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -122,6 +121,17 @@ public class GoogleCloudSpannerTest extends DataprocETLTestBase {
Schema.Field.of("ARRAY_DATE_COL", Schema.arrayOf(Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))))
);

private static final Schema IMPORT_SCHEMA = Schema.recordOf(
"schema",
Schema.Field.of("ID", Schema.nullableOf(Schema.of(Schema.Type.LONG))),
Schema.Field.of("StringCol", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
Schema.Field.of("BoolCol", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))),
Schema.Field.of("TimestampCol", Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))),
Schema.Field.of("ArrayIntCol", Schema.arrayOf(Schema.nullableOf(Schema.of(Schema.Type.LONG)))),
Schema.Field.of("BytesCol", Schema.nullableOf(Schema.of(Schema.Type.BYTES))),
Schema.Field.of("DateCol", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE)))
);

private static final ZonedDateTime NOW = ZonedDateTime.now();
private static final Function<String, List<Mutation>> TEST_MUTATIONS = (tableName) -> ImmutableList.of(
Mutation.newInsertBuilder(tableName)
Expand Down Expand Up @@ -156,7 +166,17 @@ public class GoogleCloudSpannerTest extends DataprocETLTestBase {
Date.fromYearMonthDay(NOW.getYear(), NOW.getMonthValue(), NOW.getDayOfMonth()),
Date.fromYearMonthDay(NOW.getYear() + 1, NOW.getMonthValue(), NOW.getDayOfMonth()),
null))
.build()
.build(),

Mutation.newInsertBuilder(tableName)
.set("ID").to(3)
.set("STRING_COL").to("some string")
.set("BOOL_COL").to(false)
.set("TIMESTAMP_COL").to(Timestamp.ofTimeSecondsAndNanos(NOW.toEpochSecond(), NOW.getNano()))
.set("ARRAY_INT_COL").toInt64Array(Arrays.asList(1L, 2L, null))
.set("BYTES_COL").to(ByteArray.copyFrom("some value".getBytes()))
.set("DATE_COL").to(Date.fromYearMonthDay(NOW.getYear(), NOW.getMonthValue(), NOW.getDayOfMonth()))
.build()
);

private static final List<Mutation> SOURCE_TABLE_TEST_MUTATIONS = TEST_MUTATIONS.apply(SOURCE_TABLE_NAME);
Expand Down Expand Up @@ -306,6 +326,70 @@ private void testReadAndStore(Engine engine) throws Exception {
Assert.assertTrue(resultSet.isNull("NOT_IN_THE_SCHEMA_COL"));
}

@Test
public void testReadWithImportQuery() throws Exception {
testReadWithImportQuery(Engine.MAPREDUCE);
testReadWithImportQuery(Engine.SPARK);
}

private void testReadWithImportQuery(Engine engine) throws Exception {
Map<String, String> sourceProperties = new ImmutableMap.Builder<String, String>()
.put("referenceName", "spanner_source")
.put("project", "${project}")
.put("instance", "${instance}")
.put("database", "${database}")
.put("table", "${srcTable}")
.put("importQuery","${importQuery}")
.build();

Map<String, String> sinkProperties = new ImmutableMap.Builder<String, String>()
.put("referenceName", "spanner_sink")
.put("project", "${project}")
.put("instance", "${instance}")
.put("database", "${database}")
.put("table", "${dstTable}")
.put("schema", IMPORT_SCHEMA.toString())
.put("keys", "${keys}")
.build();

String applicationName = SPANNER_PLUGIN_NAME + "-testReadWithImportQuery";
String nonExistentSinkTableName = "nonexistent_" + UUID.randomUUID().toString().replaceAll("-", "_");
ApplicationManager applicationManager = deployApplication(sourceProperties, sinkProperties,
applicationName, engine);
Map<String, String> args = new HashMap<>();
args.put("project", getProjectId());
args.put("instance", instance.getId().getInstance());
args.put("database", database.getId().getDatabase());
args.put("srcTable", SOURCE_TABLE_NAME);
args.put("dstTable", nonExistentSinkTableName);
args.put("keys", "ID");
args.put("importQuery","Select ID, STRING_COL as StringCol, BOOL_COL as BoolCol, TIMESTAMP_COL as TimestampCol, " +
"ARRAY_INT_COL as ArrayIntCol, BYTES_COL as BytesCol, DATE_COL as DateCol from " + SOURCE_TABLE_NAME);
startWorkFlow(applicationManager, ProgramRunStatus.COMPLETED, args);

ResultSet resultSet = spanner.getDatabaseClient(database.getId())
.singleUse()
.executeQuery(Statement.of(String.format("select * from %s;", nonExistentSinkTableName)));

Assert.assertTrue(resultSet.next());
Map<String, Value> firstRowExpected = SOURCE_TABLE_TEST_MUTATIONS.get(0).asMap();
Assert.assertEquals(firstRowExpected.get("ID").getInt64(), resultSet.getLong("ID"));

Assert.assertTrue(resultSet.next());
Assert.assertTrue(resultSet.next());
Map<String, Value> secondRowExpected = SOURCE_TABLE_TEST_MUTATIONS.get(2).asMap();
Assert.assertEquals(secondRowExpected.keySet().size(), resultSet.getColumnCount());
Assert.assertEquals(secondRowExpected.get("ID").getInt64(), resultSet.getLong("ID"));
Assert.assertEquals(secondRowExpected.get("STRING_COL").getString(), resultSet.getString("StringCol"));
Assert.assertEquals(secondRowExpected.get("BOOL_COL").getBool(), resultSet.getBoolean("BoolCol"));
Assert.assertEquals(secondRowExpected.get("TIMESTAMP_COL").getTimestamp(), resultSet.getTimestamp("TimestampCol"));
Assert.assertEquals(secondRowExpected.get("ARRAY_INT_COL").getInt64Array(), resultSet.getLongList("ArrayIntCol"));
Assert.assertEquals(secondRowExpected.get("BYTES_COL").getBytes(), resultSet.getBytes("BytesCol"));
Assert.assertEquals(secondRowExpected.get("DATE_COL").getDate(), resultSet.getDate("DateCol"));
spanner.getDatabaseClient(database.getId()).singleUse()
.executeQuery(Statement.of(String.format("drop table %s;", nonExistentSinkTableName)));
}

//TODO:(CDAP-16040) re-enable once plugin is fixed
//@Test
public void testReadAndStoreInNewTableWithNoSourceSchema() throws Exception {
Expand Down

0 comments on commit a604f3e

Please sign in to comment.