From 998898ec64794dea94a20f41035dd95decdf27cd Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 28 Nov 2023 14:10:39 -0600 Subject: [PATCH] Allow broker to use catalog for datasource schemas --- extensions-core/druid-catalog/pom.xml | 112 +++------ .../catalog/guice/CatalogBrokerModule.java | 100 ++++++++ .../catalog/http/CatalogListenerResource.java | 18 ++ .../druid/catalog/http/CatalogResource.java | 3 +- .../catalog/sql/LiveCatalogResolver.java | 211 ++++++++++++++++ .../catalog/sync/CachedMetadataCatalog.java | 41 ++++ .../druid/catalog/sync/CatalogClient.java | 4 +- .../catalog/sync/CatalogUpdateListener.java | 2 + .../catalog/sync/CatalogUpdateNotifier.java | 16 +- .../catalog/sync/CatalogUpdateReceiver.java | 56 +++++ ...rg.apache.druid.initialization.DruidModule | 1 + .../druid/catalog/sql/CatalogQueryTest.java | 124 ++++++++++ .../druid/catalog/sql/LiveCatalogTest.java | 228 ++++++++++++++++++ .../catalog/storage/TableManagerTest.java | 4 +- .../druid/catalog/sync/CatalogCacheTest.java | 154 ++++++++++++ .../druid/catalog/sync/CatalogSyncTest.java | 28 +-- .../druid/catalog/sync/MockCatalogSync.java | 10 + .../http/catalog/CatalogResourceTest.java | 6 +- .../druid/server/http/catalog/EditorTest.java | 12 +- .../ingest/insertFromTable-logicalPlan.txt | 3 + ...sertWithCatalogClusteredBy-logicalPlan.txt | 4 + ...ertWithCatalogClusteredBy2-logicalPlan.txt | 4 + .../insertWithClusteredBy-logicalPlan.txt | 4 + .../insertWithClusteredBy2-logicalPlan.txt | 4 + .../apache/druid/msq/test/MSQTestBase.java | 3 +- .../model/table/S3InputSourceDefnTest.java | 32 +-- .../druid/catalog/model/CatalogUtils.java | 2 +- .../druid/catalog/model/ColumnSpec.java | 41 ++-- .../apache/druid/catalog/model/Columns.java | 95 +++++--- .../apache/druid/catalog/model/TableId.java | 5 + .../model/facade/DatasourceFacade.java | 83 +++++++ .../model/facade/ExternalTableFacade.java | 3 +- .../catalog/model/facade/TableFacade.java | 6 +- .../model/table/BaseInputSourceDefn.java | 2 +- .../catalog/model/table/DatasourceDefn.java | 12 +- .../model/table/ExternalTableDefn.java | 5 + .../model/table/FormattedInputSourceDefn.java | 4 +- .../catalog/model/table/TableBuilder.java | 2 +- .../model/table/BaseExternTableTest.java | 4 +- .../model/table/CsvInputFormatTest.java | 6 +- .../model/table/DatasourceTableTest.java | 109 ++++----- .../model/table/DelimitedInputFormatTest.java | 6 +- .../model/table/ExternalTableTest.java | 28 +-- .../model/table/HttpInputSourceDefnTest.java | 36 +-- .../table/InlineInputSourceDefnTest.java | 22 +- .../model/table/JsonInputFormatTest.java | 6 +- .../model/table/LocalInputSourceDefnTest.java | 32 +-- .../druid/sql/calcite/schema/DruidSchema.java | 10 +- .../sql/calcite/table/DatasourceTable.java | 129 +++++++++- .../sql/calcite/IngestTableFunctionTest.java | 4 +- .../sql/calcite/SqlTestFrameworkConfig.java | 1 + .../schema/DruidSchemaNoDataInitTest.java | 3 +- .../calcite/schema/InformationSchemaTest.java | 4 +- .../sql/calcite/schema/SystemSchemaTest.java | 3 +- .../sql/calcite/util/QueryFrameworkUtils.java | 18 +- .../sql/calcite/util/SqlTestFramework.java | 8 +- 56 files changed, 1514 insertions(+), 359 deletions(-) create mode 100644 extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogBrokerModule.java create mode 100644 extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sql/LiveCatalogResolver.java create mode 100644 extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateReceiver.java create mode 100644 extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogQueryTest.java create mode 100644 extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/LiveCatalogTest.java create mode 100644 extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogCacheTest.java create mode 100644 extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertFromTable-logicalPlan.txt create mode 100644 extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy-logicalPlan.txt create mode 100644 extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy2-logicalPlan.txt create mode 100644 extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt create mode 100644 extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy2-logicalPlan.txt diff --git a/extensions-core/druid-catalog/pom.xml b/extensions-core/druid-catalog/pom.xml index a2eb434c1a21..e86142b55d44 100644 --- a/extensions-core/druid-catalog/pom.xml +++ b/extensions-core/druid-catalog/pom.xml @@ -48,44 +48,22 @@ ${project.parent.version} provided - - org.apache.druid - druid-indexing-service - ${project.parent.version} - provided - org.apache.druid druid-sql ${project.parent.version} provided - - org.apache.druid - druid-services - ${project.parent.version} - provided - com.google.inject guice provided - - com.google.inject.extensions - guice-multibindings - provided - com.google.guava guava provided - - com.opencsv - opencsv - provided - com.fasterxml.jackson.core jackson-databind @@ -142,11 +120,6 @@ curator-client provided - - com.fasterxml.jackson.dataformat - jackson-dataformat-smile - provided - org.jdbi jdbi @@ -162,11 +135,6 @@ jsr311-api provided - - org.apache.commons - commons-lang3 - provided - javax.servlet javax.servlet-api @@ -177,36 +145,6 @@ jersey-server provided - - com.google.errorprone - error_prone_annotations - provided - - - org.lz4 - lz4-java - provided - - - org.apache.datasketches - datasketches-java - provided - - - org.apache.datasketches - datasketches-memory - provided - - - it.unimi.dsi - fastutil-core - provided - - - commons-io - commons-io - provided - @@ -214,31 +152,11 @@ easymock test - - org.hamcrest - hamcrest-all - test - - - org.hamcrest - hamcrest-core - test - junit junit test - - org.mockito - mockito-core - test - - - nl.jqno.equalsverifier - equalsverifier - test - org.apache.druid druid-processing @@ -262,4 +180,34 @@ + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + javax.inject:javax.inject + + + javax.inject:javax.inject + jakarta.inject:jakarta.inject-api + + + + + diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogBrokerModule.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogBrokerModule.java new file mode 100644 index 000000000000..91de7842e133 --- /dev/null +++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/guice/CatalogBrokerModule.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.guice; + +import com.google.inject.Binder; +import org.apache.druid.catalog.http.CatalogListenerResource; +import org.apache.druid.catalog.model.SchemaRegistry; +import org.apache.druid.catalog.model.SchemaRegistryImpl; +import org.apache.druid.catalog.sql.LiveCatalogResolver; +import org.apache.druid.catalog.sync.CachedMetadataCatalog; +import org.apache.druid.catalog.sync.CatalogClient; +import org.apache.druid.catalog.sync.CatalogUpdateListener; +import org.apache.druid.catalog.sync.CatalogUpdateReceiver; +import org.apache.druid.catalog.sync.MetadataCatalog; +import org.apache.druid.catalog.sync.MetadataCatalog.CatalogSource; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.Jerseys; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LifecycleModule; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.annotations.LoadScope; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.sql.calcite.planner.CatalogResolver; + +/** + * Configures the metadata catalog on the Broker to use a cache + * and network communications for pull and push updates. + */ +@LoadScope(roles = NodeRole.BROKER_JSON_NAME) +public class CatalogBrokerModule implements DruidModule +{ + @Override + public void configure(Binder binder) + { + // The Broker (catalog client) uses a cached metadata catalog. + binder + .bind(CachedMetadataCatalog.class) + .in(LazySingleton.class); + + // Broker code accesses he catalog through the + // MetadataCatalog interface. + binder + .bind(MetadataCatalog.class) + .to(CachedMetadataCatalog.class) + .in(LazySingleton.class); + + // The cached metadata catalog needs a "pull" source, + // which is the network client. + binder + .bind(CatalogSource.class) + .to(CatalogClient.class) + .in(LazySingleton.class); + + // The cached metadata catalog is the listener for"push" events. + binder + .bind(CatalogUpdateListener.class) + .to(CachedMetadataCatalog.class) + .in(LazySingleton.class); + + // At present, the set of schemas is fixed. + binder + .bind(SchemaRegistry.class) + .to(SchemaRegistryImpl.class) + .in(LazySingleton.class); + + // Lifecycle-managed class to prime the metadata cache + binder + .bind(CatalogUpdateReceiver.class) + .in(ManageLifecycle.class); + LifecycleModule.register(binder, CatalogUpdateReceiver.class); + + // Catalog resolver for the planner. This will override the + // base binding. + binder + .bind(CatalogResolver.class) + .to(LiveCatalogResolver.class) + .in(LazySingleton.class); + + // The listener resource sends to the catalog + // listener (the cached catalog.) + Jerseys.addResource(binder, CatalogListenerResource.class); + } +} diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogListenerResource.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogListenerResource.java index 67b4d29fbef1..c100fbbbad82 100644 --- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogListenerResource.java +++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogListenerResource.java @@ -67,4 +67,22 @@ public Response syncTable(final UpdateEvent event) listener.updated(event); return Response.status(Response.Status.ACCEPTED).build(); } + + @POST + @Path("flush") + @ResourceFilters(ConfigResourceFilter.class) + public Response flush() + { + listener.flush(); + return Response.status(Response.Status.ACCEPTED).build(); + } + + @POST + @Path("resync") + @ResourceFilters(ConfigResourceFilter.class) + public Response resync() + { + listener.resync(); + return Response.status(Response.Status.ACCEPTED).build(); + } } diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java index 551ba490c183..2590f57c8b38 100644 --- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java +++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java @@ -278,8 +278,7 @@ public Response editTable( // Retrieval /** - * Retrieves the list of all Druid schema names, all table names, or - * all table metadata. + * Retrieves the list of all Druid schema names. * * @param format the format of the response. See the code for the * available formats diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sql/LiveCatalogResolver.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sql/LiveCatalogResolver.java new file mode 100644 index 000000000000..1c5ec0cdec5b --- /dev/null +++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sql/LiveCatalogResolver.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.sql; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.catalog.model.ColumnSpec; +import org.apache.druid.catalog.model.Columns; +import org.apache.druid.catalog.model.ResolvedTable; +import org.apache.druid.catalog.model.TableId; +import org.apache.druid.catalog.model.facade.DatasourceFacade; +import org.apache.druid.catalog.model.table.DatasourceDefn; +import org.apache.druid.catalog.sync.MetadataCatalog; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.table.DatasourceTable; +import org.apache.druid.sql.calcite.table.DatasourceTable.EffectiveColumnMetadata; +import org.apache.druid.sql.calcite.table.DatasourceTable.EffectiveMetadata; +import org.apache.druid.sql.calcite.table.DatasourceTable.PhysicalDatasourceMetadata; +import org.apache.druid.sql.calcite.table.DruidTable; + +import javax.inject.Inject; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class LiveCatalogResolver implements CatalogResolver +{ + private final MetadataCatalog catalog; + + @Inject + public LiveCatalogResolver(final MetadataCatalog catalog) + { + this.catalog = catalog; + } + + private DatasourceFacade datasourceSpec(String name) + { + TableId tableId = TableId.datasource(name); + ResolvedTable table = catalog.resolveTable(tableId); + if (table == null) { + return null; + } + if (!DatasourceDefn.isDatasource(table)) { + return null; + } + return new DatasourceFacade(table); + } + + /** + * Create a {@link DruidTable} based on the physical segments, catalog entry, or both. + */ + @Override + public DruidTable resolveDatasource(String name, PhysicalDatasourceMetadata dsMetadata) + { + DatasourceFacade dsSpec = datasourceSpec(name); + + // No catalog metadata. If there is no physical metadata, then the + // datasource does not exist. Else, if there is physical metadata, the + // datasource is based entirely on the physical information. + if (dsSpec == null) { + return dsMetadata == null ? null : new DatasourceTable(dsMetadata); + } + if (dsMetadata == null) { + // Datasource exists only in the catalog: no physical segments. + return emptyDatasource(name, dsSpec); + } else { + // Datasource exists as both segments and a catalog entry. + return mergeDatasource(dsMetadata, dsSpec); + } + } + + private DruidTable emptyDatasource(String name, DatasourceFacade dsSpec) + { + RowSignature.Builder builder = RowSignature.builder(); + Map columns = new HashMap<>(); + boolean hasTime = false; + for (ColumnSpec col : dsSpec.columns()) { + EffectiveColumnMetadata colMetadata = columnFromCatalog(col, null); + if (colMetadata.name().equals(Columns.TIME_COLUMN)) { + hasTime = true; + } + builder.add(col.name(), colMetadata.druidType()); + columns.put(col.name(), colMetadata); + } + if (!hasTime) { + columns.put(Columns.TIME_COLUMN, new EffectiveColumnMetadata( + Columns.TIME_COLUMN, + ColumnType.LONG + )); + builder = RowSignature.builder() + .add(Columns.TIME_COLUMN, ColumnType.LONG) + .addAll(builder.build()); + } + + final PhysicalDatasourceMetadata mergedMetadata = new PhysicalDatasourceMetadata( + new TableDataSource(name), + builder.build(), + false, // Cannot join to an empty table + false // Cannot broadcast an empty table + ); + return new DatasourceTable( + mergedMetadata.getRowSignature(), + mergedMetadata, + new EffectiveMetadata(dsSpec, columns, true) + ); + } + + private EffectiveColumnMetadata columnFromCatalog(ColumnSpec col, ColumnType physicalType) + { + ColumnType type = Columns.druidType(col); + if (type != null) { + // Use the type that the user provided. + } else if (physicalType == null) { + // Corner case: the user has defined a column in the catalog, has + // not specified a type (meaning the user wants Druid to decide), but + // there is no data at this moment. Guess String as the type for the + // null values. If new segments appear between now and execution, we'll + // convert the values to string, which is always safe. + type = ColumnType.STRING; + } else { + type = physicalType; + } + return new EffectiveColumnMetadata(col.name(), type); + } + + private DruidTable mergeDatasource( + final PhysicalDatasourceMetadata dsMetadata, + final DatasourceFacade dsSpec) + { + final RowSignature physicalSchema = dsMetadata.getRowSignature(); + Set physicalCols = new HashSet<>(physicalSchema.getColumnNames()); + + // Merge columns. All catalog-defined columns come first, + // in the order defined in the catalog. + final RowSignature.Builder builder = RowSignature.builder(); + Map columns = new HashMap<>(); + for (ColumnSpec col : dsSpec.columns()) { + ColumnType physicalType = null; + if (physicalCols.remove(col.name())) { + physicalType = dsMetadata.getRowSignature().getColumnType(col.name()).get(); + } + EffectiveColumnMetadata colMetadata = columnFromCatalog(col, physicalType); + builder.add(col.name(), colMetadata.druidType()); + columns.put(col.name(), colMetadata); + } + + // Mark any hidden columns. Assumes that the hidden columns are a disjoint set + // from the defined columns. + if (dsSpec.hiddenColumns() != null) { + for (String colName : dsSpec.hiddenColumns()) { + physicalCols.remove(colName); + } + } + + // Any remaining columns follow, if not marked as hidden + // in the catalog. + for (int i = 0; i < physicalSchema.size(); i++) { + String colName = physicalSchema.getColumnName(i); + if (!physicalCols.contains(colName)) { + continue; + } + ColumnType physicalType = dsMetadata.getRowSignature().getColumnType(colName).get(); + EffectiveColumnMetadata colMetadata = EffectiveColumnMetadata.fromPhysical(colName, physicalType); + columns.put(colName, colMetadata); + builder.add(colName, physicalType); + } + + EffectiveMetadata effectiveMetadata = new EffectiveMetadata(dsSpec, columns, false); + return new DatasourceTable(builder.build(), dsMetadata, effectiveMetadata); + } + + @Override + public boolean ingestRequiresExistingTable() + { + return false; + } + + @Override + public Set getTableNames(Set datasourceNames) + { + Set catalogTableNames = catalog.tableNames(TableId.DRUID_SCHEMA); + if (catalogTableNames.isEmpty()) { + return datasourceNames; + } + return ImmutableSet.builder() + .addAll(datasourceNames) + .addAll(catalogTableNames) + .build(); + } +} diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CachedMetadataCatalog.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CachedMetadataCatalog.java index b30a12ee9274..dfbf2b5731fe 100644 --- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CachedMetadataCatalog.java +++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CachedMetadataCatalog.java @@ -277,6 +277,22 @@ public synchronized Set tableNames() }); return tables; } + + /** + * Populate the cache by asking the catalog source for all tables for + * this schema. + */ + public synchronized void resync(CatalogSource source) + { + List tables = source.tablesForSchema(schema.name()); + cache.clear(); + for (TableMetadata table : tables) { + cache.compute( + table.id().name(), + (k, v) -> computeCreate(v, table) + ); + } + } } private final ConcurrentHashMap schemaCache = new ConcurrentHashMap<>(); @@ -326,6 +342,12 @@ public void updated(UpdateEvent event) } } + /** + * Get the list of table names in the cache. Does not attempt to + * lazy load the list since doing so is costly: we don't know when it + * might be out of date. Rely on priming the cache, and update notifications + * to keep the list accurate. + */ @Override public Set tableNames(String schemaName) { @@ -333,8 +355,13 @@ public Set tableNames(String schemaName) return schemaEntry == null ? Collections.emptySet() : schemaEntry.tableNames(); } + /** + * Clear the cache. Primarily for testing. + */ + @Override public void flush() { + LOG.info("Flush requested"); schemaCache.clear(); } @@ -347,4 +374,18 @@ private SchemaEntry entryFor(String schemaName) return schema == null ? null : new SchemaEntry(schema); }); } + + /** + * Discard any existing cached tables and reload directly from the + * catalog source. Manages the two schemas which the catalog manages. + * If the catalog were to manage others, add those here as well. + * Done both at Broker startup, and on demand for testing. + */ + @Override + public void resync() + { + LOG.info("Resync requested"); + entryFor(TableId.DRUID_SCHEMA).resync(base); + entryFor(TableId.EXTERNAL_SCHEMA).resync(base); + } } diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogClient.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogClient.java index a06addfce710..73126fc3041f 100644 --- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogClient.java +++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogClient.java @@ -86,7 +86,7 @@ public CatalogClient( @Override public List tablesForSchema(String dbSchema) { - String url = StringUtils.replace(SCHEMA_SYNC_PATH, "{dbSchema}", dbSchema); + String url = StringUtils.replace(SCHEMA_SYNC_PATH, "{schema}", dbSchema); List results = send(url, LIST_OF_TABLE_METADATA_TYPE); // Not found for a list is an empty list. @@ -96,7 +96,7 @@ public List tablesForSchema(String dbSchema) @Override public TableMetadata table(TableId id) { - String url = StringUtils.replace(TABLE_SYNC_PATH, "{dbSchema}", id.schema()); + String url = StringUtils.replace(TABLE_SYNC_PATH, "{schema}", id.schema()); url = StringUtils.replace(url, "{name}", id.name()); return send(url, TABLE_METADATA_TYPE); } diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateListener.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateListener.java index afddeb00221a..553ed49c00f3 100644 --- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateListener.java +++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateListener.java @@ -28,4 +28,6 @@ public interface CatalogUpdateListener { void updated(UpdateEvent event); + void flush(); + void resync(); } diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateNotifier.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateNotifier.java index f60507c8e2c9..1c817493a459 100644 --- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateNotifier.java +++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateNotifier.java @@ -87,14 +87,14 @@ public CatalogUpdateNotifier( public void start() { notifier.start(); - LOG.info("Catalog catalog update notifier started"); + LOG.info("Catalog update notifier started"); } @LifecycleStop public void stop() { notifier.stop(); - LOG.info("Catalog catalog update notifier stopped"); + LOG.info("Catalog update notifier stopped"); } @Override @@ -102,4 +102,16 @@ public void updated(UpdateEvent event) { notifier.send(JacksonUtils.toBytes(smileMapper, event)); } + + @Override + public void flush() + { + // Not generated on this path + } + + @Override + public void resync() + { + // Not generated on this path + } } diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateReceiver.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateReceiver.java new file mode 100644 index 000000000000..1dfeaa1307db --- /dev/null +++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateReceiver.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.sync; + +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import javax.inject.Inject; + +/** + * Receiver which runs in the Broker to listen for catalog updates from the + * Coordinator. To prevent slowing initial queries, this class loads the + * current catalog contents into the local cache on lifecycle start, which + * avoids the on-demand reads that would otherwise occur. After the first load, + * events from the Coordinator keep the local cache evergreen. + */ +@ManageLifecycle +public class CatalogUpdateReceiver +{ + private static final EmittingLogger LOG = new EmittingLogger(CatalogUpdateReceiver.class); + + private final CachedMetadataCatalog cachedCatalog; + + @Inject + public CatalogUpdateReceiver( + final CachedMetadataCatalog cachedCatalog + ) + { + this.cachedCatalog = cachedCatalog; + } + + @LifecycleStart + public void start() + { + cachedCatalog.resync(); + LOG.info("Catalog update receiver started"); + } +} diff --git a/extensions-core/druid-catalog/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/druid-catalog/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule index 3f9cd5ca725e..bcf0117c8a78 100644 --- a/extensions-core/druid-catalog/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule +++ b/extensions-core/druid-catalog/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -14,3 +14,4 @@ # limitations under the License. org.apache.druid.catalog.guice.CatalogCoordinatorModule +org.apache.druid.catalog.guice.CatalogBrokerModule diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogQueryTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogQueryTest.java new file mode 100644 index 000000000000..2ee9041fc929 --- /dev/null +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogQueryTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.sql; + +import org.apache.druid.catalog.CatalogException; +import org.apache.druid.catalog.model.Columns; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.table.TableBuilder; +import org.apache.druid.catalog.storage.CatalogStorage; +import org.apache.druid.catalog.storage.CatalogTests; +import org.apache.druid.catalog.sync.CachedMetadataCatalog; +import org.apache.druid.catalog.sync.MetadataCatalog; +import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.sql.calcite.BaseCalciteQueryTest; +import org.apache.druid.sql.calcite.SqlSchema; +import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.fail; + +public class CatalogQueryTest extends BaseCalciteQueryTest +{ + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + + private CatalogTests.DbFixture dbFixture; + private CatalogStorage storage; + + @Test + public void testCatalogSchema() + { + SqlSchema schema = SqlSchema.builder() + .column("__time", "TIMESTAMP(3) NOT NULL") + .column("extra1", "VARCHAR") + .column("dim2", "VARCHAR") + .column("dim1", "VARCHAR") + .column("cnt", "BIGINT NOT NULL") + .column("m1", "DOUBLE NOT NULL") + .column("extra2", "BIGINT NOT NULL") + .column("extra3", "VARCHAR") + .column("m2", "DOUBLE NOT NULL") + .build(); + testBuilder() + .sql("SELECT * FROM foo ORDER BY __time LIMIT 1") + .expectedResources(Collections.singletonList(dataSourceRead("foo"))) + //.expectedSqlSchema(schema) + .run(); + } + + @After + public void catalogTearDown() + { + CatalogTests.tearDown(dbFixture); + } + + @Override + public CatalogResolver createCatalogResolver() + { + dbFixture = new CatalogTests.DbFixture(derbyConnectorRule); + storage = dbFixture.storage; + MetadataCatalog catalog = new CachedMetadataCatalog( + storage, + storage.schemaRegistry(), + storage.jsonMapper() + ); + return new LiveCatalogResolver(catalog); + } + + @Override + public void finalizeTestFramework(SqlTestFramework sqlTestFramework) + { + super.finalizeTestFramework(sqlTestFramework); + buildFooDatasource(); + } + + private void createTableMetadata(TableMetadata table) + { + try { + storage.tables().create(table); + } + catch (CatalogException e) { + fail(e.getMessage()); + } + } + + public void buildFooDatasource() + { + TableMetadata spec = TableBuilder.datasource("foo", "ALL") + .timeColumn() + .column("extra1", null) + .column("dim2", null) + .column("dim1", null) + .column("cnt", null) + .column("m1", Columns.DOUBLE) + .column("extra2", Columns.LONG) + .column("extra3", Columns.STRING) + .hiddenColumns(Arrays.asList("dim3", "unique_dim1")) + .build(); + createTableMetadata(spec); + } +} diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/LiveCatalogTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/LiveCatalogTest.java new file mode 100644 index 000000000000..54b0191b97ea --- /dev/null +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/LiveCatalogTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.sql; + +import org.apache.druid.catalog.CatalogException.DuplicateKeyException; +import org.apache.druid.catalog.model.Columns; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.table.TableBuilder; +import org.apache.druid.catalog.storage.CatalogStorage; +import org.apache.druid.catalog.storage.CatalogTests; +import org.apache.druid.catalog.sync.LocalMetadataCatalog; +import org.apache.druid.catalog.sync.MetadataCatalog; +import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.table.DatasourceTable.PhysicalDatasourceMetadata; +import org.apache.druid.sql.calcite.table.DruidTable; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; + +/** + * Test for the datasource resolution aspects of the live catalog resolver. + * Too tedious to test the insert resolution in its current state. + */ +public class LiveCatalogTest +{ + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + + private CatalogTests.DbFixture dbFixture; + private CatalogStorage storage; + private CatalogResolver resolver; + + @Before + public void setUp() + { + dbFixture = new CatalogTests.DbFixture(derbyConnectorRule); + storage = dbFixture.storage; + MetadataCatalog catalog = new LocalMetadataCatalog(storage, storage.schemaRegistry()); + resolver = new LiveCatalogResolver(catalog); + } + + @After + public void tearDown() + { + CatalogTests.tearDown(dbFixture); + } + + private void createTableMetadata(TableMetadata table) + { + try { + storage.tables().create(table); + } + catch (DuplicateKeyException e) { + fail(e.getMessage()); + } + } + + /** + * Populate the catalog with a few items using the REST resource. + */ + private void populateCatalog(boolean withTimeCol) + { + TableMetadata table = TableBuilder.datasource("trivial", "PT1D") + .build(); + createTableMetadata(table); + + TableBuilder builder = TableBuilder.datasource("merge", "PT1D"); + if (withTimeCol) { + builder.timeColumn(); + } + table = builder + .column("dsa", null) + .column("dsb", Columns.STRING) + .column("dsc", Columns.LONG) + .column("dsd", Columns.FLOAT) + .column("dse", Columns.DOUBLE) + .column("newa", null) + .column("newb", Columns.STRING) + .column("newc", Columns.LONG) + .column("newd", Columns.FLOAT) + .column("newe", Columns.DOUBLE) + .hiddenColumns(Arrays.asList("dsf", "dsg")) + .build(); + createTableMetadata(table); + } + + private PhysicalDatasourceMetadata mockDatasource() + { + RowSignature sig = RowSignature.builder() + .add(Columns.TIME_COLUMN, ColumnType.LONG) + .add("dsa", ColumnType.DOUBLE) + .add("dsb", ColumnType.LONG) + .add("dsc", ColumnType.STRING) + .add("dsd", ColumnType.LONG) + .add("dse", ColumnType.FLOAT) + .add("dsf", ColumnType.STRING) + .add("dsg", ColumnType.LONG) + .add("dsh", ColumnType.DOUBLE) + .build(); + return new PhysicalDatasourceMetadata( + new TableDataSource("merge"), + sig, + true, + true + ); + } + + @Test + public void testUnknownTable() + { + // No catalog, no datasource + assertNull(resolver.resolveDatasource("bogus", null)); + + // No catalog entry + PhysicalDatasourceMetadata dsMetadata = mockDatasource(); + DruidTable table = resolver.resolveDatasource("merge", dsMetadata); + assertSame(dsMetadata.getRowSignature(), table.getRowSignature()); + } + + @Test + public void testKnownTableNoTime() + { + populateCatalog(false); + + // Catalog, no datasource + DruidTable table = resolver.resolveDatasource("merge", null); + assertEquals(11, table.getRowSignature().size()); + assertEquals("merge", ((TableDataSource) table.getDataSource()).getName()); + + // Spot check + assertColumnEquals(table, 0, Columns.TIME_COLUMN, ColumnType.LONG); + assertColumnEquals(table, 1, "dsa", ColumnType.STRING); + assertColumnEquals(table, 2, "dsb", ColumnType.STRING); + assertColumnEquals(table, 3, "dsc", ColumnType.LONG); + + // Catalog, with datasource, result is merged + // Catalog has no time column + PhysicalDatasourceMetadata dsMetadata = mockDatasource(); + table = resolver.resolveDatasource("merge", dsMetadata); + assertEquals(12, table.getRowSignature().size()); + assertSame(dsMetadata.dataSource(), table.getDataSource()); + assertEquals(dsMetadata.isBroadcast(), table.isBroadcast()); + assertEquals(dsMetadata.isJoinable(), table.isJoinable()); + + // dsa uses Druid's type, others coerce the type + assertColumnEquals(table, 0, "dsa", ColumnType.DOUBLE); + assertColumnEquals(table, 1, "dsb", ColumnType.STRING); + assertColumnEquals(table, 2, "dsc", ColumnType.LONG); + assertColumnEquals(table, 3, "dsd", ColumnType.FLOAT); + assertColumnEquals(table, 4, "dse", ColumnType.DOUBLE); + assertColumnEquals(table, 5, "newa", ColumnType.STRING); + assertColumnEquals(table, 9, "newe", ColumnType.DOUBLE); + assertColumnEquals(table, 10, Columns.TIME_COLUMN, ColumnType.LONG); + assertColumnEquals(table, 11, "dsh", ColumnType.DOUBLE); + } + + @Test + public void testKnownTableWithTime() + { + populateCatalog(true); + + // Catalog, no datasource + DruidTable table = resolver.resolveDatasource("merge", null); + assertEquals(11, table.getRowSignature().size()); + assertEquals("merge", ((TableDataSource) table.getDataSource()).getName()); + + // Spot check + assertColumnEquals(table, 0, Columns.TIME_COLUMN, ColumnType.LONG); + assertColumnEquals(table, 1, "dsa", ColumnType.STRING); + assertColumnEquals(table, 2, "dsb", ColumnType.STRING); + assertColumnEquals(table, 3, "dsc", ColumnType.LONG); + + // Catalog, with datasource, result is merged + PhysicalDatasourceMetadata dsMetadata = mockDatasource(); + table = resolver.resolveDatasource("merge", dsMetadata); + assertEquals(12, table.getRowSignature().size()); + assertSame(dsMetadata.dataSource(), table.getDataSource()); + assertEquals(dsMetadata.isBroadcast(), table.isBroadcast()); + assertEquals(dsMetadata.isJoinable(), table.isJoinable()); + + assertColumnEquals(table, 0, Columns.TIME_COLUMN, ColumnType.LONG); + // dsa uses Druid's type, others coerce the type + assertColumnEquals(table, 1, "dsa", ColumnType.DOUBLE); + assertColumnEquals(table, 2, "dsb", ColumnType.STRING); + assertColumnEquals(table, 3, "dsc", ColumnType.LONG); + assertColumnEquals(table, 4, "dsd", ColumnType.FLOAT); + assertColumnEquals(table, 5, "dse", ColumnType.DOUBLE); + assertColumnEquals(table, 6, "newa", ColumnType.STRING); + assertColumnEquals(table, 10, "newe", ColumnType.DOUBLE); + assertColumnEquals(table, 11, "dsh", ColumnType.DOUBLE); + } + + private void assertColumnEquals(DruidTable table, int i, String name, ColumnType type) + { + RowSignature sig = table.getRowSignature(); + assertEquals(name, sig.getColumnName(i)); + assertEquals(type, sig.getColumnType(i).get()); + } +} diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/storage/TableManagerTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/storage/TableManagerTest.java index 1b98ce272706..c138a71dd8cc 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/storage/TableManagerTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/storage/TableManagerTest.java @@ -207,8 +207,8 @@ public void testUpdateColumns() throws CatalogException DatasourceDefn.TARGET_SEGMENT_ROWS_PROPERTY, 1_000_000 ); List cols = Arrays.asList( - new ColumnSpec("a", Columns.VARCHAR, null), - new ColumnSpec("b", Columns.BIGINT, null) + new ColumnSpec("a", Columns.STRING, null), + new ColumnSpec("b", Columns.LONG, null) ); ColumnSpec colC = new ColumnSpec("c", Columns.DOUBLE, null); diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogCacheTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogCacheTest.java new file mode 100644 index 000000000000..9027991399b4 --- /dev/null +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogCacheTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.sync; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.catalog.CatalogException.DuplicateKeyException; +import org.apache.druid.catalog.model.Columns; +import org.apache.druid.catalog.model.TableId; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.table.TableBuilder; +import org.apache.druid.catalog.storage.CatalogStorage; +import org.apache.druid.catalog.storage.CatalogTests; +import org.apache.druid.metadata.TestDerbyConnector; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Builds on the generic {@link CatalogSyncTest} to focus on cache-specific + * operations. + */ +public class CatalogCacheTest +{ + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + + private CatalogTests.DbFixture dbFixture; + private CatalogStorage storage; + private ObjectMapper jsonMapper; + + @Before + public void setUp() + { + dbFixture = new CatalogTests.DbFixture(derbyConnectorRule); + storage = dbFixture.storage; + jsonMapper = new ObjectMapper(); + } + + @After + public void tearDown() + { + CatalogTests.tearDown(dbFixture); + } + + /** + * Test overall cache lifecycle. Detailed checks of contents is done + * in {@link CatalogSyncTest} and is not repeated here. + */ + @Test + public void testLifecycle() throws DuplicateKeyException + { + // Create entries with no listener. + TableMetadata table1 = TableBuilder.datasource("table1", "P1D") + .timeColumn() + .column("a", Columns.STRING) + .build(); + storage.validate(table1); + storage.tables().create(table1); + + // Create a listener. Starts with the cache empty + CachedMetadataCatalog cache1 = new CachedMetadataCatalog(storage, storage.schemaRegistry(), jsonMapper); + storage.register(cache1); + assertTrue(cache1.tableNames(TableId.DRUID_SCHEMA).isEmpty()); + + // Load table on demand. + assertNotNull(cache1.getTable(table1.id())); + assertEquals(1, cache1.tableNames(TableId.DRUID_SCHEMA).size()); + + // Flush to empty the cache. + cache1.flush(); + assertTrue(cache1.tableNames(TableId.DRUID_SCHEMA).isEmpty()); + + // Resync to reload the cache. + cache1.resync(); + assertEquals(1, cache1.tableNames(TableId.DRUID_SCHEMA).size()); + assertNotNull(cache1.getTable(table1.id())); + + // Add a table: cache is updated. + TableMetadata table2 = TableBuilder.datasource("table2", "P1D") + .timeColumn() + .column("dim", Columns.STRING) + .column("measure", Columns.LONG) + .build(); + storage.validate(table2); + storage.tables().create(table2); + assertEquals(2, cache1.tableNames(TableId.DRUID_SCHEMA).size()); + assertNotNull(cache1.getTable(table2.id())); + + // Second listener. Starts with the cache empty. + CachedMetadataCatalog cache2 = new CachedMetadataCatalog(storage, storage.schemaRegistry(), jsonMapper); + storage.register(cache2); + assertTrue(cache2.tableNames(TableId.DRUID_SCHEMA).isEmpty()); + + // Second listener resyncs. + cache2.resync(); + assertEquals(2, cache2.tableNames(TableId.DRUID_SCHEMA).size()); + assertNotNull(cache2.getTable(table1.id())); + assertNotNull(cache2.getTable(table2.id())); + + // Add a third table: both caches updated. + TableMetadata table3 = TableBuilder.datasource("table3", "PT1H") + .timeColumn() + .column("x", Columns.STRING) + .column("y", Columns.LONG) + .build(); + storage.validate(table3); + storage.tables().create(table3); + assertEquals(3, cache1.tableNames(TableId.DRUID_SCHEMA).size()); + assertNotNull(cache1.getTable(table3.id())); + assertEquals(3, cache2.tableNames(TableId.DRUID_SCHEMA).size()); + assertNotNull(cache2.getTable(table3.id())); + + // Another resync puts us back where we are. + cache1.flush(); + assertTrue(cache1.tableNames(TableId.DRUID_SCHEMA).isEmpty()); + cache1.resync(); + assertEquals(3, cache1.tableNames(TableId.DRUID_SCHEMA).size()); + assertNotNull(cache1.getTable(table3.id())); + + cache2.resync(); + assertEquals(3, cache2.tableNames(TableId.DRUID_SCHEMA).size()); + assertNotNull(cache2.getTable(table3.id())); + + // Third cache, managed by the receiver. + CachedMetadataCatalog cache3 = new CachedMetadataCatalog(storage, storage.schemaRegistry(), jsonMapper); + storage.register(cache3); + CatalogUpdateReceiver receiver = new CatalogUpdateReceiver(cache3); + receiver.start(); + assertEquals(3, cache3.tableNames(TableId.DRUID_SCHEMA).size()); + assertNotNull(cache3.getTable(table3.id())); + } +} diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogSyncTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogSyncTest.java index fc77735e499e..c159c51c5eec 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogSyncTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/CatalogSyncTest.java @@ -96,7 +96,7 @@ public void testInputValidation() TableMetadata table = TableBuilder.external("externTable") .inputSource(toMap(new InlineInputSource("a\nc"))) .inputFormat(BaseExternTableTest.CSV_FORMAT) - .column("a", Columns.VARCHAR) + .column("a", Columns.STRING) .build(); storage.validate(table); } @@ -114,7 +114,7 @@ public void testInputValidation() { TableMetadata table = TableBuilder.external("externTable") .inputSource(toMap(new InlineInputSource("a\nc"))) - .column("a", Columns.VARCHAR) + .column("a", Columns.STRING) .build(); assertThrows(IAE.class, () -> storage.validate(table)); } @@ -195,15 +195,15 @@ private void populateCatalog() throws DuplicateKeyException { TableMetadata table1 = TableBuilder.datasource("table1", "P1D") .timeColumn() - .column("a", Columns.VARCHAR) + .column("a", Columns.STRING) .build(); storage.validate(table1); storage.tables().create(table1); TableMetadata table2 = TableBuilder.datasource("table2", "P1D") .timeColumn() - .column("dim", Columns.VARCHAR) - .column("measure", "BIGINT") + .column("dim", Columns.STRING) + .column("measure", Columns.LONG) .build(); storage.validate(table2); storage.tables().create(table2); @@ -211,7 +211,7 @@ private void populateCatalog() throws DuplicateKeyException TableMetadata table3 = TableBuilder.external("table3") .inputFormat(BaseExternTableTest.CSV_FORMAT) .inputSource(toMap(new InlineInputSource("a\nc"))) - .column("a", Columns.VARCHAR) + .column("a", Columns.STRING) .build(); storage.validate(table3); storage.tables().create(table3); @@ -230,9 +230,9 @@ private void verifyInitial(MetadataCatalog catalog) List cols = dsSpec.columns(); assertEquals(2, cols.size()); assertEquals(Columns.TIME_COLUMN, cols.get(0).name()); - assertEquals(Columns.TIMESTAMP, cols.get(0).sqlType()); + assertEquals(Columns.LONG, cols.get(0).dataType()); assertEquals("a", cols.get(1).name()); - assertEquals(Columns.VARCHAR, cols.get(1).sqlType()); + assertEquals(Columns.STRING, cols.get(1).dataType()); DatasourceFacade ds = new DatasourceFacade(catalog.resolveTable(id)); assertEquals("P1D", ds.segmentGranularityString()); @@ -249,11 +249,11 @@ private void verifyInitial(MetadataCatalog catalog) assertEquals(3, cols.size()); assertEquals("__time", cols.get(0).name()); assertEquals(Columns.TIME_COLUMN, cols.get(0).name()); - assertEquals(Columns.TIMESTAMP, cols.get(0).sqlType()); + assertEquals(Columns.LONG, cols.get(0).dataType()); assertEquals("dim", cols.get(1).name()); - assertEquals(Columns.VARCHAR, cols.get(1).sqlType()); + assertEquals(Columns.STRING, cols.get(1).dataType()); assertEquals("measure", cols.get(2).name()); - assertEquals("BIGINT", cols.get(2).sqlType()); + assertEquals(Columns.LONG, cols.get(2).dataType()); DatasourceFacade ds = new DatasourceFacade(catalog.resolveTable(id)); assertEquals("P1D", ds.segmentGranularityString()); @@ -273,7 +273,7 @@ private void verifyInitial(MetadataCatalog catalog) List cols = inputSpec.columns(); assertEquals(1, cols.size()); assertEquals("a", cols.get(0).name()); - assertEquals(Columns.VARCHAR, cols.get(0).sqlType()); + assertEquals(Columns.STRING, cols.get(0).dataType()); assertNotNull(inputSpec.properties()); } @@ -303,7 +303,7 @@ private void alterCatalog() throws DuplicateKeyException, NotFoundException // Create a table 3 TableMetadata table3 = TableBuilder.datasource("table3", "P1D") .timeColumn() - .column("x", "FLOAT") + .column("x", Columns.FLOAT) .build(); storage.tables().create(table3); } @@ -320,7 +320,7 @@ private void verifyAltered(MetadataCatalog catalog) assertEquals(Columns.TIME_COLUMN, cols.get(0).name()); assertEquals("a", cols.get(1).name()); assertEquals("b", cols.get(2).name()); - assertEquals(Columns.DOUBLE, cols.get(2).sqlType()); + assertEquals(Columns.DOUBLE, cols.get(2).dataType()); } { TableId id = TableId.datasource("table3"); diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/MockCatalogSync.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/MockCatalogSync.java index 7a5b7f03b273..16f2dcaa50e6 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/MockCatalogSync.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sync/MockCatalogSync.java @@ -55,4 +55,14 @@ public MetadataCatalog catalog() { return catalog; } + + @Override + public void flush() + { + } + + @Override + public void resync() + { + } } diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/CatalogResourceTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/CatalogResourceTest.java index eb7a49f78a82..066d5a44cad1 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/CatalogResourceTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/CatalogResourceTest.java @@ -150,9 +150,9 @@ public void testCreate() TableSpec inputSpec = TableBuilder.external("inline") .inputSource(toMap(new InlineInputSource("a,b,1\nc,d,2\n"))) .inputFormat(BaseExternTableTest.CSV_FORMAT) - .column("a", Columns.VARCHAR) - .column("b", Columns.VARCHAR) - .column("c", Columns.BIGINT) + .column("a", Columns.STRING) + .column("b", Columns.STRING) + .column("c", Columns.LONG) .buildSpec(); resp = resource.postTable(TableId.EXTERNAL_SCHEMA, "inline", inputSpec, 0, false, postBy(CatalogTests.WRITER_USER)); assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java index bfb8fa02b701..5426e365f1f8 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java @@ -401,7 +401,7 @@ public void testUpdateColumns() throws CatalogException // Add a column cmd = new UpdateColumns( Collections.singletonList( - new ColumnSpec("d", Columns.VARCHAR, null) + new ColumnSpec("d", Columns.STRING, null) ) ); TableMetadata revised = doEdit(tableName, cmd); @@ -411,14 +411,14 @@ public void testUpdateColumns() throws CatalogException ); ColumnSpec colD = revised.spec().columns().get(3); assertEquals("d", colD.name()); - assertEquals(Columns.VARCHAR, colD.sqlType()); + assertEquals(Columns.STRING, colD.dataType()); // Update a column cmd = new UpdateColumns( Collections.singletonList( new ColumnSpec( "a", - Columns.BIGINT, + Columns.LONG, ImmutableMap.of("foo", "bar") ) ) @@ -430,13 +430,13 @@ public void testUpdateColumns() throws CatalogException ); ColumnSpec colA = revised.spec().columns().get(0); assertEquals("a", colA.name()); - assertEquals(Columns.BIGINT, colA.sqlType()); + assertEquals(Columns.LONG, colA.dataType()); assertEquals(ImmutableMap.of("foo", "bar"), colA.properties()); // Duplicates UpdateColumns cmd2 = new UpdateColumns( Arrays.asList( - new ColumnSpec("e", Columns.VARCHAR, null), + new ColumnSpec("e", Columns.STRING, null), new ColumnSpec("e", null, null) ) ); @@ -445,7 +445,7 @@ public void testUpdateColumns() throws CatalogException // Valid time column type cmd = new UpdateColumns( Collections.singletonList( - new ColumnSpec(Columns.TIME_COLUMN, Columns.TIMESTAMP, null) + new ColumnSpec(Columns.TIME_COLUMN, Columns.LONG, null) ) ); revised = doEdit(tableName, cmd); diff --git a/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertFromTable-logicalPlan.txt b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertFromTable-logicalPlan.txt new file mode 100644 index 000000000000..fffc3d7d9874 --- /dev/null +++ b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertFromTable-logicalPlan.txt @@ -0,0 +1,3 @@ +LogicalInsert(target=[dst], partitionedBy=['ALL TIME'], clusteredBy=[]) + LogicalProject(__time=[$0], extra1=[$1], dim2=[$2], dim1=[$3], cnt=[$4], m1=[$5], extra2=[$6], extra3=[$7], m2=[$8]) + LogicalTableScan(table=[[druid, foo]]) diff --git a/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy-logicalPlan.txt b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy-logicalPlan.txt new file mode 100644 index 000000000000..71df2ac7e3f9 --- /dev/null +++ b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy-logicalPlan.txt @@ -0,0 +1,4 @@ +LogicalInsert(target=[druid.clusterBy], partitionedBy=[], clusteredBy=[]) + LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC]) + LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)]) + LogicalTableScan(table=[[druid, foo]]) diff --git a/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy2-logicalPlan.txt b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy2-logicalPlan.txt new file mode 100644 index 000000000000..24e70f5c8bb7 --- /dev/null +++ b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithCatalogClusteredBy2-logicalPlan.txt @@ -0,0 +1,4 @@ +LogicalInsert(target=[druid.clusterBy], partitionedBy=[], clusteredBy=[`floor_m1`, `dim1` DESC]) + LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC]) + LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)]) + LogicalTableScan(table=[[druid, foo]]) diff --git a/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt new file mode 100644 index 000000000000..71df2ac7e3f9 --- /dev/null +++ b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt @@ -0,0 +1,4 @@ +LogicalInsert(target=[druid.clusterBy], partitionedBy=[], clusteredBy=[]) + LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC]) + LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)]) + LogicalTableScan(table=[[druid, foo]]) diff --git a/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy2-logicalPlan.txt b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy2-logicalPlan.txt new file mode 100644 index 000000000000..8aba08a54968 --- /dev/null +++ b/extensions-core/druid-catalog/src/test/resources/calcite/expected/ingest/insertWithClusteredBy2-logicalPlan.txt @@ -0,0 +1,4 @@ +LogicalInsert(target=[dst], partitionedBy=[FLOOR(`__time` TO DAY)], clusteredBy=[`floor_m1`, `dim1` DESC]) + LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC]) + LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)]) + LogicalTableScan(table=[[druid, foo]]) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 36301a5fe0f9..3f5367b27c50 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -520,7 +520,8 @@ public String getFormatString() new PlannerConfig(), viewManager, new NoopDruidSchemaManager(), - CalciteTests.TEST_AUTHORIZER_MAPPER + CalciteTests.TEST_AUTHORIZER_MAPPER, + CatalogResolver.NULL_RESOLVER ); final SqlEngine engine = new MSQTaskSqlEngine( diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java index dcbbbcfb7acf..c35bc1824bc9 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java @@ -68,8 +68,8 @@ public class S3InputSourceDefnTest { private static final List COLUMNS = Arrays.asList( - new ColumnSpec("x", Columns.VARCHAR, null), - new ColumnSpec("y", Columns.BIGINT, null) + new ColumnSpec("x", Columns.STRING, null), + new ColumnSpec("y", Columns.LONG, null) ); /** @@ -110,7 +110,7 @@ public void testValidateEmptyInputSource() TableMetadata table = TableBuilder.external("foo") .inputSource(ImmutableMap.of("type", S3StorageDruidModule.SCHEME)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -176,7 +176,7 @@ public void testValidateNoFormatWithColumns() Collections.singletonList("s3://foo/bar/file.csv"), null, null, null); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(s3InputSource)) - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -215,7 +215,7 @@ public void testValidateGood() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(s3InputSource)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); @@ -228,7 +228,7 @@ public void testBucketOnly() .inputSource(ImmutableMap.of("type", S3StorageDruidModule.SCHEME)) .inputFormat(CSV_FORMAT) .property(S3InputSourceDefn.BUCKET_PROPERTY, "s3://foo.com") - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); @@ -243,7 +243,7 @@ public void testBucketAndUri() .inputSource(toMap(s3InputSource)) .inputFormat(CSV_FORMAT) .property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com") - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -262,7 +262,7 @@ public void testBucketAndPrefix() .inputSource(toMap(s3InputSource)) .inputFormat(CSV_FORMAT) .property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com") - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -281,7 +281,7 @@ public void testBucketAndObject() .inputSource(toMap(s3InputSource)) .inputFormat(CSV_FORMAT) .property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com") - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -298,7 +298,7 @@ public void testBucketAndGlob() ) .inputFormat(CSV_FORMAT) .property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com") - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -564,8 +564,8 @@ public void testFullTableSpecHappyPath() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(s3InputSource)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation @@ -602,8 +602,8 @@ public void testTableSpecWithoutConfig() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(s3InputSource)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation @@ -646,8 +646,8 @@ public void testTableSpecWithBucketAndFormat() .inputSource(ImmutableMap.of("type", S3StorageDruidModule.SCHEME)) .inputFormat(CSV_FORMAT) .property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com") - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation diff --git a/server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java b/server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java index e66a584cae60..841d592062bf 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java +++ b/server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java @@ -98,7 +98,7 @@ public static T safeCast(Object value, Class type, String key) return type.cast(value); } catch (ClassCastException e) { - throw new IAE("Value [%s] is not valid for property %s, expected type %s", + throw new IAE("Value [%s] is not valid for property [%s], expected type [%s]", value, key, type.getSimpleName() diff --git a/server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java b/server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java index 38ac4fdbe804..cfc9fc55cf43 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java +++ b/server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java @@ -35,8 +35,7 @@ import java.util.Objects; /** - * Specification of table columns. Columns have multiple types - * represented via the type field. + * Specification of table columns. */ @UnstableApi public class ColumnSpec @@ -49,14 +48,10 @@ public class ColumnSpec private final String name; /** - * The data type of the column expressed as a supported SQL type. The data type here must - * directly match a Druid storage type. So, {@code BIGINT} for {code long}, say. - * This usage does not support Druid's usual "fudging": one cannot use {@code INTEGER} - * to mean {@code long}. The type will likely encode complex and aggregation types - * in the future, though that is not yet supported. The set of valid mappings is - * defined in the {@link Columns} class. + * The data type of the column expressed as a supported Druid type. The data type here must + * directly match a Druid storage type. */ - private final String sqlType; + private final String dataType; /** * Properties for the column. At present, these are all user and application defined. @@ -69,18 +64,18 @@ public class ColumnSpec @JsonCreator public ColumnSpec( @JsonProperty("name")final String name, - @JsonProperty("sqlType") @Nullable final String sqlType, + @JsonProperty("dataType") @Nullable final String dataType, @JsonProperty("properties") @Nullable final Map properties ) { this.name = name; - this.sqlType = sqlType; + this.dataType = dataType; this.properties = properties == null ? Collections.emptyMap() : properties; } public ColumnSpec(ColumnSpec from) { - this(from.name, from.sqlType, from.properties); + this(from.name, from.dataType, from.properties); } @JsonProperty("name") @@ -89,11 +84,11 @@ public String name() return name; } - @JsonProperty("sqlType") + @JsonProperty("dataType") @JsonInclude(Include.NON_NULL) - public String sqlType() + public String dataType() { - return sqlType; + return dataType; } @JsonProperty("properties") @@ -108,6 +103,16 @@ public void validate() if (Strings.isNullOrEmpty(name)) { throw new IAE("Column name is required"); } + if (Columns.isTimeColumn(name)) { + if (dataType != null && !Columns.LONG.equalsIgnoreCase(dataType)) { + throw new IAE( + "[%s] column must have type [%s] or no type. Found [%s]", + name, + Columns.LONG, + dataType + ); + } + } // Validate type in the next PR } @@ -126,7 +131,7 @@ public ColumnSpec merge( final ColumnSpec update ) { - String revisedType = update.sqlType() == null ? sqlType() : update.sqlType(); + String revisedType = update.dataType() == null ? dataType() : update.dataType(); Map revisedProps = CatalogUtils.mergeProperties( columnProperties, properties(), @@ -152,7 +157,7 @@ public boolean equals(Object o) } ColumnSpec other = (ColumnSpec) o; return Objects.equals(this.name, other.name) - && Objects.equals(this.sqlType, other.sqlType) + && Objects.equals(this.dataType, other.dataType) && Objects.equals(this.properties, other.properties); } @@ -161,7 +166,7 @@ public int hashCode() { return Objects.hash( name, - sqlType, + dataType, properties ); } diff --git a/server/src/main/java/org/apache/druid/catalog/model/Columns.java b/server/src/main/java/org/apache/druid/catalog/model/Columns.java index e19f0052ed5e..06b534142a19 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/Columns.java +++ b/server/src/main/java/org/apache/druid/catalog/model/Columns.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; import java.util.List; import java.util.Map; @@ -34,70 +35,87 @@ public class Columns { public static final String TIME_COLUMN = "__time"; - public static final String VARCHAR = "VARCHAR"; - public static final String BIGINT = "BIGINT"; - public static final String FLOAT = "FLOAT"; - public static final String DOUBLE = "DOUBLE"; - public static final String VARCHAR_ARRAY = "VARCHAR ARRAY"; - public static final String BIGINT_ARRAY = "BIGINT ARRAY"; - public static final String FLOAT_ARRAY = "FLOAT ARRAY"; - public static final String DOUBLE_ARRAY = "DOUBLE ARRAY"; - public static final String TIMESTAMP = "TIMESTAMP"; + public static final String STRING = ValueType.STRING.name(); + public static final String LONG = ValueType.LONG.name(); + public static final String FLOAT = ValueType.FLOAT.name(); + public static final String DOUBLE = ValueType.DOUBLE.name(); + + public static final String SQL_VARCHAR = "VARCHAR"; + public static final String SQL_BIGINT = "BIGINT"; + public static final String SQL_FLOAT = "FLOAT"; + public static final String SQL_DOUBLE = "DOUBLE"; + public static final String SQL_VARCHAR_ARRAY = "VARCHAR ARRAY"; + public static final String SQL_BIGINT_ARRAY = "BIGINT ARRAY"; + public static final String SQL_FLOAT_ARRAY = "FLOAT ARRAY"; + public static final String SQL_DOUBLE_ARRAY = "DOUBLE ARRAY"; + public static final String SQL_TIMESTAMP = "TIMESTAMP"; public static final Set NUMERIC_TYPES = - ImmutableSet.of(BIGINT, FLOAT, DOUBLE); + ImmutableSet.of(LONG, FLOAT, DOUBLE); public static final Set SCALAR_TYPES = - ImmutableSet.of(TIMESTAMP, VARCHAR, BIGINT, FLOAT, DOUBLE); + ImmutableSet.of(STRING, LONG, FLOAT, DOUBLE); public static final Map SQL_TO_DRUID_TYPES = new ImmutableMap.Builder() - .put(TIMESTAMP, ColumnType.LONG) - .put(BIGINT, ColumnType.LONG) - .put(FLOAT, ColumnType.FLOAT) - .put(DOUBLE, ColumnType.DOUBLE) - .put(VARCHAR, ColumnType.STRING) - .put(VARCHAR_ARRAY, ColumnType.STRING_ARRAY) - .put(BIGINT_ARRAY, ColumnType.LONG_ARRAY) - .put(FLOAT_ARRAY, ColumnType.FLOAT_ARRAY) - .put(DOUBLE_ARRAY, ColumnType.DOUBLE_ARRAY) + .put(SQL_TIMESTAMP, ColumnType.LONG) + .put(SQL_BIGINT, ColumnType.LONG) + .put(SQL_FLOAT, ColumnType.FLOAT) + .put(SQL_DOUBLE, ColumnType.DOUBLE) + .put(SQL_VARCHAR, ColumnType.STRING) + .put(SQL_VARCHAR_ARRAY, ColumnType.STRING_ARRAY) + .put(SQL_BIGINT_ARRAY, ColumnType.LONG_ARRAY) + .put(SQL_FLOAT_ARRAY, ColumnType.FLOAT_ARRAY) + .put(SQL_DOUBLE_ARRAY, ColumnType.DOUBLE_ARRAY) .build(); public static final Map DRUID_TO_SQL_TYPES = new ImmutableMap.Builder() - .put(ColumnType.LONG, BIGINT) + .put(ColumnType.LONG, SQL_BIGINT) .put(ColumnType.FLOAT, FLOAT) .put(ColumnType.DOUBLE, DOUBLE) - .put(ColumnType.STRING, VARCHAR) - .put(ColumnType.STRING_ARRAY, VARCHAR_ARRAY) - .put(ColumnType.LONG_ARRAY, BIGINT_ARRAY) - .put(ColumnType.FLOAT_ARRAY, FLOAT_ARRAY) - .put(ColumnType.DOUBLE_ARRAY, DOUBLE_ARRAY) + .put(ColumnType.STRING, SQL_VARCHAR) + .put(ColumnType.STRING_ARRAY, SQL_VARCHAR_ARRAY) + .put(ColumnType.LONG_ARRAY, SQL_BIGINT_ARRAY) + .put(ColumnType.FLOAT_ARRAY, SQL_FLOAT_ARRAY) + .put(ColumnType.DOUBLE_ARRAY, SQL_DOUBLE_ARRAY) .build(); private Columns() { } - public static boolean isTimestamp(String type) - { - return TIMESTAMP.equalsIgnoreCase(type.trim()); - } - public static boolean isScalar(String type) { return SCALAR_TYPES.contains(StringUtils.toUpperCase(type.trim())); } - public static ColumnType druidType(String sqlType) + public static ColumnType druidType(ColumnSpec spec) { - if (sqlType == null) { + if (isTimeColumn(spec.name())) { + return ColumnType.LONG; + } + String dataType = spec.dataType(); + if (dataType == null) { return null; } - ColumnType druidType = SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(sqlType)); + ColumnType druidType = SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(dataType)); if (druidType != null) { return druidType; } - return ColumnType.fromString(sqlType); + return ColumnType.fromString(dataType); + } + + public static String sqlType(ColumnSpec spec) + { + if (isTimeColumn(spec.name())) { + return SQL_TIMESTAMP; + } + ColumnType druidType = druidType(spec); + if (druidType == null) { + return null; + } + String sqlType = DRUID_TO_SQL_TYPES.get(druidType); + return sqlType == null ? druidType.asTypeString() : sqlType; } public static void validateScalarColumn(String name, String type) @@ -106,7 +124,7 @@ public static void validateScalarColumn(String name, String type) return; } if (!Columns.isScalar(type)) { - throw new IAE("Not a supported SQL type: " + type); + throw new IAE("Not a supported Druid type: " + type); } } @@ -119,10 +137,7 @@ public static RowSignature convertSignature(List columns) { RowSignature.Builder builder = RowSignature.builder(); for (ColumnSpec col : columns) { - ColumnType druidType = null; - if (col.sqlType() != null) { - druidType = Columns.druidType(col.sqlType()); - } + ColumnType druidType = druidType(col); if (druidType == null) { druidType = ColumnType.STRING; } diff --git a/server/src/main/java/org/apache/druid/catalog/model/TableId.java b/server/src/main/java/org/apache/druid/catalog/model/TableId.java index 7e278e34397e..55fcc797561b 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/TableId.java +++ b/server/src/main/java/org/apache/druid/catalog/model/TableId.java @@ -85,6 +85,11 @@ public String sqlName() return StringUtils.format("\"%s\".\"%s\"", schema, name); } + public String unquoted() + { + return StringUtils.format("%s.%s", schema, name); + } + @Override public String toString() { diff --git a/server/src/main/java/org/apache/druid/catalog/model/facade/DatasourceFacade.java b/server/src/main/java/org/apache/druid/catalog/model/facade/DatasourceFacade.java index 6a15b57716b1..7ac00d9b608a 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/facade/DatasourceFacade.java +++ b/server/src/main/java/org/apache/druid/catalog/model/facade/DatasourceFacade.java @@ -19,15 +19,21 @@ package org.apache.druid.catalog.model.facade; +import com.google.common.collect.ImmutableMap; import org.apache.druid.catalog.model.CatalogUtils; +import org.apache.druid.catalog.model.ColumnSpec; +import org.apache.druid.catalog.model.Columns; import org.apache.druid.catalog.model.ResolvedTable; import org.apache.druid.catalog.model.table.ClusterKeySpec; import org.apache.druid.catalog.model.table.DatasourceDefn; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.column.ColumnType; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * Convenience wrapper on top of a resolved table (a table spec and its corresponding @@ -38,10 +44,77 @@ public class DatasourceFacade extends TableFacade { private static final Logger LOG = new Logger(DatasourceFacade.class); + public static class ColumnFacade + { + public enum Kind + { + ANY, + TIME, + DIMENSION, + MEASURE + } + + private final ColumnSpec spec; + private final String sqlType; + + public ColumnFacade(ColumnSpec spec) + { + this.spec = spec; + if (Columns.isTimeColumn(spec.name()) && spec.dataType() == null) { + // For __time only, force a type if type is null. + this.sqlType = Columns.LONG; + } else { + this.sqlType = Columns.sqlType(spec); + } + } + + public ColumnSpec spec() + { + return spec; + } + + public boolean hasType() + { + return sqlType != null; + } + + public boolean isTime() + { + return Columns.isTimeColumn(spec.name()); + } + + public ColumnType druidType() + { + return Columns.druidType(spec); + } + + public String sqlStorageType() + { + return sqlType; + } + + @Override + public String toString() + { + return "{spec=" + spec + ", sqlTtype=" + sqlType + "}"; + } + } + + private final List columns; + private final Map columnIndex; public DatasourceFacade(ResolvedTable resolved) { super(resolved); + this.columns = resolved.spec().columns() + .stream() + .map(col -> new ColumnFacade(col)) + .collect(Collectors.toList()); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (ColumnFacade col : columns) { + builder.put(col.spec.name(), col); + } + columnIndex = builder.build(); } public String segmentGranularityString() @@ -89,4 +162,14 @@ public boolean isSealed() { return booleanProperty(DatasourceDefn.SEALED_PROPERTY); } + + public List columnFacades() + { + return columns; + } + + public ColumnFacade column(String name) + { + return columnIndex.get(name); + } } diff --git a/server/src/main/java/org/apache/druid/catalog/model/facade/ExternalTableFacade.java b/server/src/main/java/org/apache/druid/catalog/model/facade/ExternalTableFacade.java index f2571150305e..11c3d60def98 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/facade/ExternalTableFacade.java +++ b/server/src/main/java/org/apache/druid/catalog/model/facade/ExternalTableFacade.java @@ -22,7 +22,6 @@ import org.apache.druid.catalog.model.ColumnSpec; import org.apache.druid.catalog.model.Columns; import org.apache.druid.catalog.model.ResolvedTable; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -40,7 +39,7 @@ public RowSignature rowSignature() List columns = spec().columns(); RowSignature.Builder builder = RowSignature.builder(); for (ColumnSpec col : columns) { - ColumnType druidType = Columns.SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(col.sqlType())); + ColumnType druidType = Columns.druidType(col); if (druidType == null) { druidType = ColumnType.STRING; } diff --git a/server/src/main/java/org/apache/druid/catalog/model/facade/TableFacade.java b/server/src/main/java/org/apache/druid/catalog/model/facade/TableFacade.java index b159ec8ebe7a..c41e24837545 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/facade/TableFacade.java +++ b/server/src/main/java/org/apache/druid/catalog/model/facade/TableFacade.java @@ -61,11 +61,7 @@ public List columns() public static ColumnType druidType(ColumnSpec col) { - if (Columns.isTimeColumn(col.name())) { - return ColumnType.LONG; - } - final String sqlType = col.sqlType(); - return sqlType == null ? null : Columns.druidType(sqlType); + return Columns.druidType(col); } public ObjectMapper jsonMapper() diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java index 660e73c35239..b5e47e99b113 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java +++ b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java @@ -275,7 +275,7 @@ protected List selectPartialTableColumns( return columns; } else if (!CollectionUtils.isNullOrEmpty(columns)) { throw new IAE( - "Catalog definition for the %s input source already contains column definitions", + "Catalog definition for the [%s] input source already contains column definitions", typeValue() ); } else { diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java index 97b080995aca..aa50dd3498e4 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java +++ b/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java @@ -68,16 +68,6 @@ public class DatasourceDefn extends TableDefn */ public static final String HIDDEN_COLUMNS_PROPERTY = "hiddenColumns"; - /** - * By default: columns are optional hints. If a datasource has columns defined, - * well validate them, but MSQ and other tools are free to create additional columns. - * That is, we assume "auto-discovered" columns by default. However, in some use cases, - * the schema may be carefully designed. This is especially true for ETL use cases in - * which multiple input schemas are mapped into a single datasource schema designed for - * ease of end user use. In this second use case, we may want to reject an attempt to - * ingest columns other than those in the schema. To do that, set {@code sealed = true}. - * In other words, "sealed" mode works like a traditional RDBMS. - */ public static final String SEALED_PROPERTY = "sealed"; public static final String TABLE_TYPE = "datasource"; @@ -148,7 +138,7 @@ public DatasourceDefn() protected void validateColumn(ColumnSpec spec) { super.validateColumn(spec); - if (Columns.isTimeColumn(spec.name()) && spec.sqlType() != null) { + if (Columns.isTimeColumn(spec.name()) && spec.dataType() != null) { // Validate type in next PR } } diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableDefn.java index 779d50b0ddbc..09a19d118276 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableDefn.java +++ b/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableDefn.java @@ -205,6 +205,11 @@ public class ExternalTableDefn extends TableDefn */ public static final String TABLE_TYPE = "extern"; + /** + * Column type for external tables. + */ + public static final String EXTERNAL_COLUMN_TYPE = "extern"; + /** * Property which holds the input source specification as serialized as JSON. */ diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java index a00e71b89174..787e929d9b5b 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java +++ b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java @@ -109,7 +109,7 @@ protected List addFormatParameters( toAdd.add(prop); } else if (existing.type() != prop.type()) { throw new ISE( - "Format %s, property %s of class %s conflicts with another format property of class %s", + "Format [%s], property [%s] of class [%s] conflicts with another format property of class [%s]", format.typeValue(), prop.name(), prop.type().sqlName(), @@ -131,7 +131,7 @@ protected InputFormat convertTableToFormat(ResolvedExternalTable table) final InputFormatDefn formatDefn = formats.get(formatTag); if (formatDefn == null) { throw new IAE( - "Format type [%s] for property %s is not valid", + "Format type [%s] for property [%s] is not valid", formatTag, InputFormat.TYPE_PROPERTY ); diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/TableBuilder.java b/server/src/main/java/org/apache/druid/catalog/model/table/TableBuilder.java index 898eb53993b0..488de63190c5 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/table/TableBuilder.java +++ b/server/src/main/java/org/apache/druid/catalog/model/table/TableBuilder.java @@ -188,7 +188,7 @@ public TableBuilder column(ColumnSpec column) public TableBuilder timeColumn() { - return column(Columns.TIME_COLUMN, Columns.TIMESTAMP); + return column(Columns.TIME_COLUMN, Columns.LONG); } public TableBuilder column(String name, String sqlType) diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/BaseExternTableTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/BaseExternTableTest.java index 2ae285b96868..b47d9487c1a5 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/BaseExternTableTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/BaseExternTableTest.java @@ -39,8 +39,8 @@ public class BaseExternTableTest { public static final Map CSV_FORMAT = ImmutableMap.of("type", CsvInputFormat.TYPE_KEY); protected static final List COLUMNS = Arrays.asList( - new ColumnSpec("x", Columns.VARCHAR, null), - new ColumnSpec("y", Columns.BIGINT, null) + new ColumnSpec("x", Columns.STRING, null), + new ColumnSpec("y", Columns.LONG, null) ); protected final ObjectMapper mapper = DefaultObjectMapper.INSTANCE; diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/CsvInputFormatTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/CsvInputFormatTest.java index 91926210419a..b2995f1838be 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/CsvInputFormatTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/CsvInputFormatTest.java @@ -48,7 +48,7 @@ public void testDefaults() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(new InlineInputSource("a\n"))) .inputFormat(CSV_FORMAT) - .column("a", Columns.VARCHAR) + .column("a", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); @@ -70,8 +70,8 @@ public void testConversion() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(new InlineInputSource("a\n"))) .inputFormat(formatToMap(format)) - .column("a", Columns.VARCHAR) - .column("b", Columns.BIGINT) + .column("a", Columns.STRING) + .column("b", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java index 5d5f7f8e0f39..974f1b0e1c79 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java @@ -28,12 +28,13 @@ import org.apache.druid.catalog.model.ResolvedTable; import org.apache.druid.catalog.model.TableDefn; import org.apache.druid.catalog.model.TableDefnRegistry; -import org.apache.druid.catalog.model.TableMetadata; import org.apache.druid.catalog.model.TableSpec; import org.apache.druid.catalog.model.facade.DatasourceFacade; +import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.column.ColumnType; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -49,6 +50,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -230,6 +232,8 @@ public void testColumns() .buildSpec(); ResolvedTable table = registry.resolve(spec); table.validate(); + DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec())); + assertTrue(facade.columnFacades().isEmpty()); } // OK to have no column type @@ -241,72 +245,45 @@ public void testColumns() table.validate(); DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec())); - assertNotNull(facade.jsonMapper()); - assertEquals(1, facade.properties().size()); + assertEquals(1, facade.columnFacades().size()); + ColumnFacade col = facade.columnFacades().get(0); + assertSame(spec.columns().get(0), col.spec()); + assertFalse(col.isTime()); + assertFalse(col.hasType()); + assertNull(col.druidType()); } // Can have a legal scalar type { TableSpec spec = builder.copy() - .column("foo", Columns.VARCHAR) + .column("foo", Columns.STRING) .buildSpec(); ResolvedTable table = registry.resolve(spec); table.validate(); + DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec())); + assertEquals(1, facade.columnFacades().size()); + ColumnFacade col = facade.columnFacades().get(0); + assertSame(spec.columns().get(0), col.spec()); + assertFalse(col.isTime()); + assertTrue(col.hasType()); + assertSame(ColumnType.STRING, col.druidType()); } // Reject duplicate columns { TableSpec spec = builder.copy() - .column("foo", Columns.VARCHAR) - .column("bar", Columns.BIGINT) + .column("foo", Columns.STRING) + .column("bar", Columns.LONG) .buildSpec(); expectValidationSucceeds(spec); } { TableSpec spec = builder.copy() - .column("foo", Columns.VARCHAR) - .column("foo", Columns.BIGINT) + .column("foo", Columns.STRING) + .column("foo", Columns.LONG) .buildSpec(); expectValidationFails(spec); } - { - TableSpec spec = builder.copy() - .column(Columns.TIME_COLUMN, null) - .column("s", Columns.VARCHAR) - .column("bi", Columns.BIGINT) - .column("f", Columns.FLOAT) - .column("d", Columns.DOUBLE) - .buildSpec(); - ResolvedTable table = registry.resolve(spec); - table.validate(); - } - } - - @Test - public void testRollup() - { - TableMetadata table = TableBuilder.datasource("foo", "P1D") - .column(Columns.TIME_COLUMN, "TIMESTAMP('PT1M')") - .column("a", null) - .column("b", Columns.VARCHAR) - .column("c", "SUM(BIGINT)") - .build(); - - table.validate(); - List columns = table.spec().columns(); - - assertEquals(4, columns.size()); - assertEquals(Columns.TIME_COLUMN, columns.get(0).name()); - assertEquals("TIMESTAMP('PT1M')", columns.get(0).sqlType()); - - assertEquals("a", columns.get(1).name()); - assertNull(columns.get(1).sqlType()); - - assertEquals("b", columns.get(2).name()); - assertEquals(Columns.VARCHAR, columns.get(2).sqlType()); - - assertEquals("c", columns.get(3).name()); - assertEquals("SUM(BIGINT)", columns.get(3).sqlType()); } @Test @@ -321,6 +298,14 @@ public void testTimeColumn() .buildSpec(); ResolvedTable table = registry.resolve(spec); table.validate(); + + DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec())); + assertEquals(1, facade.columnFacades().size()); + ColumnFacade col = facade.columnFacades().get(0); + assertSame(spec.columns().get(0), col.spec()); + assertTrue(col.isTime()); + assertTrue(col.hasType()); + assertSame(ColumnType.LONG, col.druidType()); } // Time column can only have TIMESTAMP type @@ -330,14 +315,20 @@ public void testTimeColumn() .buildSpec(); ResolvedTable table = registry.resolve(spec); table.validate(); + DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec())); + assertEquals(1, facade.columnFacades().size()); + ColumnFacade col = facade.columnFacades().get(0); + assertSame(spec.columns().get(0), col.spec()); + assertTrue(col.isTime()); + assertTrue(col.hasType()); + assertSame(ColumnType.LONG, col.druidType()); } { TableSpec spec = builder.copy() - .column(Columns.TIME_COLUMN, "TIMESTAMP('PT5M')") + .column(Columns.TIME_COLUMN, Columns.STRING) .buildSpec(); - ResolvedTable table = registry.resolve(spec); - table.validate(); + expectValidationFails(spec); } } @@ -365,7 +356,7 @@ private TableSpec exampleSpec() .property("tag1", "some value") .property("tag2", "second value") .column(new ColumnSpec("a", null, colProps)) - .column("b", Columns.VARCHAR) + .column("b", Columns.STRING) .buildSpec(); // Sanity check @@ -493,7 +484,7 @@ public void testMergeColsWithEmptyList() List colUpdates = Collections.singletonList( new ColumnSpec( "a", - Columns.BIGINT, + Columns.LONG, null ) ); @@ -502,7 +493,7 @@ public void testMergeColsWithEmptyList() List columns = merged.columns(); assertEquals(1, columns.size()); assertEquals("a", columns.get(0).name()); - assertEquals(Columns.BIGINT, columns.get(0).sqlType()); + assertEquals(Columns.LONG, columns.get(0).dataType()); } @Test @@ -521,12 +512,12 @@ public void testMergeCols() List colUpdates = Arrays.asList( new ColumnSpec( "a", - Columns.BIGINT, + Columns.LONG, updatedProps ), new ColumnSpec( "c", - Columns.VARCHAR, + Columns.STRING, null ) ); @@ -537,14 +528,14 @@ public void testMergeCols() List columns = merged.columns(); assertEquals(3, columns.size()); assertEquals("a", columns.get(0).name()); - assertEquals(Columns.BIGINT, columns.get(0).sqlType()); + assertEquals(Columns.LONG, columns.get(0).dataType()); Map colProps = columns.get(0).properties(); assertEquals(2, colProps.size()); assertEquals("new value", colProps.get("colProp1")); assertEquals("third value", colProps.get("tag3")); assertEquals("c", columns.get(2).name()); - assertEquals(Columns.VARCHAR, columns.get(2).sqlType()); + assertEquals(Columns.STRING, columns.get(2).dataType()); } /** @@ -560,9 +551,9 @@ public void docExample() .description("Web server performance metrics") .property(DatasourceDefn.TARGET_SEGMENT_ROWS_PROPERTY, 1_000_000) .hiddenColumns("foo", "bar") - .column("__time", Columns.TIMESTAMP) - .column("host", Columns.VARCHAR, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "The web server host")) - .column("bytesSent", Columns.BIGINT, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "Number of response bytes sent")) + .column("__time", Columns.LONG) + .column("host", Columns.STRING, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "The web server host")) + .column("bytesSent", Columns.LONG, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "Number of response bytes sent")) .clusterColumns(new ClusterKeySpec("a", false), new ClusterKeySpec("b", true)) .sealed(true) .buildSpec(); diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/DelimitedInputFormatTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/DelimitedInputFormatTest.java index db573563eecb..04494ec7c343 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/DelimitedInputFormatTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/DelimitedInputFormatTest.java @@ -55,7 +55,7 @@ public void testDefaults() DelimitedFormatDefn.DELIMITER_FIELD, "|" ) ) - .column("a", Columns.VARCHAR) + .column("a", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); @@ -78,8 +78,8 @@ public void testConversion() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(new InlineInputSource("a\n"))) .inputFormat(formatToMap(format)) - .column("a", Columns.VARCHAR) - .column("b", Columns.BIGINT) + .column("a", Columns.STRING) + .column("b", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java index 96afcad9a8cc..8c8129bf0fcc 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java @@ -126,7 +126,7 @@ public void testValidateSourceAndFormat() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(new InlineInputSource("a\n"))) .inputFormat(formatToMap(format)) - .column("a", Columns.VARCHAR) + .column("a", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); @@ -147,16 +147,16 @@ public void wikipediaDocExample() .inputSource(toMap(inputSource)) .inputFormat(formatToMap(format)) .description("Sample Wikipedia data") - .column("timetamp", Columns.VARCHAR) - .column("page", Columns.VARCHAR) - .column("language", Columns.VARCHAR) - .column("unpatrolled", Columns.VARCHAR) - .column("newPage", Columns.VARCHAR) - .column("robot", Columns.VARCHAR) - .column("added", Columns.VARCHAR) - .column("namespace", Columns.BIGINT) - .column("deleted", Columns.BIGINT) - .column("delta", Columns.BIGINT) + .column("timetamp", Columns.STRING) + .column("page", Columns.STRING) + .column("language", Columns.STRING) + .column("unpatrolled", Columns.STRING) + .column("newPage", Columns.STRING) + .column("robot", Columns.STRING) + .column("added", Columns.STRING) + .column("namespace", Columns.LONG) + .column("deleted", Columns.LONG) + .column("delta", Columns.LONG) .build(); LOG.info(table.spec().toString()); } @@ -179,9 +179,9 @@ public void httpDocExample() throws URISyntaxException .inputFormat(CSV_FORMAT) .property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "https://example.com/{}") .description("Example parameterized external table") - .column("timetamp", Columns.VARCHAR) - .column("metric", Columns.VARCHAR) - .column("value", Columns.BIGINT) + .column("timetamp", Columns.STRING) + .column("metric", Columns.STRING) + .column("value", Columns.LONG) .build(); LOG.info(table.spec().toString()); } diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java index 215c9e0c62e6..8a6385db59ce 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java @@ -68,8 +68,8 @@ public void testEmptyInputSource() TableMetadata table = TableBuilder.external("foo") .inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -83,8 +83,8 @@ public void testInvalidTemplate() .inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY)) .property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://example.com/") .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -103,8 +103,8 @@ public void testNoFormatWithURI() throws URISyntaxException ); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -221,8 +221,8 @@ public void testFullTableSpecHappyPath() throws URISyntaxException TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation @@ -256,8 +256,8 @@ public void testTemplateSpecWithFormatHappyPath() .inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY)) .inputFormat(CSV_FORMAT) .property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}") - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation @@ -305,8 +305,8 @@ public void testTemplateSpecWithFormatAndPassword() )) .inputFormat(CSV_FORMAT) .property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}") - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); table.validate(); @@ -387,8 +387,8 @@ public void testMultipleURIsInTableSpec() throws URISyntaxException TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation @@ -421,8 +421,8 @@ public void testMultipleURIsWithTemplate() throws URISyntaxException .inputSource(httpToMap(inputSource)) .inputFormat(CSV_FORMAT) .property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}") - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation @@ -489,8 +489,8 @@ public void testEnvPassword() throws URISyntaxException TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java index 448e353664dc..bb3b2354bac9 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java @@ -51,7 +51,7 @@ public void testValidateEmptyInputSource() TableMetadata table = TableBuilder.external("foo") .inputSource(ImmutableMap.of("type", InlineInputSource.TYPE_KEY)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -63,7 +63,7 @@ public void testValidateNoFormat() // No format: not valid. For inline, format must be provided to match data TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(new InlineInputSource("a\n"))) - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -86,7 +86,7 @@ public void testValidateGood() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(new InlineInputSource("a\n"))) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) + .column("x", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); @@ -130,8 +130,8 @@ public void testValidAdHocFn() args.put(InlineInputSourceDefn.DATA_PROPERTY, Arrays.asList("a,b", "c,d")); args.put(FormattedInputSourceDefn.FORMAT_PARAMETER, CsvFormatDefn.TYPE_KEY); final List columns = Arrays.asList( - new ColumnSpec("a", Columns.VARCHAR, null), - new ColumnSpec("b", Columns.VARCHAR, null) + new ColumnSpec("a", Columns.STRING, null), + new ColumnSpec("b", Columns.STRING, null) ); final TableFunction fn = defn.adHocTableFn(); @@ -157,8 +157,8 @@ public void testPartialTable() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(new InlineInputSource("a,b\nc,d\n"))) .inputFormat(CSV_FORMAT) - .column("a", Columns.VARCHAR) - .column("b", Columns.VARCHAR) + .column("a", Columns.STRING) + .column("b", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); @@ -183,8 +183,8 @@ public void testPartialTable() // Cannot supply columns with the function List columns = Arrays.asList( - new ColumnSpec("a", Columns.VARCHAR, null), - new ColumnSpec("b", Columns.VARCHAR, null) + new ColumnSpec("a", Columns.STRING, null), + new ColumnSpec("b", Columns.STRING, null) ); assertThrows(IAE.class, () -> fn.apply("x", new HashMap<>(), columns, mapper)); } @@ -198,8 +198,8 @@ public void testDefinedTable() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(new InlineInputSource("a,b\nc,d"))) .inputFormat(formatToMap(format)) - .column("a", Columns.VARCHAR) - .column("b", Columns.VARCHAR) + .column("a", Columns.STRING) + .column("b", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/JsonInputFormatTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/JsonInputFormatTest.java index ecc88db3f990..2c42b671da83 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/JsonInputFormatTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/JsonInputFormatTest.java @@ -48,7 +48,7 @@ public void testDefaults() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(new InlineInputSource("a\n"))) .inputFormat(ImmutableMap.of("type", JsonInputFormat.TYPE_KEY)) - .column("a", Columns.VARCHAR) + .column("a", Columns.STRING) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); @@ -70,8 +70,8 @@ public void testConversion() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(new InlineInputSource("a\n"))) .inputFormat(formatToMap(format)) - .column("a", Columns.VARCHAR) - .column("b", Columns.BIGINT) + .column("a", Columns.STRING) + .column("b", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java index 3994bf011258..32f9b0641b87 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java @@ -61,8 +61,8 @@ public void testValidateEmptyInputSource() TableMetadata table = TableBuilder.external("foo") .inputSource(ImmutableMap.of("type", LocalInputSource.TYPE_KEY)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -75,8 +75,8 @@ public void testValidateNoFormat() LocalInputSource inputSource = new LocalInputSource(new File("/tmp"), "*"); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -129,8 +129,8 @@ public void testValidateBaseDirWithFormat() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); @@ -150,8 +150,8 @@ public void testValidateFilesWithFormat() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); resolved.validate(); @@ -169,8 +169,8 @@ public void testBaseDirAndFiles() TableMetadata table = TableBuilder.external("foo") .inputSource(source) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); ResolvedTable resolved = registry.resolve(table.spec()); assertThrows(IAE.class, () -> resolved.validate()); @@ -321,8 +321,8 @@ public void testFullyDefinedBaseDirAndPattern() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation @@ -373,8 +373,8 @@ public void testFullyDefinedFiles() TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation @@ -418,8 +418,8 @@ public void testBaseDirAndFormat() TableMetadata table = TableBuilder.external("foo") .inputSource(BASE_DIR_ONLY) .inputFormat(CSV_FORMAT) - .column("x", Columns.VARCHAR) - .column("y", Columns.BIGINT) + .column("x", Columns.STRING) + .column("y", Columns.LONG) .build(); // Check validation diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 580b9acbb520..4e8431cd5c67 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -20,6 +20,7 @@ package org.apache.druid.sql.calcite.schema; import org.apache.calcite.schema.Table; +import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.table.DatasourceTable; import javax.inject.Inject; @@ -29,14 +30,17 @@ public class DruidSchema extends AbstractTableSchema { private final BrokerSegmentMetadataCache segmentMetadataCache; private final DruidSchemaManager druidSchemaManager; + private final CatalogResolver catalogResolver; @Inject public DruidSchema( final BrokerSegmentMetadataCache segmentMetadataCache, - final DruidSchemaManager druidSchemaManager + final DruidSchemaManager druidSchemaManager, + final CatalogResolver catalogResolver ) { this.segmentMetadataCache = segmentMetadataCache; + this.catalogResolver = catalogResolver; if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) { this.druidSchemaManager = druidSchemaManager; } else { @@ -56,7 +60,7 @@ public Table getTable(String name) return druidSchemaManager.getTable(name); } else { DatasourceTable.PhysicalDatasourceMetadata dsMetadata = segmentMetadataCache.getDatasource(name); - return dsMetadata == null ? null : new DatasourceTable(dsMetadata); + return catalogResolver.resolveDatasource(name, dsMetadata); } } @@ -66,7 +70,7 @@ public Set getTableNames() if (druidSchemaManager != null) { return druidSchemaManager.getTableNames(); } else { - return segmentMetadataCache.getDatasourceNames(); + return catalogResolver.getTableNames(segmentMetadataCache.getDatasourceNames()); } } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java index eeac85a2c1ed..57cecf50645e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java @@ -23,11 +23,15 @@ import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.druid.catalog.model.facade.DatasourceFacade; import org.apache.druid.query.DataSource; import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.metadata.DataSourceInformation; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; /** @@ -79,6 +83,24 @@ public boolean isBroadcast() return broadcast; } + public Map toEffectiveColumns() + { + Map columns = new HashMap<>(); + for (int i = 0; i < getRowSignature().size(); i++) { + String colName = getRowSignature().getColumnName(i); + ColumnType colType = getRowSignature().getColumnType(i).get(); + + EffectiveColumnMetadata colMetadata = EffectiveColumnMetadata.fromPhysical(colName, colType); + columns.put(colName, colMetadata); + } + return columns; + } + + public EffectiveMetadata toEffectiveMetadata() + { + return new EffectiveMetadata(null, toEffectiveColumns(), false); + } + @Override public boolean equals(Object o) { @@ -115,14 +137,107 @@ public String toString() } } + public static class EffectiveColumnMetadata + { + protected final String name; + protected final ColumnType type; + + public EffectiveColumnMetadata(String name, ColumnType type) + { + this.name = name; + this.type = type; + } + + public String name() + { + return name; + } + + public ColumnType druidType() + { + return type; + } + + public static EffectiveColumnMetadata fromPhysical(String name, ColumnType type) + { + return new EffectiveColumnMetadata(name, type); + } + + @Override + public String toString() + { + return "Column{" + + "name=" + name + + ", type=" + type.asTypeString() + + "}"; + } + } + + public static class EffectiveMetadata + { + private final DatasourceFacade catalogMetadata; + private final boolean isEmpty; + private final Map columns; + + public EffectiveMetadata( + final DatasourceFacade catalogMetadata, + final Map columns, + final boolean isEmpty + ) + { + this.catalogMetadata = catalogMetadata; + this.isEmpty = isEmpty; + this.columns = columns; + } + + public DatasourceFacade catalogMetadata() + { + return catalogMetadata; + } + + public EffectiveColumnMetadata column(String name) + { + return columns.get(name); + } + + public boolean isEmpty() + { + return isEmpty; + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{" + + "empty=" + isEmpty + + ", columns=" + columns + + "}"; + } + } + private final PhysicalDatasourceMetadata physicalMetadata; + private final EffectiveMetadata effectiveMetadata; public DatasourceTable( final PhysicalDatasourceMetadata physicalMetadata ) { - super(physicalMetadata.getRowSignature()); + this( + physicalMetadata.getRowSignature(), + physicalMetadata, + physicalMetadata.toEffectiveMetadata() + ); + } + + public DatasourceTable( + final RowSignature rowSignature, + final PhysicalDatasourceMetadata physicalMetadata, + final EffectiveMetadata effectiveMetadata + ) + { + super(rowSignature); this.physicalMetadata = physicalMetadata; + this.effectiveMetadata = effectiveMetadata; } @Override @@ -143,6 +258,11 @@ public boolean isBroadcast() return physicalMetadata.isBroadcast(); } + public EffectiveMetadata effectiveMetadata() + { + return effectiveMetadata; + } + @Override public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable table) { @@ -176,9 +296,10 @@ public int hashCode() public String toString() { // Don't include the row signature: it is the same as in - // physicalMetadata. - return "DruidTable{" + - physicalMetadata + + // effectiveMetadata. + return "DruidTable{physicalMetadata=" + + (physicalMetadata == null ? "null" : physicalMetadata.toString()) + + ", effectiveMetadata=" + effectiveMetadata + '}'; } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java index 558625a10b36..ff29a8743242 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java @@ -481,9 +481,9 @@ protected String externClauseFromSig(final ExternalDataSource externalDataSource buf.append(sig.getColumnName(i)).append(" "); ColumnType type = sig.getColumnType(i).get(); if (type == ColumnType.STRING) { - buf.append(Columns.VARCHAR); + buf.append(Columns.SQL_VARCHAR); } else if (type == ColumnType.LONG) { - buf.append(Columns.BIGINT); + buf.append(Columns.SQL_BIGINT); } else if (type == ColumnType.DOUBLE) { buf.append(Columns.DOUBLE); } else if (type == ColumnType.FLOAT) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/SqlTestFrameworkConfig.java b/sql/src/test/java/org/apache/druid/sql/calcite/SqlTestFrameworkConfig.java index 540b39e63899..e9c5d773fefa 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/SqlTestFrameworkConfig.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/SqlTestFrameworkConfig.java @@ -117,6 +117,7 @@ public SqlTestFramework get() private SqlTestFramework createFramework(SqlTestFrameworkConfig config) { SqlTestFramework.Builder builder = new SqlTestFramework.Builder(testHost) + .catalogResolver(testHost.createCatalogResolver()) .minTopNThreshold(config.minTopNThreshold()) .mergeBufferCount(config.numMergeBuffers()); return builder.build(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java index cebb9e5446f8..08f16ae9307f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java @@ -31,6 +31,7 @@ import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.NoopEscalator; +import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.util.CalciteTestBase; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.TestTimelineServerView; @@ -67,7 +68,7 @@ public void testInitializationWithNoData() throws Exception cache.start(); cache.awaitInitialization(); - final DruidSchema druidSchema = new DruidSchema(cache, null); + final DruidSchema druidSchema = new DruidSchema(cache, null, CatalogResolver.NULL_RESOLVER); Assert.assertEquals(ImmutableSet.of(), druidSchema.getTableNames()); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/InformationSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/InformationSchemaTest.java index 43551b3e0403..f2ac64826ee8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/InformationSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/InformationSchemaTest.java @@ -38,6 +38,7 @@ import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; +import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.planner.DruidOperatorTable; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.table.RowSignatures; @@ -72,7 +73,8 @@ public void setUp() new PlannerConfig(), null, new NoopDruidSchemaManager(), - CalciteTests.TEST_AUTHORIZER_MAPPER + CalciteTests.TEST_AUTHORIZER_MAPPER, + CatalogResolver.NULL_RESOLVER ); informationSchema = new InformationSchema( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index e9cb1430a544..e0a3bb5b266a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -95,6 +95,7 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.server.security.ResourceType; +import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable; import org.apache.druid.sql.calcite.table.RowSignatures; import org.apache.druid.sql.calcite.util.CalciteTestBase; @@ -266,7 +267,7 @@ public void setUp() throws Exception ); cache.start(); cache.awaitInitialization(); - druidSchema = new DruidSchema(cache, null); + druidSchema = new DruidSchema(cache, null, CatalogResolver.NULL_RESOLVER); metadataView = EasyMock.createMock(MetadataSegmentView.class); druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); serverInventoryView = EasyMock.createMock(FilteredServerInventoryView.class); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java index c3d2dd9b2416..040912fd6133 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java @@ -50,6 +50,7 @@ import org.apache.druid.sql.SqlLifecycleManager; import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.SqlToolbox; +import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.planner.DruidOperatorTable; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerFactory; @@ -131,14 +132,16 @@ public static DruidSchemaCatalog createMockRootSchema( final PlannerConfig plannerConfig, @Nullable final ViewManager viewManager, final DruidSchemaManager druidSchemaManager, - final AuthorizerMapper authorizerMapper + final AuthorizerMapper authorizerMapper, + final CatalogResolver catalogResolver ) { DruidSchema druidSchema = createMockSchema( injector, conglomerate, walker, - druidSchemaManager + druidSchemaManager, + catalogResolver ); SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, authorizerMapper); @@ -193,7 +196,8 @@ public static DruidSchemaCatalog createMockRootSchema( plannerConfig, null, new NoopDruidSchemaManager(), - authorizerMapper + authorizerMapper, + CatalogResolver.NULL_RESOLVER ); } @@ -201,7 +205,8 @@ private static DruidSchema createMockSchema( final Injector injector, final QueryRunnerFactoryConglomerate conglomerate, final SpecificSegmentsQuerySegmentWalker walker, - final DruidSchemaManager druidSchemaManager + final DruidSchemaManager druidSchemaManager, + final CatalogResolver catalog ) { final BrokerSegmentMetadataCache cache = new BrokerSegmentMetadataCache( @@ -220,7 +225,8 @@ public Set getDataSourceNames() { return ImmutableSet.of(CalciteTests.BROADCAST_DATASOURCE); } - }), + } + ), null ); @@ -233,7 +239,7 @@ public Set getDataSourceNames() } cache.stop(); - return new DruidSchema(cache, druidSchemaManager); + return new DruidSchema(cache, druidSchemaManager, catalog); } public static JoinableFactory createDefaultJoinableFactory(Injector injector) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java index 5db5090cb2a1..fa7055d3e582 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java @@ -155,6 +155,11 @@ SqlEngine createEngine( Injector injector ); + default CatalogResolver createCatalogResolver() + { + return CatalogResolver.NULL_RESOLVER; + } + /** * Configure the JSON mapper. * @@ -419,7 +424,8 @@ public PlannerFixture( plannerConfig, viewManager, componentSupplier.createSchemaManager(), - framework.authorizerMapper + framework.authorizerMapper, + framework.builder.catalogResolver ); this.plannerFactory = new PlannerFactory(