Skip to content

WIP {!join cacheEventually=true ...}... #623

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 120 additions & 2 deletions solr/core/src/java/org/apache/solr/search/JoinQParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
*/
package org.apache.solr.search;

import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryVisitor;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.ScoreMode;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
Expand All @@ -44,6 +49,8 @@ public class JoinQParserPlugin extends QParserPlugin {

private Set<String> allowSolrUrls;

private final BiFunction<Query, String, Query> eventualCacheFactory;

private static class JoinParams {
final String fromField;
final String fromCore;
Expand Down Expand Up @@ -172,6 +179,14 @@ JoinParams parseJoin(QParser qparser) throws SyntaxError {
}
}

/**
 * Creates the plugin with the default eventual-cache factory, which wraps a
 * query and its {@code fromIndex} into an {@link EventualJoinCacheWrapper}.
 */
public JoinQParserPlugin() {
  this(EventualJoinCacheWrapper::new);
}
/**
 * Constructor for test injection: lets a caller supply the factory that turns
 * a parsed join query and its {@code fromIndex} into the query returned by
 * the parser when eventual caching applies.
 *
 * @param factory function applied to (query, fromIndex) to build the wrapper
 */
protected JoinQParserPlugin(BiFunction<Query, String, Query> factory) {
  eventualCacheFactory = factory;
}

@Override
public void init(NamedList<?> args) {
routerField = (String) args.get("routerField");
Expand All @@ -189,11 +204,40 @@ public void init(NamedList<?> args) {
@Override
public QParser createParser(String qstr, SolrParams localParams, SolrParams params, SolrQueryRequest req) {
final JoinQParserPlugin plugin = this;

final BiFunction<Query, String, Query> wrapperFactory = eventualCacheFactory;
return new QParser(qstr, localParams, params, req) {

@Override
public Query parse() throws SyntaxError {
final Query query = parseImpl();
// make cross core joins time-agnostic
// it should be ruled by param probably
boolean crossCoreCache = false;
// TODO make it {!cache=eventually}
if(localParams.getBool("cacheEventually", false)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

introducing {!join cacheEventually=true ...}...

if (query instanceof JoinQuery) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid those ugly instanceofs, I'd like to unify the existing JoinQP routines and extract a common ancestor.

if (((JoinQuery) query).fromCoreOpenTime != 0L) {
((JoinQuery) query).fromCoreOpenTime = Long.MIN_VALUE;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need these queries to match each other for a cache hit regardless of commits in fromCore. This could be done by setting an additional flag and changing equals & hashCode. WDYT?

crossCoreCache = true;
}
} else {
if (query instanceof ScoreJoinQParserPlugin.OtherCoreJoinQuery){
if (((ScoreJoinQParserPlugin.OtherCoreJoinQuery) query).fromCoreOpenTime!=0) {
((ScoreJoinQParserPlugin.OtherCoreJoinQuery) query).fromCoreOpenTime = Long.MIN_VALUE;
crossCoreCache = true;
}
}
}
}
if (crossCoreCache) {
String fromIndex = localParams.get("fromIndex");// TODO in might be a single sharded collection
// TODO also , from index is set into joinquery itself
return wrapperFactory.apply(query, fromIndex);
}
return query;
}

private Query parseImpl() throws SyntaxError {
if (localParams != null && localParams.get(METHOD) != null) {
// TODO Make sure 'method' is valid value here and give users a nice error
final Method explicitMethod = Method.valueOf(localParams.get(METHOD));
Expand All @@ -210,6 +254,24 @@ public Query parse() throws SyntaxError {
};
}

/**
 * Immutable cache value pairing a {@link DocSet} with a timestamp.
 *
 * <p>Within this file the timestamp passed in is the open-nano-time of the
 * "from" core searcher that was current when the doc set was computed.
 */
public static class DocsetTimestamp {
  // Both fields are assigned once in the constructor and never modified.
  private final DocSet docSet;
  private final long timestamp;

  /**
   * @param docSet    the doc set to hold
   * @param timestamp the timestamp to record alongside the doc set
   */
  public DocsetTimestamp(DocSet docSet, long timestamp) {
    this.docSet = docSet;
    this.timestamp = timestamp;
  }

  /** @return the doc set given at construction */
  public DocSet getDocSet() {
    return docSet;
  }

  /** @return the timestamp given at construction */
  public long getTimestamp() {
    return timestamp;
  }
}

private static final EnumSet<Method> JOIN_METHOD_ALLOWLIST = EnumSet.of(Method.index, Method.topLevelDV, Method.dvWithScore);
/**
* A helper method for other plugins to create (non-scoring) JoinQueries wrapped around arbitrary queries against the same core.
Expand Down Expand Up @@ -245,5 +307,61 @@ private static Method parseMethodString(String method) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Provided join method '" + method + "' not supported");
}
}


public static class EventualJoinCacheWrapper extends ExtendedQueryBase {
private final Query query;
private final String fromIndex;

public EventualJoinCacheWrapper(Query query, String fromIndex) {
this.query = query;
this.fromIndex = fromIndex;
setCache(false);
}

@Override
public void visit(QueryVisitor visitor) {
query.visit(visitor);
}

@Override
public boolean equals(Object obj) {
return query.equals(obj);
}

@Override
public int hashCode() {
return query.hashCode();
}

@SuppressWarnings("unchecked")
@Override
public Weight createWeight(IndexSearcher searcher, org.apache.lucene.search.ScoreMode scoreMode, float boost) throws IOException {
// either try to obtain it via SRI and assert
final SolrIndexSearcher solrIndexSearcher = (SolrIndexSearcher) searcher;
@SuppressWarnings("rawtypes")
final SolrCache toCache = solrIndexSearcher.getCache(fromIndex);
WrappedQuery wrap = new WrappedQuery(query);
wrap.setCache(false); //bypassing searcher cache
final DocsetTimestamp entry = (DocsetTimestamp)toCache.computeIfAbsent(wrap, k -> {
// let's snapshot from,to reader
final SolrCore fromCore = solrIndexSearcher.getCore().getCoreContainer().getCore(fromIndex);
try {
final RefCounted<SolrIndexSearcher> fromSearcher = fromCore.getSearcher();
try {
long fromCoreTimestamp = fromSearcher.get().getOpenNanoTime();
return createEntry(solrIndexSearcher, (Query) k, fromCoreTimestamp);
} finally {
fromSearcher.decref();
}
} finally {
fromCore.close();
}
});
return entry.getDocSet().getTopFilter().createWeight(searcher, scoreMode, boost);
}

protected DocsetTimestamp createEntry(SolrIndexSearcher solrIndexSearcher, Query joinQuery, long fromCoreTimestamp) throws IOException {
return new DocsetTimestamp(solrIndexSearcher.getDocSet(joinQuery), fromCoreTimestamp);
}
}
}
4 changes: 2 additions & 2 deletions solr/core/src/java/org/apache/solr/search/JoinQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@
import org.apache.solr.util.RTimer;
import org.apache.solr.util.RefCounted;

class JoinQuery extends Query {
public class JoinQuery extends Query {
String fromField;
String toField;
String fromIndex; // TODO: name is missleading here compared to JoinQParserPlugin usage - here it must be a core name
public String fromIndex; // TODO: name is missleading here compared to JoinQParserPlugin usage - here it must be a core name
Query q;
long fromCoreOpenTime;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.apache.solr.search;

import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.apache.solr.util.RTimerTree;
import org.apache.solr.util.RefCounted;

import java.io.IOException;
import java.security.Principal;
import java.util.List;
import java.util.Map;

/**
* This update processor is expected to be invoked on the "fromIndex" side of a join to regenerate cached joins.
* It loops through all other loaded cores and checks whether each one is a "toIndex" core:
* a core that has a user cache named after this core ("fromIndex") is subject to regeneration.
*
* */
public class RefreshCrossCoreJoinCacheFactory extends UpdateRequestProcessorFactory {
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
return new UpdateRequestProcessor(next) {
@SuppressWarnings("unchecked")
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
super.processCommit(cmd);
// refresh strictly after RunUpdateProcessor

final CoreContainer coreContainer = req.getCore().getCoreContainer();
final List<String> possibleToSideCores = coreContainer.getLoadedCoreNames();
String fromSideCore = req.getCore().getName();
for (String toSideCoreName: possibleToSideCores){
if (!toSideCoreName.equals(fromSideCore)) {
final SolrCore toSideCore = coreContainer.getCore(toSideCoreName);
final RefCounted<SolrIndexSearcher> toSideSearcher = toSideCore.getSearcher();
try {
@SuppressWarnings("rawtypes")
final SolrCache joinCache = toSideSearcher.get().getCache(fromSideCore);
if (joinCache != null) {
// this is necessary for classic join query, which checks SRI, I don't know why.
SolrQueryRequest leftReq = new LocalSolrQueryRequest(toSideCore,req.getParams()) {
@Override public SolrIndexSearcher getSearcher() { return toSideSearcher.get(); }
@Override public void close() { }
};
SolrQueryResponse rsp = new SolrQueryResponse();
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(leftReq, rsp));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RESOURCE_LEAK: resource of type org.apache.solr.search.RefreshCrossCoreJoinCacheFactory$1$1 acquired by call to RefreshCrossCoreJoinCacheFactory$1$1(...) at line 51 is not released after line 56.
Note: potential exception at line 56

(at-me in a reply with help or ignore)


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

try {
joinCache.warm(toSideSearcher.get(), joinCache);
} finally {
SolrRequestInfo.clearRequestInfo();
}
}
} finally {
toSideSearcher.decref();
toSideCore.close();
}
}
}
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.apache.solr.search.join;

import org.apache.lucene.search.Query;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.search.*;
import org.apache.solr.util.RefCounted;

import java.io.IOException;
import java.util.List;

/**
* Regenerates the user cache whose entries map a {!join cache=false}... query to a (docset, from_index_timestamp) pair.
* */
public class CrossCoreJoinCacheRegenerator implements CacheRegenerator {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goes through the user cache and updates the doc sets against the new right-side searcher.


/**
 * Regenerates one entry of the cross-core join user cache.
 *
 * <p>The key must be a {@link WrappedQuery} with caching disabled that wraps a
 * {@code JoinQuery} or a {@code ScoreJoinQParserPlugin.OtherCoreJoinQuery};
 * the value must be a {@code JoinQParserPlugin.DocsetTimestamp}. If the stored
 * timestamp equals the open-nano-time of the current "from" core searcher the
 * entry is left untouched; otherwise the doc set is recomputed against
 * {@code newSearcher} and put into {@code newCache}.
 *
 * @return always {@code true}
 * @throws IllegalArgumentException if the key is not a supported cache=false wrapped join query
 * @throws IllegalStateException    if the "from" core cannot be obtained from the core container
 */
@Override
@SuppressWarnings("unchecked")
public <K, V> boolean regenerateItem(SolrIndexSearcher newSearcher, SolrCache<K, V> newCache, SolrCache<K, V> oldCache, K oldKey, V oldVal) throws IOException {
  if (!(oldKey instanceof WrappedQuery)) {
    // Explicit failure instead of a ClassCastException further down.
    throw new IllegalArgumentException(this + " regenerates only WrappedQuery keys, but got " + oldKey);
  }
  final WrappedQuery wrappedKey = (WrappedQuery) oldKey;
  if (wrappedKey.getCache()) {
    throw new IllegalArgumentException(this + " regenerates only cache=false queries, but got " + oldKey);
  }

  final String fromIndex = fromIndexOf(wrappedKey.getWrappedQuery());
  final JoinQParserPlugin.DocsetTimestamp cached = (JoinQParserPlugin.DocsetTimestamp) oldVal;
  final long fromCoreTimestamp = fromCoreOpenNanoTime(newSearcher, fromIndex);

  // this is non-enforced warming.
  // Left side commit occurs, some entries might already be regenerated
  if (cached.getTimestamp() == fromCoreTimestamp) {
    onBypass(wrappedKey);
    return true; // query cache warming already warmed this entry
  }
  onRegenerate(wrappedKey);
  final DocSet docSet = newSearcher.getDocSet(wrappedKey);
  newCache.put(oldKey, (V) new JoinQParserPlugin.DocsetTimestamp(docSet, fromCoreTimestamp));
  return true;
}

/** Extracts the {@code fromIndex} of a supported cross-core join query. */
private static String fromIndexOf(Query wrappedQuery) {
  final String fromIndex;
  if (wrappedQuery instanceof JoinQuery) {
    fromIndex = ((JoinQuery) wrappedQuery).fromIndex;
  } else if (wrappedQuery instanceof ScoreJoinQParserPlugin.OtherCoreJoinQuery) {
    fromIndex = ((ScoreJoinQParserPlugin.OtherCoreJoinQuery) wrappedQuery).fromIndex;
  } else {
    throw new IllegalArgumentException("Unable to regenerate " + wrappedQuery);
  }
  if (fromIndex == null) {
    throw new IllegalArgumentException("Unable to regenerate " + wrappedQuery);
  }
  return fromIndex;
}

/** Reads the open-nano-time of the current searcher of the named core, releasing both references afterwards. */
private static long fromCoreOpenNanoTime(SolrIndexSearcher newSearcher, String fromIndex) {
  final SolrCore fromCore = newSearcher.getCore().getCoreContainer().getCore(fromIndex);
  if (fromCore == null) {
    // Explicit failure instead of a NullPointerException on the dereference below.
    throw new IllegalStateException("Unable to regenerate join cache: no such core " + fromIndex);
  }
  try {
    final RefCounted<SolrIndexSearcher> fromSearcher = fromCore.getSearcher();
    try {
      return fromSearcher.get().getOpenNanoTime();
    } finally {
      fromSearcher.decref();
    }
  } finally {
    fromCore.close();
  }
}

/**
 * Test-injection hook, called by {@code regenerateItem} right before an entry's
 * doc set is recomputed. Does nothing by default.
 *
 * @param oldKey the cache key about to be regenerated
 */
protected void onRegenerate(Query oldKey) {
  // intentionally empty
}

/**
 * Test-injection hook, called by {@code regenerateItem} when an entry's stored
 * timestamp matches the current "from" core timestamp and regeneration is
 * skipped. Does nothing by default.
 *
 * @param oldKey the cache key that was left untouched
 */
protected void onBypass(Query oldKey) {
  // intentionally empty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public class ScoreJoinQParserPlugin extends QParserPlugin {

public static final String SCORE = "score";

static class OtherCoreJoinQuery extends SameCoreJoinQuery {
private final String fromIndex;
private final long fromCoreOpenTime;
public static class OtherCoreJoinQuery extends SameCoreJoinQuery {
public final String fromIndex;
public long fromCoreOpenTime;

public OtherCoreJoinQuery(Query fromQuery, String fromField,
String fromIndex, long fromCoreOpenTime, ScoreMode scoreMode,
Expand Down
Loading