Added a fix for generating the target partition path if base and part… #213

Closed
wants to merge 3 commits
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@


## [16.3.4] - 2021-02-16
### Fixed
* Issue where replication location could not be generated. See [#212](https://github.com/HotelsDotCom/circus-train/issues/212).

## [16.3.3] - 2020-12-10
### Fixed
* Issue where rename table operation would be incorrect if tables are in different databases.
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
* Copyright (C) 2016-2021 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,7 +35,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -175,7 +177,8 @@ public void updateMetadata(
List<Partition> partitionsToAlter = new ArrayList<>(sourcePartitions.size());
List<ColumnStatistics> statisticsToSet = new ArrayList<>(sourcePartitions.size());
for (Partition sourcePartition : sourcePartitions) {
Path replicaPartitionLocation = locationManager.getPartitionLocation(sourcePartition);
Path replicaPartitionLocation = createReplicaPartitionLocation(sourceTableAndStatistics, sourcePartition,
locationManager);
LOG.debug("Generated replica partition path: {}", replicaPartitionLocation);

Partition replicaPartition = tableFactory
@@ -268,6 +271,28 @@ public void updateMetadata(
}
}

private Path createReplicaPartitionLocation(
TableAndStatistics sourceTableAndStatistics,
Partition sourcePartition,
ReplicaLocationManager locationManager) {
try {
return locationManager.getPartitionLocation(sourcePartition);
} catch (CircusTrainException e) {
Comment from the PR author:
This is thrown when the base location doesn't match the partition location. Strangely enough, the actual data copy has no problem with this, so this logic fixes it by generating the partition name (instead of taking the name from the source partition folder).
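
For context, a minimal sketch of what Warehouse.makePartName produces (the partition keys, values, and replica base path here are hypothetical; makePartName itself is the real Hive metastore utility the fix calls):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class MakePartNameSketch {
  public static void main(String[] args) throws MetaException {
    List<FieldSchema> partitionKeys = Arrays
        .asList(new FieldSchema("continent", "string", ""), new FieldSchema("country", "string", ""));
    List<String> values = Arrays.asList("Europe", "UK");
    // Builds the canonical Hive path fragment from table metadata alone,
    // independent of where the source partition folder actually lives.
    String partName = Warehouse.makePartName(partitionKeys, values); // "continent=Europe/country=UK"
    System.out.println(new Path("s3://replica/base", partName)); // s3://replica/base/continent=Europe/country=UK
  }
}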

Comment from the PR author:
Hmm, I'm actually a bit concerned this might not work correctly, given how this is implemented: https://github.com/HotelsDotCom/circus-train/blob/main/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/source/HdfsSnapshotLocationManager.java#L124

I need to find out how the different copiers generate the target paths.
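
The concern makes sense if the linked code derives the partition sub-path by stripping the table base path from the partition location. A hedged sketch of that failure mode (the method name and message are illustrative, not the actual implementation):

import com.hotels.bdp.circustrain.api.CircusTrainException;

class PartitionSubPathSketch {
  // Relativizing against a single base path cannot work for a partition stored
  // under a different base, e.g. base ".../base1" vs partition
  // ".../base2/continent=Asia/country=China"; hence the exception caught below.
  static String partitionSubPath(String tableBasePath, String partitionLocation) {
    if (!partitionLocation.startsWith(tableBasePath)) {
      throw new CircusTrainException(
          "Partition location '" + partitionLocation + "' is not under table base path '" + tableBasePath + "'");
    }
    return partitionLocation.substring(tableBasePath.length());
  }
}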

try {
String partitionName = Warehouse
.makePartName(sourceTableAndStatistics.getTable().getPartitionKeys(), sourcePartition.getValues());
Path replicationPath = new Path(locationManager.getPartitionBaseLocation(), partitionName);
LOG
.warn(
"Couldn't get partition location from folder will generate one instead and use: '{}', original error: {}",
replicationPath, e.getMessage());
return replicationPath;
} catch (MetaException e1) {
throw new CircusTrainException(e1);
}
}
}

private List<Partition> getOldPartitions(
PartitionsAndStatistics sourcePartitionsAndStatistics,
String replicaDatabaseName,
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
* Copyright (C) 2016-2021 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.DATABASE;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.EVOLUTION_COLUMN;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.PARTITIONED_TABLE;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.PART_00000;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_ENCODED_TABLE;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_MANAGED_PARTITIONED_TABLE;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_MANAGED_UNPARTITIONED_TABLE;
@@ -259,6 +260,66 @@ public void checkAssertion() throws Exception {
runner.run(config.getAbsolutePath());
}

@Test
public void partitionedTableDifferentBasePaths() throws Exception {
// base path .../base1/
Table hiveTable = TestUtils
.createPartitionedTable(sourceCatalog.client(), DATABASE, PARTITIONED_TABLE,
new URI(sourceWarehouseUri + "/base1/"));

// base path .../base1/
URI partitionEurope = URI.create(sourceWarehouseUri + "/base1/continent=Europe");
URI partitionUk = URI.create(partitionEurope + "/country=UK");
File dataFileUk = new File(partitionUk.getPath(), PART_00000);
FileUtils.writeStringToFile(dataFileUk, "1\tadam\tlondon\n2\tsusan\tglasgow\n");

// base path .../base2/
URI partitionAsia = URI.create(sourceWarehouseUri + "/base2/continent=Asia");
URI partitionChina = URI.create(partitionAsia + "/country=China");
File dataFileChina = new File(partitionChina.getPath(), PART_00000);
FileUtils.writeStringToFile(dataFileChina, "1\tchun\tbeijing\n2\tlee\tshanghai\n");
LOG
.info(">>>> Partitions added: {}",
sourceCatalog
.client()
.add_partitions(Arrays
.asList(newTablePartition(hiveTable, Arrays.asList("Europe", "UK"), partitionUk),
newTablePartition(hiveTable, Arrays.asList("Asia", "China"), partitionChina))));
LOG.info(">>>> Table {} ", sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE));

exit.expectSystemExitWithStatus(0);
File config = dataFolder.getFile("partitioned-single-table-no-housekeeping.yml");
CircusTrainRunner runner = CircusTrainRunner
.builder(DATABASE, sourceWarehouseUri, replicaWarehouseUri, housekeepingDbLocation)
.sourceMetaStore(sourceCatalog.getThriftConnectionUri(), sourceCatalog.connectionURL(),
sourceCatalog.driverClassName())
.replicaMetaStore(replicaCatalog.getThriftConnectionUri())
.build();
exit.checkAssertionAfterwards(new Assertion() {
@Override
public void checkAssertion() throws Exception {
Table table = replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE);
String warehouseBase = replicaWarehouseUri.toURI().toString();
assertTrue(table.getSd().getLocation().matches(warehouseBase + "ct_database/ct_table_p"));
List<Partition> listPartitions = replicaCatalog
.client()
.listPartitions(DATABASE, PARTITIONED_TABLE, (short) -1);
assertThat(listPartitions.size(), is(2));
assertTrue(listPartitions
.get(0)
.getSd()
.getLocation()
.matches(warehouseBase + "ct_database/ct_table_p/ctp.*?/continent=Asia/country=China"));
assertTrue(listPartitions
.get(1)
.getSd()
.getLocation()
.matches(warehouseBase + "ct_database/ct_table_p/ctp.*?/continent=Europe/country=UK"));
}
});
runner.run(config.getAbsolutePath());
}

@Test
public void partitionedTableNoHousekeepingWithTableReplicationParameters() throws Exception {
helper.createPartitionedTable(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE));
@@ -1427,21 +1488,18 @@ public void partitionedTableColumnAdditionInStruct() throws Exception {
structData.put("name", "adam");
structData.put("city", "blackpool");

Table replicaTable = replicaHelper.createParquetPartitionedTable(
toUri(replicaWarehouseUri, DATABASE, PARTITIONED_TABLE),
DATABASE,
PARTITIONED_TABLE,
schema,
EVOLUTION_COLUMN,
structData,
1);
Table replicaTable = replicaHelper
.createParquetPartitionedTable(toUri(replicaWarehouseUri, DATABASE, PARTITIONED_TABLE), DATABASE,
PARTITIONED_TABLE, schema, EVOLUTION_COLUMN, structData, 1);
LOG.info(">>>> Table {} ", replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE));

replicaTable.getParameters().put("com.hotels.bdp.circustrain.replication.event", "event_id");
replicaCatalog.client().alter_table(DATABASE, PARTITIONED_TABLE, replicaTable);

// Create the source table with an additional column in the struct.
helper.createData(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), schema, "1", 1, EVOLUTION_COLUMN, structData);
helper
.createData(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), schema, "1", 1, EVOLUTION_COLUMN,
structData);

Schema schemaV2 = SchemaBuilder
.builder("name.space")
@@ -1450,7 +1508,7 @@ public void partitionedTableColumnAdditionInStruct() throws Exception {
.requiredInt("id")
.name(EVOLUTION_COLUMN)
.type()
.record( EVOLUTION_COLUMN + "_struct")
.record(EVOLUTION_COLUMN + "_struct")
.fields()
.requiredString("name")
.requiredString("city")
@@ -1464,21 +1522,14 @@ public void partitionedTableColumnAdditionInStruct() throws Exception {
structData.put("city", "blackpool");
structData.put("dob", "22/09/1992");

Table table = helper.createParquetPartitionedTable(
toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE),
DATABASE,
PARTITIONED_TABLE,
schemaV2,
EVOLUTION_COLUMN,
structData,
2);
Table table = helper
.createParquetPartitionedTable(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), DATABASE,
PARTITIONED_TABLE, schemaV2, EVOLUTION_COLUMN, structData, 2);
LOG.info(">>>> Table {} ", sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE));

// Create the source partition with the original struct.
URI partition = URI.create(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE) + "/hour=" + 1);
sourceCatalog.client().add_partitions(Arrays.asList(
newTablePartition(table, Arrays.asList("1"), partition)
));
sourceCatalog.client().add_partitions(Arrays.asList(newTablePartition(table, Arrays.asList("1"), partition)));

CircusTrainRunner runner = CircusTrainRunner
.builder(DATABASE, sourceWarehouseUri, replicaWarehouseUri, housekeepingDbLocation)
@@ -1496,7 +1547,8 @@ public void checkAssertion() throws Exception {
Table sourceTable = sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE);
List<FieldSchema> cols = sourceTable.getSd().getCols();
assertThat(cols.get(0), is(new FieldSchema("id", "int", "")));
assertThat(cols.get(1), is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
assertThat(cols.get(1),
is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
PartitionIterator partitionIterator = new PartitionIterator(sourceCatalog.client(), sourceTable, (short) 1000);
List<Partition> partitions = new ArrayList<>();
while (partitionIterator.hasNext()) {
@@ -1510,7 +1562,8 @@ public void checkAssertion() throws Exception {
Table replicaTable = replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE);
cols = replicaTable.getSd().getCols();
assertThat(cols.get(0), is(new FieldSchema("id", "int", "")));
assertThat(cols.get(1), is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
assertThat(cols.get(1),
is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
partitionIterator = new PartitionIterator(replicaCatalog.client(), replicaTable, (short) 1000);
partitions = new ArrayList<>();
while (partitionIterator.hasNext()) {
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
* Copyright (C) 2016-2021 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -70,8 +70,7 @@ public class IntegrationTestHelper {
}

void createPartitionedTable(URI sourceTableUri) throws Exception {
Table hiveTable = TestUtils
.createPartitionedTable(metaStoreClient, DATABASE, PARTITIONED_TABLE, sourceTableUri);
Table hiveTable = TestUtils.createPartitionedTable(metaStoreClient, DATABASE, PARTITIONED_TABLE, sourceTableUri);

URI partitionEurope = URI.create(sourceTableUri + "/continent=Europe");
URI partitionUk = URI.create(partitionEurope + "/country=UK");
@@ -91,38 +90,34 @@ void createPartitionedTable(URI sourceTableUri) throws Exception {
}

Table createParquetPartitionedTable(
URI tableUri,
String database,
String table,
Schema schema,
String fieldName,
Object fieldData,
int version) throws Exception {
URI tableUri,
String database,
String table,
Schema schema,
String fieldName,
Object fieldData,
int version)
throws Exception {
List<FieldSchema> columns = new ArrayList<>();
AvroObjectInspectorGenerator schemaInspector = new AvroObjectInspectorGenerator(schema);
for (int i = 0; i < schemaInspector.getColumnNames().size(); i++) {
columns.add(new FieldSchema(
schemaInspector.getColumnNames().get(i), schemaInspector.getColumnTypes().get(i).toString(), ""
));
columns
.add(new FieldSchema(schemaInspector.getColumnNames().get(i),
schemaInspector.getColumnTypes().get(i).toString(), ""));
}
List<FieldSchema> partitionKeys = Arrays.asList(new FieldSchema("hour", "string", ""));
Table parquetTable = TestUtils
.createPartitionedTable(metaStoreClient, database, table, tableUri, columns, partitionKeys,
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", MapredParquetInputFormat.class.getName(),
MapredParquetOutputFormat.class.getName());
.createPartitionedTable(metaStoreClient, database, table, tableUri, columns, partitionKeys,
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", MapredParquetInputFormat.class.getName(),
MapredParquetOutputFormat.class.getName());
URI partition = createData(tableUri, schema, Integer.toString(version), version, fieldName, fieldData);
metaStoreClient.add_partitions(Arrays.asList(newTablePartition(parquetTable,
Arrays.asList(Integer.toString(version)), partition)));
metaStoreClient
.add_partitions(
Arrays.asList(newTablePartition(parquetTable, Arrays.asList(Integer.toString(version)), partition)));
return metaStoreClient.getTable(database, table);
}

URI createData(
URI tableUri,
Schema schema,
String hour,
int id,
String fieldName,
Object data) throws IOException {
URI createData(URI tableUri, Schema schema, String hour, int id, String fieldName, Object data) throws IOException {
GenericData.Record record = new GenericData.Record(schema);
record.put("id", id);

@@ -144,10 +139,8 @@ URI createData(
parentFolder.mkdirs();
File partitionFile = new File(parentFolder, "parquet0000");
Path filePath = new Path(partitionFile.toURI());
ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(filePath)
.withSchema(schema)
.withConf(new Configuration())
.build();
ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData
.Record>builder(filePath).withSchema(schema).withConf(new Configuration()).build();

try {
writer.write(record);
@@ -201,8 +194,7 @@ void createManagedPartitionedTable(URI sourceTableUri) throws Exception {
}

void createPartitionedView() throws Exception {
Table view = TestUtils
.createPartitionedView(metaStoreClient, DATABASE, SOURCE_PARTITIONED_VIEW, PARTITIONED_TABLE);
Table view = TestUtils.createPartitionedView(metaStoreClient, DATABASE, SOURCE_PARTITIONED_VIEW, PARTITIONED_TABLE);
metaStoreClient
.add_partitions(Arrays
.asList(newViewPartition(view, Arrays.asList("Europe", "UK")),