Commit 9b5c851

Author: Vatsal Mevada

    Fetching hive tables from all schemas instead of tables from only
    current schema.

1 parent: d147018

3 files changed, 12 insertions(+), 18 deletions(-)

cluster/src/main/scala/io/snappydata/gemxd/ClusterCallbacksImpl.scala (+5 −12)

@@ -22,7 +22,6 @@ import java.{lang, util}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
-import scala.util.control.NonFatal
 
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember
 import com.gemstone.gemfire.internal.cache.{ExternalTableMetaData, GemFireCacheImpl}
@@ -37,20 +36,16 @@ import com.pivotal.gemfirexd.internal.snappy.{CallbackFactoryProvider, ClusterCa
 import io.snappydata.cluster.ExecutorInitiator
 import io.snappydata.impl.LeadImpl
 import io.snappydata.recovery.RecoveryService
-import io.snappydata.sql.catalog.{CatalogObjectType, SnappyExternalCatalog}
+import io.snappydata.sql.catalog.CatalogObjectType
 import io.snappydata.util.ServiceUtils
 import io.snappydata.{ServiceManager, SnappyEmbeddedTableStatsProviderService}
 
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.cluster.SnappyClusterManager
 import org.apache.spark.serializer.{KryoSerializerPool, StructTypeSerializer}
-import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.columnar.ExternalStoreUtils
-import org.apache.spark.sql.execution.columnar.ExternalStoreUtils.CaseInsensitiveMutableHashMap
-import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.internal.SnappySessionCatalog
-import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.{SaveMode, SnappyContext}
 
 /**
  * Callbacks that are sent by GemXD to Snappy for cluster management
@@ -263,11 +258,9 @@ object ClusterCallbacksImpl extends ClusterCallbacks with Logging {
     }
   }
 
-  override def getHiveTablesMetadata(connectionId: Long, schema: String):
+  override def getHiveTablesMetadata():
   util.Collection[ExternalTableMetaData] = {
-    val session = SnappySessionPerConnection.getSnappySessionForConnection(connectionId)
-    val catalogTables = session.sessionState.catalog.asInstanceOf[SnappySessionCatalog]
-        .getHiveCatalogTables(schema)
+    val catalogTables = SnappyContext.getHiveCatalogTables()
     import scala.collection.JavaConverters._
     getTablesMetadata(catalogTables).asJava
   }
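With the connectionId and schema parameters gone, the GemXD layer no longer needs a per-connection SnappySession to answer hive-table metadata requests; the lookup is now cluster-wide. A minimal caller sketch under that reading (the getClusterCallbacks() accessor is an assumption based on the CallbackFactoryProvider import above, not something this commit shows):

    // Sketch only: fetch metadata for hive tables across all schemas in a
    // single call, with no connection id or schema name to thread through.
    // CallbackFactoryProvider.getClusterCallbacks() is assumed, not part of this diff.
    val tables: java.util.Collection[ExternalTableMetaData] =
      CallbackFactoryProvider.getClusterCallbacks.getHiveTablesMetadata()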

core/src/main/scala/org/apache/spark/sql/SnappyContext.scala (+7 −0)

@@ -50,6 +50,7 @@ import org.apache.spark.memory.MemoryManagerCallback
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerExecutorAdded}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.SortDirection
 import org.apache.spark.sql.collection.{ToolsCallbackInit, Utils}
 import org.apache.spark.sql.execution.columnar.ExternalStoreUtils.CaseInsensitiveMutableHashMap
@@ -875,6 +876,12 @@ object SnappyContext extends Logging {
     }
   }
 
+  def getHiveCatalogTables(skipSchemas: Seq[String] = "SYS" :: Nil): Seq[CatalogTable] = {
+    val catalog = hiveSession.sessionState.catalog
+    catalog.listDatabases().filter(s => skipSchemas.isEmpty || !skipSchemas.contains(s)).
+        flatMap(schema => catalog.listTables(schema).map(table => catalog.getTableMetadata(table)))
+  }
+
   private[spark] def getBlockIdIfNull(
       executorId: String): Option[BlockAndExecutorId] =
     Option(storeToBlockMap.get(executorId))
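The new helper walks every database in the shared hive session's catalog rather than a single schema, skipping only the internal SYS schema by default (passing Seq.empty skips nothing). It also drops the enableHiveSupport guard of the per-session variant removed below, which returned Seq.empty when hive support was off for that connection. The same traversal can be sketched against Spark's SessionCatalog directly; listDatabases, listTables and getTableMetadata are the standard catalog calls the diff uses, while the enclosing object and the spark parameter are assumptions for illustration. The sketch sits in the org.apache.spark.sql package because sessionState is private[sql], which is also how SnappyContext itself reaches it:

    package org.apache.spark.sql

    import org.apache.spark.sql.catalyst.catalog.CatalogTable

    // Minimal sketch, not the commit's code: collect CatalogTable metadata
    // for every table in every schema except those named in skipSchemas.
    object HiveCatalogTablesSketch {
      def listHiveTables(spark: SparkSession,
          skipSchemas: Seq[String] = Seq("SYS")): Seq[CatalogTable] = {
        val catalog = spark.sessionState.catalog
        catalog.listDatabases()
            .filterNot(skipSchemas.contains)
            .flatMap(schema => catalog.listTables(schema).map(catalog.getTableMetadata))
      }
    }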

core/src/main/scala/org/apache/spark/sql/internal/SnappySessionCatalog.scala (+0 −6)

@@ -897,12 +897,6 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog,
     } else super.listTables(schema, pattern)
   }
 
-  def getHiveCatalogTables(schema: String): Seq[CatalogTable] =
-    if (snappySession.enableHiveSupport) {
-      hiveSessionCatalog.listTables(schema)
-          .map(ti => hiveSessionCatalog.getTableMetadata(ti))
-    } else Seq.empty[CatalogTable]
-
   override def refreshTable(name: TableIdentifier): Unit = {
     val table = addMissingGlobalTempSchema(name)
     if (isTemporaryTable(table)) {
