diff --git a/.editorconfig b/.editorconfig
index eb8e60aed..938db17dd 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -5,3 +5,6 @@ end_of_line = lf
 insert_final_newline = true
 indent_style = space
 charset = utf-8
+
+[*.java]
+indent_size = 4
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/CrawlCommand.java b/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/CrawlCommand.java
index 5058ae585..278f7fc81 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/CrawlCommand.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/CrawlCommand.java
@@ -67,11 +67,12 @@ public void execute(CrawlContext ctx) {
                 .executePipeline(CrawlPipelineFactory.create(ctx));
 
         // If there is a terminal crawl state already set, we use it, else
-        // we rely on pipeline last task state
-        if (ctx.getSessionProperties()
+        // we wait a bit for one, in case it hasn't been synced yet, then
+        // we rely on the pipeline's last task state as a fallback.
+        if (!ConcurrentUtil.waitUntil(() -> ctx.getSessionProperties()
                 .getCrawlState()
-                .map(state -> !state.isTerminal())
-                .orElse(false)) {
+                .map(CrawlState::isTerminal)
+                .orElse(false), Duration.ofSeconds(5))) {
             if (result.getState() == TaskState.COMPLETED) {
                 updateCrawlState(ctx, CrawlState.COMPLETED);
                 LOG.info("Crawler completed execution.");
@@ -90,7 +91,8 @@ public void execute(CrawlContext ctx) {
             throw new CrawlerException("Could not stop progress logger.", e);
         }
         ctx.fire(CrawlerEvent.CRAWLER_CRAWL_END);
-        LOG.info("Node done crawling.");
+        LOG.info("Node done crawling with state: {}",
+                ctx.getSessionProperties().getCrawlState().orElse(null));
 
         if (Boolean.getBoolean(SYS_PROP_ENABLE_JMX)) {
             LOG.info("Unregistering JMX crawler MBeans.");
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/bootstrap/ledger/DocLedgerBootstrapper.java b/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/bootstrap/ledger/DocLedgerBootstrapper.java
index a3e3db6cd..04eb1c040 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/bootstrap/ledger/DocLedgerBootstrapper.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/bootstrap/ledger/DocLedgerBootstrapper.java
@@ -19,7 +19,7 @@
 import com.norconex.commons.lang.PercentFormatter;
 import com.norconex.crawler.core.cmd.crawl.pipeline.bootstrap.CrawlBootstrapper;
 import com.norconex.crawler.core.session.CrawlContext;
-import com.norconex.crawler.core.session.ResumeState;
+import com.norconex.crawler.core.session.LaunchMode;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -60,7 +60,7 @@ public void bootstrap(CrawlContext crawlContext) {
 
     private static void prepareForCrawl(CrawlContext crawlContext) {
         var ledger = crawlContext.getDocLedger();
-        if (crawlContext.getResumeState() == ResumeState.RESUMED) {
+        if (crawlContext.getResumeState() == LaunchMode.RESUMED) {
             if (LOG.isInfoEnabled()) {
                 //TODO use total count to track progress independently
                 var processedCount = ledger.getProcessedCount();
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/process/CrawlActivityChecker.java b/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/process/CrawlActivityChecker.java
index 57a7658fd..aa10ee6ed 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/process/CrawlActivityChecker.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/process/CrawlActivityChecker.java
@@ -17,6 +17,8 @@
 import com.norconex.commons.lang.Sleeper;
 import com.norconex.commons.lang.time.DurationFormatter;
 import com.norconex.crawler.core.session.CrawlContext;
+import com.norconex.crawler.core.session.CrawlState;
+import com.norconex.grid.core.compute.GridTaskBuilder;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -36,6 +38,7 @@ class CrawlActivityChecker {
 
     private static boolean isResolving;
     private static boolean isPresumedActive = true;
+    private boolean maxDocsReached;
 
     synchronized boolean isActive() {
         if (!isPresumedActive) {
@@ -63,13 +66,35 @@ private boolean doIsActive() {
         return true;
     }
 
-    public boolean isMaxDocsApplicableAndReached() {
+    boolean isMaxDocsApplicableAndReached() {
         // If deleting we don't care about checking if max is reached,
         // we proceed.
         if (deleting) {
             return false;
         }
-        return ctx.getDocLedger().isMaxDocsProcessedReached();
+
+        if (maxDocsReached) {
+            return true;
+        }
+
+        if (ctx.getDocLedger().isMaxDocsProcessedReached()) {
+            LOG.info("Pausing crawler. Will resume on next start.");
+            maxDocsReached = true;
+            // We update the crawl state to PAUSED so that the crawler
+            // can be resumed later if needed.
+            // This is done here so that the crawl state is updated
+            // before the task is stopped.
+            ctx.getGrid().getCompute().executeTask(GridTaskBuilder
+                    .create("updateCrawlState")
+                    .singleNode()
+                    .processor(g -> CrawlContext
+                            .get(g)
+                            .getSessionProperties()
+                            .updateCrawlState(CrawlState.PAUSED))
+                    .build());
+            return true;
+        }
+        return false;
     }
 
     private boolean isQueueInitializedAndEmpty() {
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/process/CrawlProcessTask.java b/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/process/CrawlProcessTask.java
index f35f57b51..16c17140c 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/process/CrawlProcessTask.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/cmd/crawl/pipeline/process/CrawlProcessTask.java
@@ -57,7 +57,6 @@ public enum ProcessQueueAction {
 
     private final ProcessQueueAction queueAction;
     private boolean stopRequested;
-    // private boolean stopStateSaved;
 
     public CrawlProcessTask(String id, ProcessQueueAction queueAction) {
         super(id);
@@ -109,7 +108,6 @@ public void stop(Grid grid) {
         grid.getCompute().executeTask(GridTaskBuilder
                 .create("updateCrawlState")
                 .singleNode()
-                .once()
                 .processor(g -> CrawlContext
                         .get(g)
                         .getSessionProperties()
@@ -146,6 +144,7 @@ && processNextInQueue(ctx, activityChecker))
 
     private boolean processNextInQueue(
             CrawlContext crawlCtx, CrawlActivityChecker activityChecker) {
+        var docProcessCtx = new ProcessContext().crawlContext(crawlCtx);
 
         try {
             var docContext = crawlCtx
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/doc/CrawlDocLedger.java b/crawler/core/src/main/java/com/norconex/crawler/core/doc/CrawlDocLedger.java
index 5053b93fc..dd0467e99 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/doc/CrawlDocLedger.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/doc/CrawlDocLedger.java
@@ -19,7 +19,7 @@
 import com.norconex.crawler.core.event.CrawlerEvent;
 import com.norconex.crawler.core.session.CrawlContext;
-import com.norconex.crawler.core.session.ResumeState;
+import com.norconex.crawler.core.session.LaunchMode;
 import com.norconex.grid.core.storage.GridMap;
 import com.norconex.grid.core.storage.GridQueue;
 import com.norconex.grid.core.util.SerialUtil;
@@ -53,7 +53,7 @@
  *
  */
 @Slf4j
-public class CrawlDocLedger { //implements Closeable {
+public class CrawlDocLedger { //TODO rename DocLedger
 
     //TODO remove all synchronized keywords?
 
@@ -77,6 +77,7 @@ public class CrawlDocLedger { //implements Closeable {
     // on the created ledger. Example, the CrawlerSpec with its
     // DocLegerInitializer.
     public void init(CrawlContext crawlerContext) {
+        LOG.info("Initializing document ledger...");
         crawlContext = crawlerContext;
         type = crawlerContext.getDocContextType();
         var storage = crawlerContext.getGrid().getStorage();
@@ -99,14 +100,15 @@ public void init(CrawlContext crawlerContext) {
 
         long maxDocs = crawlContext.getCrawlConfig().getMaxDocuments();
         actualMaxDocs = maxDocs;
-        if ((crawlContext.getResumeState() == ResumeState.RESUMED)
-                && (maxDocs > -1)) {
+        var resumed = crawlContext.getResumeState() == LaunchMode.RESUMED;
+        if (resumed && maxDocs > -1) {
             actualMaxDocs += getProcessedCount();
             LOG.info("""
                     An additional maximum of {} processed documents
                     is added to this resumed session, for a maximum
                     total of {}.
                     """, maxDocs, actualMaxDocs);
         }
+        LOG.info("Done initializing document ledger.");
     }
 
     /**
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlContext.java b/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlContext.java
index a15184647..6cb327be0 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlContext.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlContext.java
@@ -89,7 +89,7 @@ public class CrawlContext {
     private final Path tempDir;
     private final CachedStreamFactory streamFactory;
     private final Class docContextType;
-    private final ResumeState resumeState;
+    private final LaunchMode resumeState;
     private final CrawlMode crawlMode;
     private final CrawlSessionProperties sessionProperties;
     private final ScopedThreadFactoryCreator threadFactoryCreator;
@@ -110,7 +110,7 @@ public String toString() {
     }
 
     public boolean isResumedSession() {
-        return getResumeState() == ResumeState.RESUMED;
+        return getResumeState() == LaunchMode.RESUMED;
     }
 
     public boolean isIncrementalCrawl() {
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlContextFactory.java b/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlContextFactory.java
index b74b626e9..e2298a1b7 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlContextFactory.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlContextFactory.java
@@ -62,7 +62,8 @@ static class Builder {
         private CrawlConfig config;
 
         CrawlContextFactory build() {
-            return new CrawlContextFactory(grid, session, driver, config);
+            return new CrawlContextFactory(grid, session, driver,
+                    config);
         }
     }
 
@@ -88,16 +89,19 @@ CrawlContext create() {
                 .callbacks(driver.callbacks())
                 .committerService(CommitterService
                         .builder()
-                        .committers(config.getCommitters())
+                        .committers(config
+                                .getCommitters())
                         .eventManager(eventManager)
-                        .upsertRequestBuilder(doc -> new UpsertRequest(
-                                doc.getReference(),
-                                doc.getMetadata(),
-                                // InputStream closed by caller
-                                doc.getInputStream()))
-                        .deleteRequestBuilder(doc -> new DeleteRequest(
-                                doc.getReference(),
-                                doc.getMetadata()))
+                        .upsertRequestBuilder(
+                                doc -> new UpsertRequest(
+                                        doc.getReference(),
+                                        doc.getMetadata(),
+                                        // InputStream closed by caller
+                                        doc.getInputStream()))
+                        .deleteRequestBuilder(
+                                doc -> new DeleteRequest(
+                                        doc.getReference(),
+                                        doc.getMetadata()))
                         .build())
                 .crawlConfig(config)
                 .crawlMode(session.getCrawlMode())
@@ -108,9 +112,11 @@ CrawlContext create() {
                 .eventManager(eventManager)
                 .fetcher(MultiFetcher.builder()
                         .fetchers(config.getFetchers())
-                        .maxRetries(config.getFetchersMaxRetries())
+                        .maxRetries(config
+                                .getFetchersMaxRetries())
                         .responseAggregator(
-                                driver.fetchDriver().responseAggregator())
+                                driver.fetchDriver()
+                                        .responseAggregator())
                         .unsuccessfulResponseFactory(
                                 driver.fetchDriver()
                                         .unsuccesfulResponseFactory())
@@ -118,16 +124,20 @@ CrawlContext create() {
                 .grid(grid)
                 .metrics(new CrawlerMetricsImpl())
                 .importer(new Importer(
-                        config.getImporterConfig(), eventManager))
-                .resumeState(session.getResumeState())
+                        config.getImporterConfig(),
+                        eventManager))
+                .resumeState(session.getLaunchMode())
                 .sessionProperties(
-                        new CrawlSessionProperties(grid, config.getId()))
+                        new CrawlSessionProperties(grid,
+                                config.getId()))
                 .streamFactory(new CachedStreamFactory(
                         (int) config.getMaxStreamCachePoolSize(),
                         (int) config.getMaxStreamCacheSize(),
                         tempDir))
                 .tempDir(tempDir)
-                .threadFactoryCreator(new ScopedThreadFactoryCreator("crawl"))
+                .threadFactoryCreator(
+                        new ScopedThreadFactoryCreator(
+                                "crawl"))
                 .workDir(workDir)
                 .build();
 
@@ -137,11 +147,13 @@ CrawlContext create() {
     }
 
     private void init(CrawlContext ctx) {
-        ctx.getEventManager().addListenersFromScan(ctx.getCrawlConfig());
+        ctx.getEventManager()
+                .addListenersFromScan(ctx.getCrawlConfig());
         ctx.getCommitterService().init(CommitterContext
                 .builder()
                 .setEventManager(ctx.getEventManager())
-                .setWorkDir(ctx.getWorkDir().resolve("committer"))
+                .setWorkDir(ctx.getWorkDir()
+                        .resolve("committer"))
                 .setStreamFactory(ctx.getStreamFactory())
                 .build());
         ctx.getDocLedger().init(ctx);
@@ -155,7 +167,9 @@ private void createDir(Path dir) {
         try {
             Files.createDirectories(dir);
         } catch (IOException e) {
-            throw new CrawlerException("Could not create directory: " + dir, e);
+            throw new CrawlerException(
+                    "Could not create directory: " + dir,
+                    e);
         }
     }
 }
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSession.java b/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSession.java
index 89cacd366..0cc7c617a 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSession.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSession.java
@@ -32,7 +32,7 @@ public class CrawlSession implements Serializable {
     private String crawlerId;
     private CrawlState crawlState;
    private CrawlMode crawlMode;
-    private ResumeState resumeState;
+    private LaunchMode launchMode;
     private long lastUpdated;
 
 }
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSessionProperties.java b/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSessionProperties.java
index 8843f2347..3bd8f6390 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSessionProperties.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSessionProperties.java
@@ -21,11 +21,14 @@
 import com.norconex.grid.core.Grid;
 import com.norconex.grid.core.storage.GridMap;
 
+import lombok.extern.slf4j.Slf4j;
+
 /**
  * Facade to a grid store for persisting or getting session state
 * values (e.g., flags) or other session-specific information, shared across
 * nodes (when on a grid).
 */
+@Slf4j
 public class CrawlSessionProperties {
 
     private static final String QUEUE_INITIALIZED = "queueInitialized";
@@ -55,6 +58,7 @@ public Optional getCrawlState() {
     }
 
     public void updateCrawlState(CrawlState state) {
+        LOG.info("Updating crawl state for crawler {} to {}.", id, state);
         var store = CrawlSessionManager.sessionStore(grid);
         var session = store.get(id);
         if (session == null) {
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSessionResolver.java b/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSessionResolver.java
index 11452a366..a0839d9c5 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSessionResolver.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/session/CrawlSessionResolver.java
@@ -54,29 +54,30 @@ private static CrawlSession doResolve(
             session = new CrawlSession()
                     .setCrawlerId(id)
                     .setCrawlMode(CrawlMode.FULL)
-                    .setResumeState(ResumeState.INITIAL);
+                    .setLaunchMode(LaunchMode.NEW);
         } else if (session.getCrawlState() == CrawlState.RUNNING) {
             if (System.currentTimeMillis()
                     - session.getLastUpdated() > sessionTimeout.toMillis()) {
                 LOG.warn("A crawl session for crawler {} was "
                         + "detected but expired. Trying to resume it.", id);
-                session.setResumeState(ResumeState.RESUMED);
+                session.setLaunchMode(LaunchMode.RESUMED);
             } else {
                 LOG.info("Joining crawl session for crawler {}.", id);
             }
         } else if (session.getCrawlState() == CrawlState.PAUSED) {
-            LOG.info("A previously paused crawl session was detected for. "
+            LOG.info("A previously paused crawl session was detected for "
                     + "crawler {}. Resuming it.", id);
-            session.setResumeState(ResumeState.RESUMED);
+            session.setLaunchMode(LaunchMode.RESUMED);
         } else if (session.getCrawlState() == CrawlState.COMPLETED) {
-            LOG.info("A previously completed crawl session was detected for. "
+            LOG.info("A previously completed crawl session was detected for "
                     + "crawler {}. Starting a new incremental crawl session.",
                     id);
             session.setCrawlMode(CrawlMode.INCREMENTAL);
+            session.setLaunchMode(LaunchMode.NEW);
         } else if (session.getCrawlState() == CrawlState.FAILED) {
             LOG.warn("A crawl session for crawler {} was detected but is "
                     + "marked as failed. Trying to resume it.", id);
-            session.setResumeState(ResumeState.RESUMED);
+            session.setLaunchMode(LaunchMode.RESUMED);
         }
 
         //MAYBE: if we can detect here that the state is not valid for joining
diff --git a/crawler/core/src/main/java/com/norconex/crawler/core/session/ResumeState.java b/crawler/core/src/main/java/com/norconex/crawler/core/session/LaunchMode.java
similarity index 61%
rename from crawler/core/src/main/java/com/norconex/crawler/core/session/ResumeState.java
rename to crawler/core/src/main/java/com/norconex/crawler/core/session/LaunchMode.java
index 0981ade2b..48fd31d6b 100644
--- a/crawler/core/src/main/java/com/norconex/crawler/core/session/ResumeState.java
+++ b/crawler/core/src/main/java/com/norconex/crawler/core/session/LaunchMode.java
@@ -14,7 +14,19 @@
  */
 package com.norconex.crawler.core.session;
 
-public enum ResumeState {
-    INITIAL,
+/**
+ * Describes how a job was launched: as a new launch, or by resuming a
+ * prior incomplete one.
+ */
+public enum LaunchMode {
+    /**
+     * The crawler started from the beginning (first-time or restarted after
+     * success, regardless of crawl mode -- incremental or full).
+     */
+    NEW,
+    /**
+     * The crawler resumed from a prior incomplete execution (e.g., paused
+     * or failed).
+     */
     RESUMED
 }
diff --git a/crawler/core/src/test/java/com/norconex/crawler/core/session/TestSessionUtil.java b/crawler/core/src/test/java/com/norconex/crawler/core/session/TestSessionUtil.java
index ce689c1c2..85cfab466 100644
--- a/crawler/core/src/test/java/com/norconex/crawler/core/session/TestSessionUtil.java
+++ b/crawler/core/src/test/java/com/norconex/crawler/core/session/TestSessionUtil.java
@@ -39,7 +39,7 @@ public static CrawlContext createCrawlerContext(
                 .setCrawlMode(CrawlMode.FULL)
                 .setCrawlState(CrawlState.RUNNING)
                 .setLastUpdated(System.currentTimeMillis())
-                .setResumeState(ResumeState.INITIAL))
+                .setLaunchMode(LaunchMode.NEW))
                 .build()
                 .create();
     }
diff --git a/crawler/web/src/test/java/com/norconex/crawler/web/WebTestUtil.java b/crawler/web/src/test/java/com/norconex/crawler/web/WebTestUtil.java
index 5f9c11edd..9b9e1a672 100644
--- a/crawler/web/src/test/java/com/norconex/crawler/web/WebTestUtil.java
+++ b/crawler/web/src/test/java/com/norconex/crawler/web/WebTestUtil.java
@@ -148,8 +148,7 @@ public static MemoryCommitter firstCommitter(@NonNull Crawler crawler) {
      * @param config crawler config
      * @return Memory committer
      */
-    public static MemoryCommitter memoryCommitter(
-            @NonNull CrawlConfig config) {
+    public static MemoryCommitter memoryCommitter(@NonNull CrawlConfig config) {
         return (MemoryCommitter) config
                 .getCommitters()
                 .stream()
@@ -164,8 +163,7 @@ public static MemoryCommitter memoryCommitter(
      * @param config crawler config
      * @return Test committer
      */
-    public static TestCommitter
-            getTestCommitter(@NonNull CrawlConfig config) {
+    public static TestCommitter getTestCommitter(@NonNull CrawlConfig config) {
         return (TestCommitter) config
                 .getCommitters()
                 .stream()
@@ -183,7 +181,8 @@ public static MemoryCommitter memoryCommitter(
     @SneakyThrows
     public static void addTestCommitterOnce(@NonNull CrawlConfig cfg) {
         if (cfg.getCommitters().isEmpty() || cfg.getCommitters()
-                .stream().noneMatch(TestCommitter.class::isInstance)) {
+                .stream()
+                .noneMatch(TestCommitter.class::isInstance)) {
             var committer = new TestCommitter(
                     cfg.getWorkDir().resolve(TEST_COMMITER_DIR));
             committer.init(null);
@@ -193,8 +192,7 @@ public static void addTestCommitterOnce(@NonNull CrawlConfig cfg) {
         }
     }
 
-    public static HttpClientFetcher firstHttpFetcher(
-            @NonNull Crawler crawler) {
+    public static HttpClientFetcher firstHttpFetcher(@NonNull Crawler crawler) {
         return (HttpClientFetcher) crawler
                 .getCrawlConfig()
                 .getFetchers()
@@ -249,7 +247,8 @@ public static String lastSortedDeleteReference(MemoryCommitter c) {
     }
 
     public static void ignoreAllIgnorables(Crawler crawler) {
-        ignoreAllIgnorables((WebCrawlerConfig) crawler.getCrawlConfig());
+        ignoreAllIgnorables(
+                (WebCrawlerConfig) crawler.getCrawlConfig());
     }
 
     public static void ignoreAllIgnorables(WebCrawlerConfig config) {
@@ -294,7 +293,8 @@ public static String toString(InputStream is) {
     }
 
     public static String resourceAsString(String resourcePath) {
-        return toString(WebTestUtil.class.getResourceAsStream(resourcePath));
+        return toString(WebTestUtil.class
+                .getResourceAsStream(resourcePath));
     }
 
     @SafeVarargs
@@ -309,7 +309,8 @@ private static Randomizer randomInstanceOf(
         return () -> {
             if (subtypes.length == 0)
                 return null;
-            var index = ThreadLocalRandom.current().nextInt(subtypes.length);
+            var index = ThreadLocalRandom.current()
+                    .nextInt(subtypes.length);
             return easyRandom.nextObject(subtypes[index]);
         };
     }
@@ -331,12 +332,12 @@ private static EasyRandom createRandomizer() {
                 .overrideDefaultInitialization(true)
                 .randomize(
                         File.class,
-                        () -> new File(
-                                new StringRandomizer(100).getRandomValue()))
+                        () -> new File(new StringRandomizer(
+                                100).getRandomValue()))
                 .randomize(
                         Path.class,
-                        () -> Path.of(
-                                new StringRandomizer(100).getRandomValue()))
+                        () -> Path.of(new StringRandomizer(
+                                100).getRandomValue()))
                 .randomize(
                         Long.class,
                         () -> Math.abs(new LongRandomizer().getRandomValue()))
@@ -344,7 +345,8 @@
                         Integer.class,
                         () -> Math.abs(
                                 new IntegerRandomizer().getRandomValue()))
-                .randomize(ImporterConfig.class, ImporterConfig::new)
+                .randomize(ImporterConfig.class,
+                        ImporterConfig::new)
                 .randomize(
                         UpsertRequest.class,
                         () -> new UpsertRequest(
@@ -356,12 +358,14 @@
                         () -> new DeleteRequest(
                                 new StringRandomizer(100).getRandomValue(),
                                 new Properties()))
-                .randomize(Committer.class, MemoryCommitter::new)
+                .randomize(Committer.class,
+                        MemoryCommitter::new)
                 .randomize(
                         SpoiledReferenceStrategizer.class,
                         GenericSpoiledReferenceStrategizer::new)
                 .randomize(
-                        AtomicBoolean.class, () -> new AtomicBoolean(
+                        AtomicBoolean.class,
+                        () -> new AtomicBoolean(
                                 new BooleanRandomizer().getRandomValue()))
                 .randomize(
                         UrlScopeResolver.class,
@@ -424,19 +428,19 @@
                     int a = new NumberRandomizer().getRandomValue();
                     int b = new NumberRandomizer().getRandomValue();
                     return CircularRange.between(
-                            Math.min(a, b),
-                            Math.max(a, b));
+                            Math.min(a, b), Math.max(a, b));
                 })
                 .randomize(
                         CachedInputStream.class,
                         CachedInputStream::nullInputStream)
                 .randomize(Fetcher.class, HttpClientFetcher::new)
                 .randomize(
-                        RobotsTxtProvider.class,
-                        StandardRobotsTxtProvider::new)
+                        RobotsTxtProvider.class, StandardRobotsTxtProvider::new)
                 .randomize(
-                        Pattern.class, () -> Pattern.compile(
-                                new StringRandomizer(20).getRandomValue()))
+                        Pattern.class,
+                        () -> Pattern.compile(
+                                new StringRandomizer(
+                                        20).getRandomValue()))
                 .randomize(DelayResolver.class, () -> {
                     var resolv = new GenericDelayResolver();
                     resolv.getConfiguration()
@@ -454,14 +458,13 @@
                     return extractor;
                 })
                 .randomize(
-                        f -> "cookieSpec".equals(
-                                f.getName()),
+                        f -> "cookieSpec".equals(f.getName()),
                         () -> CookieSpec.STRICT)
                 .randomize(
                         named(HttpAuthConfig.Fields.method)
                                 .and(ofType(HttpAuthMethod.class))
                                 .and(inClass(HttpAuthConfig.class)),
-                        randomOneOf(HttpAuthMethod.values())));
+                        randomOneOf(HttpAuthMethod
+                                .values())));
     }
-
 }
diff --git a/crawler/web/src/test/java/com/norconex/crawler/web/doc/pipelines/queue/stages/SitemapResolutionStageTest.java b/crawler/web/src/test/java/com/norconex/crawler/web/doc/pipelines/queue/stages/SitemapResolutionStageTest.java
index 6d6f82265..f13657a63 100644
--- a/crawler/web/src/test/java/com/norconex/crawler/web/doc/pipelines/queue/stages/SitemapResolutionStageTest.java
+++ b/crawler/web/src/test/java/com/norconex/crawler/web/doc/pipelines/queue/stages/SitemapResolutionStageTest.java
@@ -72,8 +72,7 @@ void testStayOnSitemap(ClientAndServer client, WebCrawlerConfig cfg) {
                 baseUrl + "0001",
                 baseUrl + "0002",
                 baseUrl + "0003");
-        client
-                .when(request().withPath("/sitemap.xml"))
+        client.when(request().withPath("/sitemap.xml"))
                 .respond(response().withBody(sitemap, MediaType.XML_UTF_8));
         MockWebsite.whenInfiniteDepth(client);
 
@@ -90,7 +89,8 @@ void testStayOnSitemap(ClientAndServer client, WebCrawlerConfig cfg) {
 
         assertThat(mem.getRequestCount()).isEqualTo(3);
         assertThat(mem.getUpsertRequests())
-                .map(req -> substringAfterLast(req.getReference(), "/"))
+                .map(req -> substringAfterLast(
+                        req.getReference(), "/"))
                 .containsExactly("0001", "0002", "0003");
     }
 
@@ -106,7 +106,8 @@ void testStayOnSitemapStartInSitemap(
                 baseUrl + "0003");
         client
                 .when(request().withPath("/sitemap.xml"))
-                .respond(response().withBody(sitemap, MediaType.XML_UTF_8));
+                .respond(response().withBody(sitemap,
+                        MediaType.XML_UTF_8));
         MockWebsite.whenInfiniteDepth(client);
 
         cfg.setSitemapLocator(new GenericSitemapLocator())
@@ -117,11 +118,13 @@
         ((GenericUrlScopeResolver) cfg.getUrlScopeResolver())
                 .getConfiguration().setStayOnSitemap(true);
 
-        var mem = WebCrawlTestCapturer.crawlAndCapture(cfg).getCommitter();
+        var mem = WebCrawlTestCapturer.crawlAndCapture(cfg)
+                .getCommitter();
 
         assertThat(mem.getRequestCount()).isEqualTo(3);
         assertThat(mem.getUpsertRequests())
-                .map(req -> substringAfterLast(req.getReference(), "/"))
+                .map(req -> substringAfterLast(
+                        req.getReference(), "/"))
                 .containsExactly("0001", "0002", "0003");
     }
 }
diff --git a/grid/core/src/main/java/com/norconex/grid/core/impl/compute/task/TaskCoordinator.java b/grid/core/src/main/java/com/norconex/grid/core/impl/compute/task/TaskCoordinator.java
index 00942f818..da7e19e7b 100644
--- a/grid/core/src/main/java/com/norconex/grid/core/impl/compute/task/TaskCoordinator.java
+++ b/grid/core/src/main/java/com/norconex/grid/core/impl/compute/task/TaskCoordinator.java
@@ -79,7 +79,7 @@ public TaskExecutionResult executeTask(GridTask task) throws Exception {
         // because workers tasks are invoked directly on workers by the
         // coordinator.
         if (!grid.isCoordinator()) {
-            TaskUtil.logNonCoordinatorCantExecute(grid, task);
+            TaskUtil.logNonCoordinatorCantExecute(task);
             //TODO verify if this will get called even if joining,
             // late, after the task is executed... will it receive remote task
             // request in parallel?
@@ -143,7 +143,7 @@ private TaskExecutionResult awaitCoordinatorDoneSignal(GridTask task) {
             if (localWorker.isNodeTaskStopRequested(task.getId())) {
                 LOG.info("""
                         Node {} received a request to stop the \
-                        task {} while waiting for coordinator signal. \
+                        task {} while waiting for a coordinator signal. \
                         Stopping (no longer waiting).""",
                         grid.getNodeAddress(), task.getId());
                 taskStatusRef.set(new TaskExecutionResult(
diff --git a/grid/core/src/main/java/com/norconex/grid/core/impl/compute/task/TaskUtil.java b/grid/core/src/main/java/com/norconex/grid/core/impl/compute/task/TaskUtil.java
index 118306d3c..3b6b5266b 100644
--- a/grid/core/src/main/java/com/norconex/grid/core/impl/compute/task/TaskUtil.java
+++ b/grid/core/src/main/java/com/norconex/grid/core/impl/compute/task/TaskUtil.java
@@ -24,13 +24,11 @@
 import com.norconex.grid.core.compute.GridTask;
 import com.norconex.grid.core.compute.TaskExecutionResult;
 import com.norconex.grid.core.compute.TaskState;
-import com.norconex.grid.core.impl.CoreGrid;
 
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public final class TaskUtil {
-
     private TaskUtil() {
     }
 
@@ -43,7 +41,8 @@ public static boolean isValidNodeResponse(Rsp rsp) {
     }
 
     public static boolean hasNodeTaskExpired(
-            long taskStartTime, TaskProgress progress, Duration nodeTimeout) {
+            long taskStartTime, TaskProgress progress,
+            Duration nodeTimeout) {
         var lastHeartbeat = progress != null
                 ? progress.getLastHeartbeat()
                 : taskStartTime;
@@ -59,17 +58,20 @@ public static boolean isState(TaskProgress progress, TaskState state) {
                 .isPresent();
     }
 
-    public static void logNonCoordinatorCantExecute(CoreGrid grid,
-            GridTask task) {
+    public static void logNonCoordinatorCantExecute(GridTask task) {
         if (!LOG.isInfoEnabled()) {
             return;
         }
-        var msgSuffix = task.getExecutionMode() == ExecutionMode.SINGLE_NODE
-                ? "will wait for the coordinator to be done with this "
-                        + "single-node task."
-                : "will participate when asked by the coordinator.";
-        LOG.info("Non-coordinator node {}. Letting the coordinator node "
-                + "start task \"{}\" and this node {}.",
-                grid.getNodeAddress(), task.getId(), msgSuffix);
+        if (task.getExecutionMode() == ExecutionMode.SINGLE_NODE) {
+            LOG.info("""
+                    Ignoring single-node task "{}". Letting the coordinator node
+                    handle it and waiting until it's done...
+                    """, task.getId());
+        } else {
+            LOG.info("""
+                    Received multi-node task "{}". Will participate if/when asked by
+                    the coordinator (else, wait until it's done)...
+                    """, task.getId());
+        }
     }
 }