Skip to content

zhenik/akka-persistent-actor-example

Folders and files

NameName
Last commit message
Last commit date

Latest commit

4b92393 · Jun 22, 2018

History

11 Commits
Jun 1, 2018
Apr 17, 2018
Jun 22, 2018
Apr 17, 2018
Jun 1, 2018
Jun 22, 2018

Repository files navigation

Task at work

Features:

  1. List all files in directory and send their names to kafka
  2. Track each new created file in directory and send to kafka

Specific: Handle application restart, save state of processed files on local node (decoupled from kafka)

Solution

For listing all files and tracking events in directory I used alpakka file connector

  1. Listing
final Source<PathAndProcessed, NotUsed> listDirSource = Directory.ls(fs.getPath(imgDir))
  .mapAsync(1, (Path e) -> {
    return
      PatternsCS.ask(
        persistentActor,
        new IsFileProcessed(e.toString()),
        Duration.ofSeconds(4).toMillis())
          .thenApply(a -> (IsFileProcessedAnswer) a)
          .thenApply(isFileProcessedAnswer -> new PathAndProcessed(e.toString(), isFileProcessedAnswer.isAnswer()));
  });
  1. Tracking
final Source<PathAndProcessed, NotUsed> changesSource =
  DirectoryChangesSource
    .create(fs.getPath(imgDir), pollingInterval, maxBufferSize)
    .filter(pair -> pair.second().equals(DirectoryChange.Creation))
    .map(Pair::first)
    .map(e -> {
      out.println("New file: " + e.toString());
      return new PathAndProcessed(e.toString(), false);
  });

Specific: For handling state was create DirListingStatePersistentActor, which use levelDB.

<dependency>
  <groupId>org.iq80.leveldb</groupId>
  <artifactId>leveldb</artifactId>
  <version>0.7</version>
</dependency>

While listing -> ask persistent actor is file processed

...
PatternsCS.ask(
  persistentActor,
  new IsFileProcessed(e.toString())
...

While sink record to kafka, tell to persistent actor that file processed.

persistentActor.tell(new Cmd(e.key()), null);

e.key() is filename from ProducerRecord.

When application restarts, persistent actor recover state from levelDB.

Akka-streams topology

img

About

Alpakka, Kafka, Reactive streams, Actor model, Akka

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages