3 changes: 3 additions & 0 deletions .editorconfig
@@ -5,3 +5,6 @@ end_of_line = lf
 insert_final_newline = true
 indent_style = space
 charset = utf-8
+
+[*.java]
+indent_size = 4

==================================================

@@ -67,11 +67,12 @@ public void execute(CrawlContext ctx) {
                 .executePipeline(CrawlPipelineFactory.create(ctx));
 
         // If there is a terminal crawl state already set, we use it, else
-        // we rely on pipeline last task state
-        if (ctx.getSessionProperties()
+        // we wait a bit for one, in case it hasn't been synched yet, then,
+        // we rely on pipeline last task state as fallback.
+        if (!ConcurrentUtil.waitUntil(() -> ctx.getSessionProperties()
                 .getCrawlState()
-                .map(state -> !state.isTerminal())
-                .orElse(false)) {
+                .map(CrawlState::isTerminal)
+                .orElse(false), Duration.ofSeconds(5))) {
             if (result.getState() == TaskState.COMPLETED) {
                 updateCrawlState(ctx, CrawlState.COMPLETED);
                 LOG.info("Crawler completed execution.");
@@ -90,7 +91,8 @@ public void execute(CrawlContext ctx) {
             throw new CrawlerException("Could not stop progress logger.", e);
         }
         ctx.fire(CrawlerEvent.CRAWLER_CRAWL_END);
-        LOG.info("Node done crawling.");
+        LOG.info("Node done crawling with state: {}",
+                ctx.getSessionProperties().getCrawlState().orElse(null));
 
         if (Boolean.getBoolean(SYS_PROP_ENABLE_JMX)) {
             LOG.info("Unregistering JMX crawler MBeans.");
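
Note: the new guard falls back to the pipeline's last task state only when
ConcurrentUtil.waitUntil(...) returns false, i.e., when no terminal crawl
state was synced within the 5-second window. A minimal sketch of the contract
this call is assumed to have (illustrative only, not the actual Norconex
implementation, which may differ in signature and polling strategy):

    import java.time.Duration;
    import java.util.function.BooleanSupplier;

    static boolean waitUntil(BooleanSupplier condition, Duration timeout) {
        // Poll until the condition holds or the timeout elapses;
        // return whether the condition ever became true.
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                return false; // timed out
            }
            try {
                Thread.sleep(50); // poll interval, arbitrary for this sketch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }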

==================================================

@@ -19,7 +19,7 @@
 import com.norconex.commons.lang.PercentFormatter;
 import com.norconex.crawler.core.cmd.crawl.pipeline.bootstrap.CrawlBootstrapper;
 import com.norconex.crawler.core.session.CrawlContext;
-import com.norconex.crawler.core.session.ResumeState;
+import com.norconex.crawler.core.session.LaunchMode;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -60,7 +60,7 @@ public void bootstrap(CrawlContext crawlContext) {
 
     private static void prepareForCrawl(CrawlContext crawlContext) {
         var ledger = crawlContext.getDocLedger();
-        if (crawlContext.getResumeState() == ResumeState.RESUMED) {
+        if (crawlContext.getResumeState() == LaunchMode.RESUMED) {
             if (LOG.isInfoEnabled()) {
                 //TODO use total count to track progress independently
                 var processedCount = ledger.getProcessedCount();

==================================================

@@ -17,6 +17,8 @@
 import com.norconex.commons.lang.Sleeper;
 import com.norconex.commons.lang.time.DurationFormatter;
 import com.norconex.crawler.core.session.CrawlContext;
+import com.norconex.crawler.core.session.CrawlState;
+import com.norconex.grid.core.compute.GridTaskBuilder;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -36,6 +38,7 @@ class CrawlActivityChecker {
 
     private static boolean isResolving;
     private static boolean isPresumedActive = true;
+    private boolean maxDocsReached;
 
     synchronized boolean isActive() {
         if (!isPresumedActive) {
@@ -63,13 +66,35 @@ private boolean doIsActive() {
         return true;
     }
 
-    public boolean isMaxDocsApplicableAndReached() {
+    boolean isMaxDocsApplicableAndReached() {
         // If deleting we don't care about checking if max is reached,
         // we proceed.
         if (deleting) {
             return false;
         }
-        return ctx.getDocLedger().isMaxDocsProcessedReached();
+
+        if (maxDocsReached) {
+            return true;
+        }
+
+        if (ctx.getDocLedger().isMaxDocsProcessedReached()) {
+            LOG.info("Pausing crawler. Will resume on next start.");
+            maxDocsReached = true;
+            // We update the crawl state to PAUSED so that the crawler
+            // can be resumed later if needed.
+            // This is done here so that the crawl state is updated
+            // before the task is stopped.
+            ctx.getGrid().getCompute().executeTask(GridTaskBuilder
+                    .create("updateCrawlState")
+                    .singleNode()
+                    .processor(g -> CrawlContext
+                            .get(g)
+                            .getSessionProperties()
+                            .updateCrawlState(CrawlState.PAUSED))
+                    .build());
+            return true;
+        }
+        return false;
     }
 
     private boolean isQueueInitializedAndEmpty() {
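
Note: the pause path submits a singleNode() grid task so the PAUSED state is
written once even though every node runs the activity checker, and the new
maxDocsReached flag keeps the task from being resubmitted on later checks.
The general pattern, as assumed from its usage in this PR (task id and
processor body are illustrative):

    // Run a piece of work on a single elected node instead of all nodes.
    grid.getCompute().executeTask(GridTaskBuilder
            .create("someSingleNodeTask") // illustrative task id
            .singleNode()                 // one node executes the processor
            .processor(g -> {
                // cluster-wide state mutation that must happen once
            })
            .build());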

==================================================

@@ -57,7 +57,6 @@ public enum ProcessQueueAction {
 
     private final ProcessQueueAction queueAction;
     private boolean stopRequested;
-    // private boolean stopStateSaved;
 
     public CrawlProcessTask(String id, ProcessQueueAction queueAction) {
         super(id);
@@ -109,7 +108,6 @@ public void stop(Grid grid) {
         grid.getCompute().executeTask(GridTaskBuilder
                 .create("updateCrawlState")
                 .singleNode()
-                .once()
                 .processor(g -> CrawlContext
                         .get(g)
                         .getSessionProperties()
@@ -146,6 +144,7 @@ && processNextInQueue(ctx, activityChecker))
     private boolean processNextInQueue(
             CrawlContext crawlCtx,
             CrawlActivityChecker activityChecker) {
+
         var docProcessCtx = new ProcessContext().crawlContext(crawlCtx);
         try {
             var docContext = crawlCtx

==================================================

@@ -19,7 +19,7 @@
 
 import com.norconex.crawler.core.event.CrawlerEvent;
 import com.norconex.crawler.core.session.CrawlContext;
-import com.norconex.crawler.core.session.ResumeState;
+import com.norconex.crawler.core.session.LaunchMode;
 import com.norconex.grid.core.storage.GridMap;
 import com.norconex.grid.core.storage.GridQueue;
 import com.norconex.grid.core.util.SerialUtil;
@@ -53,7 +53,7 @@
  * </ul>
  */
 @Slf4j
-public class CrawlDocLedger { //implements Closeable {
+public class CrawlDocLedger {
 
     //TODO rename DocLedger
     //TODO remove all synchronized keywords?
@@ -77,6 +77,7 @@ public class CrawlDocLedger { //implements Closeable {
     // on the created ledger. Example, the CrawlerSpec with its
     // DocLegerInitializer.
     public void init(CrawlContext crawlerContext) {
+        LOG.info("Initializing document ledger...");
         crawlContext = crawlerContext;
         type = crawlerContext.getDocContextType();
         var storage = crawlerContext.getGrid().getStorage();
@@ -99,14 +100,15 @@ public void init(CrawlContext crawlerContext) {
 
         long maxDocs = crawlContext.getCrawlConfig().getMaxDocuments();
         actualMaxDocs = maxDocs;
-        if ((crawlContext.getResumeState() == ResumeState.RESUMED)
-                && (maxDocs > -1)) {
+        var resumed = crawlContext.getResumeState() == LaunchMode.RESUMED;
+        if (resumed && maxDocs > -1) {
             actualMaxDocs += getProcessedCount();
             LOG.info("""
                     An additional maximum of {} processed documents is
                     added to this resumed session, for a maximum total of {}.
                     """, maxDocs, actualMaxDocs);
         }
+        LOG.info("Done initializing document ledger.");
     }
 
     /**
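
Note: to make the cap adjustment above concrete, here is the arithmetic with
illustrative numbers for a resumed session that had already processed 250
documents under a maxDocuments setting of 100:

    long maxDocs = 100;           // config.getMaxDocuments()
    long processedCount = 250;    // processed before the pause
    long actualMaxDocs = maxDocs; // 100
    boolean resumed = true;       // LaunchMode.RESUMED
    if (resumed && maxDocs > -1) {
        actualMaxDocs += processedCount; // 350: up to 100 more docs this run
    }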

==================================================

@@ -89,7 +89,7 @@ public class CrawlContext {
     private final Path tempDir;
     private final CachedStreamFactory streamFactory;
     private final Class<? extends CrawlDocContext> docContextType;
-    private final ResumeState resumeState;
+    private final LaunchMode resumeState;
     private final CrawlMode crawlMode;
     private final CrawlSessionProperties sessionProperties;
     private final ScopedThreadFactoryCreator threadFactoryCreator;
@@ -110,7 +110,7 @@ public String toString() {
     }
 
     public boolean isResumedSession() {
-        return getResumeState() == ResumeState.RESUMED;
+        return getResumeState() == LaunchMode.RESUMED;
     }
 
     public boolean isIncrementalCrawl() {

==================================================

@@ -62,7 +62,8 @@ static class Builder {
         private CrawlConfig config;
 
         CrawlContextFactory build() {
-            return new CrawlContextFactory(grid, session, driver, config);
+            return new CrawlContextFactory(grid, session, driver,
+                    config);
         }
     }
 
@@ -88,16 +89,19 @@ CrawlContext create() {
                 .callbacks(driver.callbacks())
                 .committerService(CommitterService
                         .<CrawlDoc>builder()
-                        .committers(config.getCommitters())
+                        .committers(config
+                                .getCommitters())
                         .eventManager(eventManager)
-                        .upsertRequestBuilder(doc -> new UpsertRequest(
-                                doc.getReference(),
-                                doc.getMetadata(),
-                                // InputStream closed by caller
-                                doc.getInputStream()))
-                        .deleteRequestBuilder(doc -> new DeleteRequest(
-                                doc.getReference(),
-                                doc.getMetadata()))
+                        .upsertRequestBuilder(
+                                doc -> new UpsertRequest(
+                                        doc.getReference(),
+                                        doc.getMetadata(),
+                                        // InputStream closed by caller
+                                        doc.getInputStream()))
+                        .deleteRequestBuilder(
+                                doc -> new DeleteRequest(
+                                        doc.getReference(),
+                                        doc.getMetadata()))
                         .build())
                 .crawlConfig(config)
                 .crawlMode(session.getCrawlMode())
@@ -108,26 +112,32 @@ CrawlContext create() {
                 .eventManager(eventManager)
                 .fetcher(MultiFetcher.builder()
                         .fetchers(config.getFetchers())
-                        .maxRetries(config.getFetchersMaxRetries())
+                        .maxRetries(config
+                                .getFetchersMaxRetries())
                         .responseAggregator(
-                                driver.fetchDriver().responseAggregator())
+                                driver.fetchDriver()
+                                        .responseAggregator())
                         .unsuccessfulResponseFactory(
                                 driver.fetchDriver()
                                         .unsuccesfulResponseFactory())
                         .build())
                 .grid(grid)
                 .metrics(new CrawlerMetricsImpl())
                 .importer(new Importer(
-                        config.getImporterConfig(), eventManager))
-                .resumeState(session.getResumeState())
+                        config.getImporterConfig(),
+                        eventManager))
+                .resumeState(session.getLaunchMode())
                 .sessionProperties(
-                        new CrawlSessionProperties(grid, config.getId()))
+                        new CrawlSessionProperties(grid,
+                                config.getId()))
                 .streamFactory(new CachedStreamFactory(
                         (int) config.getMaxStreamCachePoolSize(),
                         (int) config.getMaxStreamCacheSize(),
                         tempDir))
                 .tempDir(tempDir)
-                .threadFactoryCreator(new ScopedThreadFactoryCreator("crawl"))
+                .threadFactoryCreator(
+                        new ScopedThreadFactoryCreator(
+                                "crawl"))
                 .workDir(workDir)
                 .build();
 
@@ -137,11 +147,13 @@ CrawlContext create() {
     }
 
     private void init(CrawlContext ctx) {
-        ctx.getEventManager().addListenersFromScan(ctx.getCrawlConfig());
+        ctx.getEventManager()
+                .addListenersFromScan(ctx.getCrawlConfig());
         ctx.getCommitterService().init(CommitterContext
                 .builder()
                 .setEventManager(ctx.getEventManager())
-                .setWorkDir(ctx.getWorkDir().resolve("committer"))
+                .setWorkDir(ctx.getWorkDir()
+                        .resolve("committer"))
                 .setStreamFactory(ctx.getStreamFactory())
                 .build());
         ctx.getDocLedger().init(ctx);
@@ -155,7 +167,9 @@ private void createDir(Path dir) {
         try {
             Files.createDirectories(dir);
         } catch (IOException e) {
-            throw new CrawlerException("Could not create directory: " + dir, e);
+            throw new CrawlerException(
+                    "Could not create directory: " + dir,
+                    e);
         }
     }
 }

==================================================

@@ -32,7 +32,7 @@ public class CrawlSession implements Serializable {
     private String crawlerId;
     private CrawlState crawlState;
     private CrawlMode crawlMode;
-    private ResumeState resumeState;
+    private LaunchMode launchMode;
     private long lastUpdated;
 
 }

==================================================

@@ -21,11 +21,14 @@
 import com.norconex.grid.core.Grid;
 import com.norconex.grid.core.storage.GridMap;
 
+import lombok.extern.slf4j.Slf4j;
+
 /**
  * Facade to a grid store for persisting or getting session state
  * values (e.g., flags) or other session-specific information, shared across
  * nodes (when on a grid).
  */
+@Slf4j
 public class CrawlSessionProperties {
 
     private static final String QUEUE_INITIALIZED = "queueInitialized";
@@ -55,6 +58,7 @@ public Optional<CrawlState> getCrawlState() {
     }
 
     public void updateCrawlState(CrawlState state) {
+        LOG.info("Updating crawl state for crawler {} to {}.", id, state);
         var store = CrawlSessionManager.sessionStore(grid);
         var session = store.get(id);
         if (session == null) {
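
Note: a usage sketch of this facade, based only on calls visible elsewhere in
this diff (variable names illustrative):

    CrawlSessionProperties props = ctx.getSessionProperties();
    props.updateCrawlState(CrawlState.PAUSED);          // persisted to the grid store
    Optional<CrawlState> state = props.getCrawlState(); // readable from any node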

==================================================

@@ -54,29 +54,30 @@ private static CrawlSession doResolve(
             session = new CrawlSession()
                     .setCrawlerId(id)
                     .setCrawlMode(CrawlMode.FULL)
-                    .setResumeState(ResumeState.INITIAL);
+                    .setLaunchMode(LaunchMode.NEW);
         } else if (session.getCrawlState() == CrawlState.RUNNING) {
             if (System.currentTimeMillis()
                     - session.getLastUpdated() > sessionTimeout.toMillis()) {
                 LOG.warn("A crawl session for crawler {} was "
                         + "detected but expired. Trying to resume it.", id);
-                session.setResumeState(ResumeState.RESUMED);
+                session.setLaunchMode(LaunchMode.RESUMED);
             } else {
                 LOG.info("Joining crawl session for crawler {}.", id);
             }
         } else if (session.getCrawlState() == CrawlState.PAUSED) {
-            LOG.info("A previously paused crawl session was detected for. "
+            LOG.info("A previously paused crawl session was detected for "
                     + "crawler {}. Resuming it.", id);
-            session.setResumeState(ResumeState.RESUMED);
+            session.setLaunchMode(LaunchMode.RESUMED);
         } else if (session.getCrawlState() == CrawlState.COMPLETED) {
-            LOG.info("A previously completed crawl session was detected for. "
+            LOG.info("A previously completed crawl session was detected for "
                     + "crawler {}. Starting a new incremental crawl session.",
                     id);
             session.setCrawlMode(CrawlMode.INCREMENTAL);
+            session.setLaunchMode(LaunchMode.NEW);
         } else if (session.getCrawlState() == CrawlState.FAILED) {
             LOG.warn("A crawl session for crawler {} was detected but is "
                     + "marked as failed. Trying to resume it.", id);
-            session.setResumeState(ResumeState.RESUMED);
+            session.setLaunchMode(LaunchMode.RESUMED);
         }
 
         //MAYBE: if we can detect here that the state is not valid for joining
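
Note: in summary, doResolve(...) now maps the prior session state to a
LaunchMode roughly as follows (simplified sketch; the real method above also
applies the session timeout to RUNNING sessions and switches COMPLETED
sessions to an incremental crawl mode):

    LaunchMode mode = switch (previousState) {     // previousState: illustrative
        case PAUSED, FAILED -> LaunchMode.RESUMED; // pick up the unfinished crawl
        case RUNNING -> LaunchMode.RESUMED;        // only when the session expired
        case COMPLETED -> LaunchMode.NEW;          // fresh incremental session
        default -> LaunchMode.NEW;                 // no prior session
    };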

==================================================

@@ -14,7 +14,19 @@
  */
 package com.norconex.crawler.core.session;
 
-public enum ResumeState {
-    INITIAL,
+/**
+ * Describes how a job was launched (new launch or resuming a non
+ * completed/failed one).
+ */
+public enum LaunchMode {
+    /**
+     * The crawler started from the beginning (first-time or restarted after
+     * success, regardless of crawl mode -- incremental or full).
+     */
+    NEW,
+    /**
+     * The crawler resumed from a prior incomplete execution (e.g., paused
+     * or failed).
+     */
+    RESUMED
 }

==================================================

@@ -39,7 +39,7 @@ public static CrawlContext createCrawlerContext(
             .setCrawlMode(CrawlMode.FULL)
             .setCrawlState(CrawlState.RUNNING)
             .setLastUpdated(System.currentTimeMillis())
-            .setResumeState(ResumeState.INITIAL))
+            .setLaunchMode(LaunchMode.NEW))
             .build()
             .create();
 }