diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java
index 5d508cd2e4f..fe22b4ad594 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java
@@ -194,6 +194,14 @@ public ShortestPathStream(StreamExpression expression, StreamFactory factory) th
           Integer.parseInt(((StreamExpressionValue) depthExpression.getParameter()).getValue());
     }
 
+    boolean legacyJoin = false;
+
+    StreamExpressionNamedParameter legacyJoinExpression = factory.getNamedOperand(expression, "legacyJoin");
+
+    if (legacyJoinExpression != null) {
+      legacyJoin = Boolean.parseBoolean(((StreamExpressionValue) legacyJoinExpression.getParameter()).getValue());
+    }
+
     ModifiableSolrParams params = new ModifiableSolrParams();
     for (StreamExpressionNamedParameter namedParam : namedParams) {
       if (!namedParam.getName().equals("zkHost")
@@ -262,6 +270,7 @@ private void init(
     this.joinBatchSize = joinBatchSize;
     this.threads = threads;
     this.maxDepth = maxDepth;
+    this.legacyJoin = legacyJoin;
   }
 
   @Override
diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java
index e16c2692fc0..ec1603d0e53 100644
--- a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java
+++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/graph/GraphTest.java
@@ -69,6 +69,18 @@ public void cleanIndex() throws Exception {
 
   @Test
   public void testShortestPathStream() throws Exception {
+    implTestShortestPathStream(false);
+  }
+
+  @Test
+  public void testLegacyShortestPathStream() throws Exception {
+    implTestShortestPathStream(true);
+  }
+
+  private void implTestShortestPathStream(final boolean legacyJoin) throws Exception {
+
+    final String ORIGINAL_TITLE = random().nextBoolean() ? "Original 'P' paperback" : "Original \"H\" hardback";
+    final String TRANSLATED_TITLE = "Translation";
 
     new UpdateRequest()
         .add(id, "0", "from_s", "jim", "to_s", "mike", "predicate_s", "knows")
@@ -86,6 +98,16 @@ public void testShortestPathStream() throws Exception {
         .add(id, "11", "from_s", "mary", "to_s", "max", "predicate_s", "knows")
         .add(id, "12", "from_s", "mary", "to_s", "jim", "predicate_s", "knows")
         .add(id, "13", "from_s", "mary", "to_s", "steve", "predicate_s", "knows")
+        // SOLR-15546: fromNode and toNode contain a colon
+        .add(id, "14", "from_s", "https://aaa", "to_s", "https://bbb", "predicate_s", "links")
+        .add(id, "15", "from_s", "https://bbb", "to_s", "https://ccc", "predicate_s", "links")
+        // SOLR-15546: fromNode and toNode contain a space
+        .add(id, "16", "from_s", "Author", "to_s", TRANSLATED_TITLE, "predicate_s", "author_to_book")
+        .add(id, "17", "from_s", TRANSLATED_TITLE, "to_s", "Translator", "predicate_s", "book_to_translator")
+        .add(id, "18", "from_s", "Ann Author", "to_s", TRANSLATED_TITLE, "predicate_s", "author_to_book")
+        .add(id, "19", "from_s", TRANSLATED_TITLE, "to_s", "Tim Translator", "predicate_s", "book_to_translator")
+        .add(id, "22", "from_s", "Ann Author", "to_s", ORIGINAL_TITLE, "predicate_s", "author_to_book")
+        .add(id, "23", "from_s", ORIGINAL_TITLE, "to_s", "Tim Translator", "predicate_s", "book_to_translator")
         .commit(cluster.getSolrClient(), COLLECTION);
 
     List<Tuple> tuples = null;
@@ -180,6 +202,91 @@ public void testShortestPathStream() throws Exception {
 
     assertTrue(paths.contains("[jim, stan, mary, steve]"));
 
+    if (!legacyJoin) {
+      // SOLR-15546: fromNode and toNode contain a colon
+      stream = new ShortestPathStream(zkHost,
+          "collection1",
+          "https://aaa",
+          "https://bbb",
+          "from_s",
+          "to_s",
+          StreamingTest.mapParams("fq", "predicate_s:links"),
+          10,
+          3,
+          6);
+
+      stream.setStreamContext(context);
+      paths = new HashSet<>();
+      tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+
+      for (Tuple tuple : tuples) {
+        paths.add(tuple.getStrings("path").toString());
+      }
+
+      assertTrue(paths.contains("[https://aaa, https://bbb]"));
+
+      // SOLR-15546: fromNode, toNode and the interim node all contain a colon
+      stream = new ShortestPathStream(zkHost,
+          "collection1",
+          "https://aaa",
+          "https://ccc",
+          "from_s",
+          "to_s",
+          StreamingTest.mapParams("fq", "predicate_s:links"),
+          10,
+          3,
+          6);
+
+      stream.setStreamContext(context);
+      paths = new HashSet<>();
+      tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+
+      for (Tuple tuple : tuples) {
+        paths.add(tuple.getStrings("path").toString());
+      }
+
+      assertTrue(paths.contains("[https://aaa, https://bbb, https://ccc]"));
+    }
+
+    // SOLR-15546: fromNode and toNode contain a space (interim node may contain quotes)
+    stream = new ShortestPathStream(zkHost,
+        "collection1",
+        "Ann Author",
+        "Tim Translator",
+        "from_s",
+        "to_s",
+        StreamingTest.mapParams(),
+        10,
+        3,
+        6,
+        legacyJoin);
+
+    stream.setStreamContext(context);
+    paths = new HashSet<>();
+    tuples = getTuples(stream);
+
+    for (Tuple tuple : tuples) {
+      paths.add(tuple.getStrings("path").toString());
+    }
+
+    if (!legacyJoin) {
+      if (ORIGINAL_TITLE.contains("\"")) {
+        assertEquals(1, tuples.size());
+        // double quotes in the interim ORIGINAL_TITLE node were not matched
+        assertTrue(paths.contains("[Ann Author, " + TRANSLATED_TITLE + ", Tim Translator]"));
+      } else {
+        assertEquals(2, tuples.size());
+        assertTrue(paths.contains("[Ann Author, " + ORIGINAL_TITLE + ", Tim Translator]"));
+        assertTrue(paths.contains("[Ann Author, " + TRANSLATED_TITLE + ", Tim Translator]"));
+      }
+    } else {
+      assertEquals(1, tuples.size());
+      // "Ann Author" fromNode got interpreted as "Ann" OR "Author"
+      assertTrue(paths.contains("[Author, " + TRANSLATED_TITLE + ", Tim Translator]"));
+    }
+
     cache.close();
   }
 