[Iceberg] Prefix table (loaded from Hive Catalog) with catalog name #25140

Open · wants to merge 1 commit into master
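
This change prefixes the name of an Iceberg table loaded through the Hive catalog with the connector's catalog name. IcebergUtil.getHiveIcebergTable previously built the name with a local quotedTableName helper, yielding only schema.table (quoting identifiers where needed); it now uses Iceberg's fullTableName convention, so a table loads as e.g. iceberg.tpch.orders instead of tpch.orders. To support this, IcebergCatalogName is threaded through IcebergHiveMetadataFactory, IcebergHiveMetadata, and getHiveIcebergTable, and tests are updated, including time-travel error messages that embed the table name.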
@@ -154,6 +154,7 @@ public class IcebergHiveMetadata
 {
     public static final int MAXIMUM_PER_QUERY_TABLE_CACHE_SIZE = 1000;
 
+    private final IcebergCatalogName catalogName;
     private final ExtendedHiveMetastore metastore;
     private final HdfsEnvironment hdfsEnvironment;
     private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));
@@ -162,6 +163,7 @@ public class IcebergHiveMetadata
     private final ManifestFileCache manifestFileCache;
 
     public IcebergHiveMetadata(
+            IcebergCatalogName catalogName,
             ExtendedHiveMetastore metastore,
             HdfsEnvironment hdfsEnvironment,
             TypeManager typeManager,
@@ -176,6 +178,7 @@ public IcebergHiveMetadata(
             IcebergTableProperties tableProperties)
     {
         super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties);
+        this.catalogName = requireNonNull(catalogName, "catalogName is null");
         this.metastore = requireNonNull(metastore, "metastore is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.hiveTableOeprationsConfig = requireNonNull(hiveTableOeprationsConfig, "hiveTableOperationsConfig is null");
@@ -204,7 +207,7 @@ public boolean schemaExists(ConnectorSession session, String schemaName)
     @Override
     protected org.apache.iceberg.Table getRawIcebergTable(ConnectorSession session, SchemaTableName schemaTableName)
     {
-        return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, manifestFileCache, session, schemaTableName);
+        return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, manifestFileCache, session, catalogName, schemaTableName);
     }
 
     @Override
@@ -31,6 +31,7 @@
 public class IcebergHiveMetadataFactory
         implements IcebergMetadataFactory
 {
+    final IcebergCatalogName catalogName;
     final ExtendedHiveMetastore metastore;
     final HdfsEnvironment hdfsEnvironment;
     final TypeManager typeManager;
@@ -46,6 +47,7 @@ public class IcebergHiveMetadataFactory
 
     @Inject
     public IcebergHiveMetadataFactory(
+            IcebergCatalogName catalogName,
             ExtendedHiveMetastore metastore,
             HdfsEnvironment hdfsEnvironment,
             TypeManager typeManager,
@@ -59,6 +61,7 @@ public IcebergHiveMetadataFactory(
             ManifestFileCache manifestFileCache,
             IcebergTableProperties tableProperties)
     {
+        this.catalogName = requireNonNull(catalogName, "catalogName is null");
         this.metastore = requireNonNull(metastore, "metastore is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -76,6 +79,7 @@ public IcebergHiveMetadataFactory(
     public ConnectorMetadata create()
     {
         return new IcebergHiveMetadata(
+                catalogName,
                 metastore,
                 hdfsEnvironment,
                 typeManager,
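The factory receives the IcebergCatalogName through its @Inject constructor; the binding itself is outside this diff. A hedged sketch of what the connector's Guice module presumably does (names here are assumptions, not from this PR):

    // Assumed wiring, for illustration only — the real binding lives in the
    // Iceberg connector module, which this diff does not touch.
    binder.bind(IcebergCatalogName.class)
            .toInstance(new IcebergCatalogName(catalogName));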
@@ -71,6 +71,7 @@
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hive.HiveSchemaUtil;
@@ -101,7 +102,6 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -211,7 +211,6 @@
 
 public final class IcebergUtil
 {
-    private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*");
     private static final Logger log = Logger.get(IcebergUtil.class);
     public static final int MIN_FORMAT_VERSION_FOR_DELETE = 2;
 
@@ -246,7 +245,7 @@ public static Table getShallowWrappedIcebergTable(Schema schema, PartitionSpec s
         return new PrestoIcebergTableForMetricsConfig(schema, spec, properties, sortOrder);
     }
 
-    public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, IcebergHiveTableOperationsConfig config, ManifestFileCache manifestFileCache, ConnectorSession session, SchemaTableName table)
+    public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, IcebergHiveTableOperationsConfig config, ManifestFileCache manifestFileCache, ConnectorSession session, IcebergCatalogName catalogName, SchemaTableName table)
     {
         HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName());
         TableOperations operations = new HiveTableOperations(
@@ -258,7 +257,7 @@ public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnv
                 manifestFileCache,
                 table.getSchemaName(),
                 table.getTableName());
-        return new BaseTable(operations, quotedTableName(table));
+        return new BaseTable(operations, fullTableName(catalogName.getCatalogName(), TableIdentifier.of(table.getSchemaName(), table.getTableName())));
     }
 
     public static Table getNativeIcebergTable(IcebergNativeCatalogFactory catalogFactory, ConnectorSession session, SchemaTableName table)
@@ -411,19 +410,6 @@ public static Optional<String> getViewComment(View view)
         return Optional.ofNullable(view.properties().get(TABLE_COMMENT));
     }
 
-    private static String quotedTableName(SchemaTableName name)
-    {
-        return quotedName(name.getSchemaName()) + "." + quotedName(name.getTableName());
-    }
-
-    private static String quotedName(String name)
-    {
-        if (SIMPLE_NAME.matcher(name).matches()) {
-            return name;
-        }
-        return '"' + name.replace("\"", "\"\"") + '"';
-    }
-
     public static TableScan getTableScan(TupleDomain<IcebergColumnHandle> predicates, Optional<Long> snapshotId, Table icebergTable)
     {
         Expression expression = ExpressionConverter.toIcebergExpression(predicates);
@@ -1329,4 +1315,30 @@ public static DataSize getTargetSplitSize(ConnectorSession session, Scan<?, ?, ?
     {
         return getTargetSplitSize(IcebergSessionProperties.getTargetSplitSize(session), scan.targetSplitSize());
     }
+
+    // This code is copied from Iceberg
+    private static String fullTableName(String catalogName, TableIdentifier identifier)
+    {
+        StringBuilder sb = new StringBuilder();
+
+        if (catalogName.contains("/") || catalogName.contains(":")) {
+            // use / for URI-like names: thrift://host:port/db.table
+            sb.append(catalogName);
+            if (!catalogName.endsWith("/")) {
+                sb.append("/");
+            }
+        }
+        else {
+            // use . for non-URI named catalogs: prod.db.table
+            sb.append(catalogName).append(".");
+        }
+
+        for (String level : identifier.namespace().levels()) {
+            sb.append(level).append(".");
+        }
+
+        sb.append(identifier.name());
+
+        return sb.toString();
+    }
 }
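
A quick sketch of what fullTableName returns (inputs here are illustrative, not taken from this PR):

    // Plain catalog names are joined with '.':
    fullTableName("iceberg", TableIdentifier.of("tpch", "orders"));
    // -> "iceberg.tpch.orders"

    // URI-like catalog names (containing '/' or ':') are joined with '/':
    fullTableName("thrift://host:9083", TableIdentifier.of("db", "t"));
    // -> "thrift://host:9083/db.t"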
@@ -635,6 +635,21 @@ public void testCreateTableWithCustomLocation()
         }
     }
 
+    @Test
+    protected void testCreateTableAndValidateIcebergTableName()
+    {
+        String tableName = "test_create_table_for_validate_name";
+        Session session = getSession();
+        assertUpdate(session, format("CREATE TABLE %s (col1 INTEGER, aDate DATE)", tableName));
+        Table icebergTable = loadTable(tableName);
+
+        String catalog = session.getCatalog().get();
+        String schemaName = session.getSchema().get();
+        assertEquals(icebergTable.name(), catalog + "." + schemaName + "." + tableName);
+
+        assertUpdate("DROP TABLE IF EXISTS " + tableName);
+    }
+
     @Test
     public void testPartitionedByTimeType()
     {
@@ -2526,7 +2541,7 @@ private Table updateTable(String tableName)
 
     protected Table loadTable(String tableName)
     {
-        Catalog catalog = CatalogUtil.loadCatalog(catalogType.getCatalogImpl(), "test-hive", getProperties(), new Configuration());
+        Catalog catalog = CatalogUtil.loadCatalog(catalogType.getCatalogImpl(), ICEBERG_CATALOG, getProperties(), new Configuration());
         return catalog.loadTable(TableIdentifier.of("tpch", tableName));
     }
 
@@ -349,15 +349,15 @@ public void testTableVersionErrors()
         assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF id", ".* cannot be resolved");
         assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF (SELECT CURRENT_TIMESTAMP)", ".* Constant expression cannot contain a subquery");
         assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF NULL", "Table version AS OF/BEFORE expression cannot be NULL for .*");
-        assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab2Timestamp1 + "' - INTERVAL '1' MONTH", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");
-        assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS TIMESTAMP WITH TIME ZONE)", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");
-        assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS TIMESTAMP)", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");
+        assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP " + "'" + tab2Timestamp1 + "' - INTERVAL '1' MONTH", "No history found based on timestamp for table iceberg.test_tt_schema.test_table_version_tab2");
+        assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS TIMESTAMP WITH TIME ZONE)", "No history found based on timestamp for table iceberg.test_tt_schema.test_table_version_tab2");
+        assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS TIMESTAMP)", "No history found based on timestamp for table iceberg.test_tt_schema.test_table_version_tab2");
         assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CAST ('2023-01-01' AS DATE)", ".* Type date is invalid. Supported table version AS OF/BEFORE expression type is Timestamp or Timestamp with Time Zone.");
         assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF CURRENT_DATE", ".* Type date is invalid. Supported table version AS OF/BEFORE expression type is Timestamp or Timestamp with Time Zone.");
-        assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP '2023-01-01 00:00:00.000'", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");
+        assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP AS OF TIMESTAMP '2023-01-01 00:00:00.000'", "No history found based on timestamp for table iceberg.test_tt_schema.test_table_version_tab2");
 
-        assertQueryFails("SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId1 + " ORDER BY 1", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab1\"");
-        assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab2Timestamp1 + "' - INTERVAL '1' MONTH", "No history found based on timestamp for table \"test_tt_schema\".\"test_table_version_tab2\"");
+        assertQueryFails("SELECT desc FROM " + tableName1 + " FOR VERSION BEFORE " + tab1VersionId1 + " ORDER BY 1", "No history found based on timestamp for table iceberg.test_tt_schema.test_table_version_tab1");
+        assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE TIMESTAMP " + "'" + tab2Timestamp1 + "' - INTERVAL '1' MONTH", "No history found based on timestamp for table iceberg.test_tt_schema.test_table_version_tab2");
         assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION BEFORE 100", ".* Type integer is invalid. Supported table version AS OF/BEFORE expression type is BIGINT or VARCHAR");
         assertQueryFails("SELECT desc FROM " + tableName2 + " FOR VERSION BEFORE " + tab2VersionId1 + " - " + tab2VersionId1, "Iceberg snapshot ID does not exists: 0");
         assertQueryFails("SELECT desc FROM " + tableName2 + " FOR TIMESTAMP BEFORE 'bad'", ".* Type varchar\\(3\\) is invalid. Supported table version AS OF/BEFORE expression type is Timestamp or Timestamp with Time Zone.");
@@ -37,6 +37,7 @@
 import java.net.URI;
 
 import static com.facebook.presto.iceberg.CatalogType.HADOOP;
+import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
 import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.ACCESS_KEY;
 import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.SECRET_KEY;
 import static com.facebook.presto.testing.TestingConnectorSession.SESSION;
@@ -130,7 +131,7 @@ protected HdfsEnvironment getHdfsEnvironment()
     protected Table loadTable(String tableName)
     {
         Configuration configuration = getHdfsEnvironment().getConfiguration(new HdfsContext(SESSION), getCatalogDataDirectory());
-        Catalog catalog = CatalogUtil.loadCatalog(HADOOP.getCatalogImpl(), "test-hive", getProperties(), configuration);
+        Catalog catalog = CatalogUtil.loadCatalog(HADOOP.getCatalogImpl(), ICEBERG_CATALOG, getProperties(), configuration);
         return catalog.loadTable(TableIdentifier.of("tpch", tableName));
     }
 }
@@ -16,6 +16,7 @@
 import com.facebook.presto.Session;
 import com.facebook.presto.common.QualifiedObjectName;
 import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
+import com.facebook.presto.iceberg.IcebergCatalogName;
 import com.facebook.presto.iceberg.IcebergDistributedTestBase;
 import com.facebook.presto.iceberg.IcebergHiveMetadata;
 import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
@@ -191,6 +192,7 @@ protected Table loadTable(String tableName)
                 new IcebergHiveTableOperationsConfig(),
                 new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024 * 1024),
                 getQueryRunner().getDefaultSession().toConnectorSession(connectorId),
+                new IcebergCatalogName(ICEBERG_CATALOG),
                 SchemaTableName.valueOf("tpch." + tableName));
     }
 
@@ -31,6 +31,7 @@
 import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
 import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
 import com.facebook.presto.iceberg.CatalogType;
+import com.facebook.presto.iceberg.IcebergCatalogName;
 import com.facebook.presto.iceberg.IcebergColumnHandle;
 import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
 import com.facebook.presto.iceberg.IcebergMetadataColumn;
@@ -594,6 +595,7 @@ private Table loadTable(String tableName)
                 new IcebergHiveTableOperationsConfig(),
                 new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024),
                 getQueryRunner().getDefaultSession().toConnectorSession(connectorId),
+                new IcebergCatalogName(ICEBERG_CATALOG),
                 SchemaTableName.valueOf("tpch." + tableName));
     }
 
@@ -13,7 +13,9 @@
  */
 package com.facebook.presto.iceberg.hive;
 
+import com.facebook.presto.FullConnectorSession;
 import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
+import com.facebook.presto.iceberg.IcebergCatalogName;
 import com.facebook.presto.iceberg.IcebergConfig;
 import com.facebook.presto.iceberg.IcebergDistributedSmokeTestBase;
 import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
@@ -60,11 +62,13 @@ protected ExtendedHiveMetastore getFileHiveMetastore()
     @Override
     protected Table getIcebergTable(ConnectorSession session, String schema, String tableName)
     {
+        String defaultCatalog = ((FullConnectorSession) session).getSession().getCatalog().get();
         return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(),
                 getHdfsEnvironment(),
                 new IcebergHiveTableOperationsConfig(),
                 new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024),
                 session,
+                new IcebergCatalogName(defaultCatalog),
                 SchemaTableName.valueOf(schema + "." + tableName));
     }
 }
@@ -36,6 +36,7 @@
 import com.facebook.presto.hive.metastore.file.FileHiveMetastoreConfig;
 import com.facebook.presto.hive.metastore.file.TableMetadata;
 import com.facebook.presto.iceberg.CommitTaskData;
+import com.facebook.presto.iceberg.IcebergCatalogName;
 import com.facebook.presto.iceberg.IcebergConfig;
 import com.facebook.presto.iceberg.IcebergHiveMetadata;
 import com.facebook.presto.iceberg.IcebergHiveMetadataFactory;
@@ -405,6 +406,7 @@ private ConnectorMetadata getIcebergHiveMetadata(ExtendedHiveMetastore metastore
     {
         HdfsEnvironment hdfsEnvironment = new TestingHdfsEnvironment();
         IcebergHiveMetadataFactory icebergHiveMetadataFactory = new IcebergHiveMetadataFactory(
+                new IcebergCatalogName("unimportant"),
                 metastore,
                 hdfsEnvironment,
                 FUNCTION_AND_TYPE_MANAGER,
@@ -18,6 +18,7 @@
 import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
 import com.facebook.presto.hive.metastore.MetastoreContext;
 import com.facebook.presto.iceberg.HiveTableOperations;
+import com.facebook.presto.iceberg.IcebergCatalogName;
 import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
 import com.facebook.presto.iceberg.IcebergUtil;
 import com.facebook.presto.iceberg.ManifestFileCache;
@@ -97,6 +98,7 @@ Table loadTable(String tableName)
                 new IcebergHiveTableOperationsConfig(),
                 new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024),
                 getQueryRunner().getDefaultSession().toConnectorSession(connectorId),
+                new IcebergCatalogName(ICEBERG_CATALOG),
                 SchemaTableName.valueOf("tpch." + tableName));
     }
 