Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SNAP 3104] Showing external hive metastore tables as part of HIVETABLES VTI #530

Open
wants to merge 14 commits into
base: snappy/master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public ExternalTableMetaData(String entityName,
public List<Column> columns;
public boolean hasDependentSampleTables;

public ExternalTableMetaData() { }

@Override
public String toString() {
return "ObjectMetadata(name=" + this.entityName + ", schema=" + this.schema +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import com.pivotal.gemfirexd.internal.engine.ui.SnappyRegionStatsCollectorResult;
import com.pivotal.gemfirexd.internal.impl.store.raw.data.GfxdJarMessage;
import com.pivotal.gemfirexd.internal.snappy.LeadNodeExecutionContext;
import com.pivotal.gemfirexd.internal.snappy.hivetables.dto.ExternalHiveTablesCollectorResult;
import com.pivotal.gemfirexd.tools.planexporter.ExecutionPlanMessage;

/**
Expand Down Expand Up @@ -270,6 +271,8 @@ public static synchronized boolean initTypes() {
() -> new SnappyResultHolder());
DSFIDFactory.registerGemFireXDClass(SNAPPY_REGION_STATS_RESULT,
() -> new SnappyRegionStatsCollectorResult());
DSFIDFactory.registerGemFireXDClass(HIVE_TABLES_COLLECTOR_RESULT,
() -> new ExternalHiveTablesCollectorResult());
DSFIDFactory.registerGemFireXDClass(MEMBER_STATISTICS_MESSAGE,
() -> new MemberStatisticsMessage());
DSFIDFactory.registerGemFireXDClass(MEMBER_LOGS_MESSAGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public interface GfxdSerializable extends GfxdDSFID {

byte SNAPPY_REGION_STATS_RESULT = 111;

// 112 is unused -- use for new IDs
byte HIVE_TABLES_COLLECTOR_RESULT= 112;

byte COLUMN_FORMAT_KEY = 113;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,24 @@
* LICENSE file.
*/

/*
* Changes for SnappyData data platform.
*
* Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.pivotal.gemfirexd.internal.engine;

import java.io.CharArrayWriter;
Expand Down Expand Up @@ -275,6 +293,20 @@ public static Set<DistributedMember> getLeadNode() {
throw new NoMemberFoundException("SnappyData Lead node is not available");
}

/**
*
* @return Optional of profile of primary lead node if primary lead exists in cluster
* Optional#empty if primary lead node does not exist in cluster
*/
public static Optional<GfxdDistributionAdvisor.GfxdProfile> getPrimaryLeadProfile(){
return getLeadNode().stream().map(GemFireXDUtils::getGfxdProfile)
.filter(GfxdDistributionAdvisor.GfxdProfile::hasSparkURL).findFirst();
}

public static boolean isLead(){
return GemFireXDUtils.getGfxdAdvisor().getMyProfile().hasSparkURL();
}

/**
* Check if {@link GemFireCache} is closed or is in the process of closing and
* throw {@link CacheClosedException} if so.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,35 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.TimeUnit;

import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.internal.cache.ExternalTableMetaData;
import com.gemstone.gemfire.internal.shared.SystemProperties;
import com.google.common.collect.Iterators;
import com.pivotal.gemfirexd.internal.catalog.ExternalCatalog;
import com.pivotal.gemfirexd.internal.engine.GfxdVTITemplate;
import com.pivotal.gemfirexd.internal.engine.GfxdVTITemplateNoAllNodesRoute;
import com.pivotal.gemfirexd.internal.engine.Misc;
import com.pivotal.gemfirexd.internal.engine.distributed.GfxdDistributionAdvisor;
import com.pivotal.gemfirexd.internal.engine.jdbc.GemFireXDRuntimeException;
import com.pivotal.gemfirexd.internal.iapi.sql.ResultColumnDescriptor;
import com.pivotal.gemfirexd.internal.iapi.sql.conn.LanguageConnectionContext;
import com.pivotal.gemfirexd.internal.iapi.types.HarmonySerialClob;
import com.pivotal.gemfirexd.internal.impl.jdbc.EmbedResultSetMetaData;
import com.pivotal.gemfirexd.internal.impl.jdbc.Util;
import com.pivotal.gemfirexd.internal.impl.sql.catalog.GfxdDataDictionary;
import com.pivotal.gemfirexd.internal.shared.common.SharedUtils;
import com.pivotal.gemfirexd.internal.shared.common.reference.Limits;
import com.pivotal.gemfirexd.internal.shared.common.reference.SQLState;
import com.pivotal.gemfirexd.internal.snappy.hivetables.ExternalHiveTablesCollectorFunction;
import com.pivotal.gemfirexd.internal.snappy.hivetables.dto.ExternalHiveTablesCollectorResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -69,7 +79,9 @@ public boolean next() {
if (!GfxdDataDictionary.SKIP_CATALOG_OPS.get().skipHiveCatalogCalls &&
(hiveCatalog = Misc.getMemStore().getExternalCatalog()) != null) {
try {
this.tableMetas = hiveCatalog.getCatalogTables().iterator();
List<ExternalTableMetaData> catalogTables = hiveCatalog.getCatalogTables();
Collection<ExternalTableMetaData> externalHiveTables = getExternalHiveTables();
this.tableMetas = Iterators.concat(catalogTables.iterator(), externalHiveTables.iterator());
} catch (Exception e) {
// log and move on
logger.warn("ERROR in retrieving Hive tables: " + e.toString());
Expand All @@ -96,6 +108,21 @@ public boolean next() {
}
}

private Collection<ExternalTableMetaData> getExternalHiveTables() throws InterruptedException {
GfxdDistributionAdvisor.GfxdProfile primaryLeadProfile = Misc.getPrimaryLeadProfile()
.<IllegalArgumentException>orElseThrow(() -> {
throw new IllegalStateException("Lead not available");
});
if (!Misc.isLead() && primaryLeadProfile.isHiveSessionInitialized()) {
ArrayList result = (ArrayList)FunctionService.onMembers(Misc.getLeadNode())
.execute(ExternalHiveTablesCollectorFunction.ID)
.getResult(30, TimeUnit.SECONDS);
return ((ExternalHiveTablesCollectorResult)(result).get(0)).getTablesMetadata();
} else {
return Collections.emptyList();
}
}

@Override
protected Object getObjectForColumn(int columnNumber) {
String provider = this.currentTableMeta.shortProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,13 @@ public static final class GfxdProfile extends DistributionAdvisor.Profile
*/
private long usableHeap;

/**
* Flag indicating whether the external hive catalog is enabled. By default it's false.
* Set to true when hive session is initialized at
* org.apache.spark.sql.SnappyContext#newHiveSession().
*/
private boolean hiveSessionInitialized;

/** for deserialization */
public GfxdProfile() {
this.initialized = true;
Expand Down Expand Up @@ -1450,7 +1457,7 @@ public final void setInitialized(boolean initialized) {
public final boolean getInitialized() {
return this.initialized;
}

public final String getLocale() {
return this.dbLocaleStr;
}
Expand All @@ -1463,6 +1470,14 @@ public final long getUsableHeap() {
return this.usableHeap;
}

public void setHiveSessionInitialized(boolean hiveSessionInitialized) {
this.hiveSessionInitialized = hiveSessionInitialized;
}

public boolean isHiveSessionInitialized() {
return hiveSessionInitialized;
}

@Override
public void processIncoming(DistributionManager dm, String adviseePath,
boolean removeProfile, boolean exchangeProfiles,
Expand Down Expand Up @@ -1549,6 +1564,7 @@ public final void toData(DataOutput out) throws IOException {
}
out.writeLong(this.catalogSchemaVersion.get());
out.writeLong(this.usableHeap);
out.writeBoolean(this.hiveSessionInitialized);
}

@Override
Expand Down Expand Up @@ -1589,6 +1605,7 @@ public final void fromData(DataInput in) throws IOException,
if ((this.flags & F_HAS_USABLE_HEAP) != 0) {
this.usableHeap = in.readLong();
}
this.hiveSessionInitialized = in.readBoolean();
}

@Override
Expand All @@ -1608,6 +1625,7 @@ public final void fillInToString(StringBuilder sb) {
sb.append("; numProcessors=").append(this.numProcessors);
sb.append("; catalogVersion=").append(this.catalogSchemaVersion.get());
sb.append("; usableHeap=").append(this.usableHeap);
sb.append("; hiveSessionInitialized=").append(this.hiveSessionInitialized);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import com.pivotal.gemfirexd.internal.engine.GemFireXDQueryTimeStatistics;
import com.pivotal.gemfirexd.internal.engine.GfxdConstants;
import com.pivotal.gemfirexd.internal.engine.GfxdDataSerializable;
import com.pivotal.gemfirexd.internal.snappy.hivetables.ExternalHiveTablesCollectorFunction;
import com.pivotal.gemfirexd.internal.engine.Misc;
import com.pivotal.gemfirexd.internal.engine.SigThreadDumpHandler;
import com.pivotal.gemfirexd.internal.engine.access.GemFireTransaction;
Expand Down Expand Up @@ -1209,6 +1210,7 @@ else if (!hostData) {
FunctionService.registerFunction(new QueryCancelFunction());
FunctionService.registerFunction(new SnappyRegionStatsCollectorFunction());
FunctionService.registerFunction(new DiskStoreIDs.DiskStoreIDFunction());
FunctionService.registerFunction(new ExternalHiveTablesCollectorFunction());

final ConnectionSignaller signaller = ConnectionSignaller.getInstance();
if (logger.fineEnabled()) {
Expand Down Expand Up @@ -2696,6 +2698,8 @@ public static final class StoreAdvisee implements DistributionAdvisee {
*/
private volatile SortedSet<String> serverGroups;

private boolean hiveSessionInitialized;

private final CancelCriterion stopper = new CancelCriterion() {

@Override
Expand Down Expand Up @@ -2781,6 +2785,10 @@ final synchronized void setVMKind(VMKind newKind) {
+ " GemFireXD not booted or closed down.");
}

public void setHiveSessionInitialized(boolean hiveSessionInitialized) {
this.hiveSessionInitialized = hiveSessionInitialized;
}

/**
* Get the server groups of this VM.
*/
Expand Down Expand Up @@ -2809,6 +2817,7 @@ public void fillInProfile(Profile p) {
.isDataDictionaryPersistent());
profile.setLocale(Misc.getMemStoreBooting().getLocale());
profile.serialNumber = getSerialNumber();
profile.setHiveSessionInitialized(hiveSessionInitialized);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package com.pivotal.gemfirexd.internal.snappy;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.ByteArrayDataInput;
import com.gemstone.gemfire.internal.cache.ExternalTableMetaData;
import com.gemstone.gemfire.internal.shared.Version;
import com.gemstone.gemfire.internal.snappy.StoreCallbacks;
import com.pivotal.gemfirexd.internal.iapi.sql.ParameterValueSet;
Expand Down Expand Up @@ -111,6 +113,11 @@ public String getClusterType() {
@Override
public void setLeadClassLoader() {
}

@Override
public Collection<ExternalTableMetaData> getHiveTablesMetadata() {
return null;
}
};

public static ClusterCallbacks getClusterCallbacks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package com.pivotal.gemfirexd.internal.snappy;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.ByteArrayDataInput;
import com.gemstone.gemfire.internal.cache.ExternalTableMetaData;
import com.gemstone.gemfire.internal.shared.Version;
import com.pivotal.gemfirexd.internal.iapi.sql.ParameterValueSet;
import com.pivotal.gemfirexd.internal.iapi.types.DataValueDescriptor;
Expand Down Expand Up @@ -71,4 +73,11 @@ Iterator<ValueRow> getRowIterator(DataValueDescriptor[] dvds, int[] types,
String getClusterType();

void setLeadClassLoader();

/**
* Used for fetching the metadata of hive tables stored in external hive metastore from lead.
* @return Collection of {@link ExternalTableMetaData} representing necessary information to
* required by SYS.HIVETABLES VTI.
*/
Collection<ExternalTableMetaData> getHiveTablesMetadata();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/

package com.pivotal.gemfirexd.internal.snappy.hivetables;

import java.util.Collection;

import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.internal.cache.ExternalTableMetaData;
import com.pivotal.gemfirexd.internal.snappy.CallbackFactoryProvider;
import com.pivotal.gemfirexd.internal.snappy.hivetables.dto.ExternalHiveTablesCollectorResult;

/**
* Server node(s) use this function to retrieve metadata of hive tables stored
* in external hive metastore (if configured) which is available on lead node.
* Returns {@link ExternalHiveTablesCollectorResult}.
*/
public class ExternalHiveTablesCollectorFunction implements Function {

public static final String ID = "ExternalHiveTablesCollectorFunction";

@Override
public boolean hasResult() {
return true;
}

@Override
public void execute(FunctionContext context) {
Collection<ExternalTableMetaData> hiveTablesMetadata =
CallbackFactoryProvider.getClusterCallbacks().getHiveTablesMetadata();
context.getResultSender().lastResult(new ExternalHiveTablesCollectorResult(hiveTablesMetadata));
}

@Override
public String getId() {
return ID;
}

//todo[vatsal] : verify this return value
@Override
public boolean optimizeForWrite() {
return false;
}

//todo[vatsal] : verify this return value
@Override
public boolean isHA() {
return false;
}
Comment on lines +55 to +64
Copy link
Author

@vatsalmevada vatsalmevada Nov 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to verify these return values.

}
Loading