Skip to content

Commit 8b7d4e6

Browse files
committed
Log sorting
Logs from the Doppler log streaming endpoint are not guaranteed to be in order temporally. Previously, the log streaming operation (Applications#logs) punted on this issue, requiring the user to deal with it. Thanks to Kris De Volder doing the hard work of designing a solid reactive algorithm, we can now include this functionality in the client itself.
1 parent fd6f7af commit 8b7d4e6

File tree

6 files changed

+103
-4
lines changed

6 files changed

+103
-4
lines changed

.idea/dictionaries/bhale.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/_PasswordGrantTokenProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ private String getTokenUri(String root) {
9292
.queryParam("grant_type", "password")
9393
.queryParam("username", getUsername())
9494
.queryParam("password", getPassword())
95-
.build().toUriString();
95+
.build().encode().toUriString();
9696
}
9797

9898
}

cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/DefaultApplications.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.cloudfoundry.util.OperationUtils;
9292
import org.cloudfoundry.util.PaginationUtils;
9393
import org.cloudfoundry.util.ResourceUtils;
94+
import org.cloudfoundry.util.SortingUtils;
9495
import reactor.core.publisher.Flux;
9596
import reactor.core.publisher.Mono;
9697
import reactor.util.function.Tuple2;
@@ -100,6 +101,7 @@
100101
import java.io.InputStream;
101102
import java.time.Duration;
102103
import java.util.Collections;
104+
import java.util.Comparator;
103105
import java.util.Date;
104106
import java.util.HashMap;
105107
import java.util.List;
@@ -122,6 +124,10 @@ public final class DefaultApplications implements Applications {
122124

123125
private static final int CF_STAGING_NOT_FINISHED = 170002;
124126

127+
private static final Comparator<LogMessage> LOG_MESSAGE_COMPARATOR = (a, b) -> a.getTimestamp().compareTo(b.getTimestamp());
128+
129+
private static final Duration LOG_MESSAGE_TIMESPAN = Duration.ofMillis(500);
130+
125131
private static final int MAX_NUMBER_OF_RECENT_EVENTS = 50;
126132

127133
private static final String STARTED_STATE = "STARTED";
@@ -654,12 +660,13 @@ private static Flux<LogMessage> getLogs(Mono<DopplerClient> dopplerClient, Strin
654660
return requestLogsRecent(dopplerClient, applicationId)
655661
.filter(e -> EventType.LOG_MESSAGE == e.getEventType())
656662
.map(Envelope::getLogMessage)
657-
.collectSortedList((a, b) -> a.getTimestamp().compareTo(b.getTimestamp()))
663+
.collectSortedList(LOG_MESSAGE_COMPARATOR)
658664
.flatMapIterable(d -> d);
659665
} else {
660666
return requestLogsStream(dopplerClient, applicationId)
661667
.filter(e -> EventType.LOG_MESSAGE == e.getEventType())
662-
.map(Envelope::getLogMessage);
668+
.map(Envelope::getLogMessage)
669+
.compose(SortingUtils.timespan(LOG_MESSAGE_COMPARATOR, LOG_MESSAGE_TIMESPAN));
663670
}
664671
}
665672

cloudfoundry-operations/src/test/java/org/cloudfoundry/operations/applications/DefaultApplicationsTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2475,7 +2475,6 @@ public static final class LogsRecentNotSet extends AbstractOperationsApiTest<Log
24752475

24762476
private final DefaultApplications applications = new DefaultApplications(Mono.just(this.cloudFoundryClient), Mono.just(this.dopplerClient), Mono.just(TEST_SPACE_ID));
24772477

2478-
24792478
@Before
24802479
public void setUp() throws Exception {
24812480
requestApplications(this.cloudFoundryClient, "test-application-name", TEST_SPACE_ID, "test-metadata-id");

cloudfoundry-util/src/main/java/org/cloudfoundry/util/ResourceUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import org.cloudfoundry.client.v2.Resource;
2121
import reactor.core.publisher.Flux;
2222

23+
/**
24+
* Utilities for dealing with {@link Resource}s
25+
*/
2326
public final class ResourceUtils {
2427

2528
private ResourceUtils() {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2013-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.util;
18+
19+
import reactor.core.Cancellation;
20+
import reactor.core.publisher.DirectProcessor;
21+
import reactor.core.publisher.Flux;
22+
import reactor.util.function.Tuple2;
23+
24+
import java.time.Duration;
25+
import java.time.Instant;
26+
import java.util.ArrayList;
27+
import java.util.Comparator;
28+
import java.util.List;
29+
import java.util.PriorityQueue;
30+
import java.util.Queue;
31+
import java.util.function.Function;
32+
33+
/**
34+
* Utilities for sorting
35+
*/
36+
public final class SortingUtils {
37+
38+
private SortingUtils() {
39+
}
40+
41+
/**
42+
* Sorts the elements of a {@link Flux} within a sliding time window. This sorter should be used when element order may be scrambled, but that scrambling has a certain 'temporal locality' to it.
43+
* This assumption means that sorting can be limited to elements that arrive temporally close to one another without risking a latecomer being sorted incorrectly.
44+
*
45+
* @param comparator a {@link Comparator} to use when sorting the elements within the window
46+
* @param timespan the duration of the 'temporal locality'
47+
* @param <T> The type of the elements to be sorted
48+
* @return a {@link Flux} providing the sorted elements
49+
*/
50+
public static <T> Function<Flux<T>, Flux<T>> timespan(Comparator<T> comparator, Duration timespan) {
51+
return source -> {
52+
Queue<Tuple2<Long, T>> accumulator = new PriorityQueue<>((o1, o2) -> comparator.compare(o1.t2, o2.t2));
53+
Object monitor = new Object();
54+
55+
DirectProcessor<Void> d = DirectProcessor.create();
56+
57+
Cancellation cancellation = source
58+
.timestamp()
59+
.subscribe(e -> {
60+
synchronized (monitor) {
61+
accumulator.add(e);
62+
}
63+
}, d::onError, d::onComplete);
64+
65+
return Flux
66+
.interval(timespan)
67+
.takeUntilOther(d)
68+
.flatMap(n -> getItems(accumulator, comparator, timespan), null, () -> getItems(accumulator, comparator, Duration.ZERO))
69+
.doOnCancel(cancellation::dispose);
70+
};
71+
}
72+
73+
private static <T> Flux<T> getItems(Queue<Tuple2<Long, T>> accumulator, Object monitor, Duration timespan) {
74+
List<T> items = new ArrayList<>();
75+
76+
synchronized (monitor) {
77+
while (isBefore(accumulator.peek(), timespan)) {
78+
items.add(accumulator.remove().t2);
79+
}
80+
}
81+
82+
return Flux.fromIterable(items);
83+
}
84+
85+
private static <T> boolean isBefore(Tuple2<Long, T> candidate, Duration timespan) {
86+
return candidate != null && (Duration.ZERO == timespan || Instant.ofEpochMilli(candidate.t1).isBefore(Instant.now().minus(timespan)));
87+
}
88+
89+
}

0 commit comments

Comments
 (0)