Skip to content

Commit

Permalink
[FLINK-9642][cep] Added caching layer to SharedBuffer(apache#6205)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored and dawidwys committed Aug 29, 2018
1 parent 7eb41f3 commit 260f789
Show file tree
Hide file tree
Showing 11 changed files with 784 additions and 523 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.cep.nfa.ComputationState;
import org.apache.flink.cep.nfa.sharedbuffer.EventId;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down Expand Up @@ -87,21 +87,21 @@ public static AfterMatchSkipStrategy noSkip() {
*
* @param matchesToPrune current partial matches
* @param matchedResult already completed matches
* @param sharedBuffer corresponding shared buffer
* @param sharedBufferAccessor accessor to corresponding shared buffer
* @throws Exception thrown if could not access the state
*/
public void prune(
Collection<ComputationState> matchesToPrune,
Collection<Map<String, List<EventId>>> matchedResult,
SharedBuffer<?> sharedBuffer) throws Exception {
SharedBufferAccessor<?> sharedBufferAccessor) throws Exception {

EventId pruningId = getPruningId(matchedResult);
if (pruningId != null) {
List<ComputationState> discardStates = new ArrayList<>();
for (ComputationState computationState : matchesToPrune) {
if (computationState.getStartEventID() != null &&
shouldPrune(computationState.getStartEventID(), pruningId)) {
sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
sharedBufferAccessor.releaseNode(computationState.getPreviousBufferEntry());
discardStates.add(computationState);
}
}
Expand Down
Loading

0 comments on commit 260f789

Please sign in to comment.