Skip to content

Commit e3d48a5

Browse files
committed
Merge branch 'log-sorting'
2 parents fd6f7af + 8b7d4e6 commit e3d48a5

File tree

6 files changed

+103
-4
lines changed

6 files changed

+103
-4
lines changed

.idea/dictionaries/bhale.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/_PasswordGrantTokenProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ private String getTokenUri(String root) {
9292
.queryParam("grant_type", "password")
9393
.queryParam("username", getUsername())
9494
.queryParam("password", getPassword())
95-
.build().toUriString();
95+
.build().encode().toUriString();
9696
}
9797

9898
}

cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/DefaultApplications.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.cloudfoundry.util.OperationUtils;
9292
import org.cloudfoundry.util.PaginationUtils;
9393
import org.cloudfoundry.util.ResourceUtils;
94+
import org.cloudfoundry.util.SortingUtils;
9495
import reactor.core.publisher.Flux;
9596
import reactor.core.publisher.Mono;
9697
import reactor.util.function.Tuple2;
@@ -100,6 +101,7 @@
100101
import java.io.InputStream;
101102
import java.time.Duration;
102103
import java.util.Collections;
104+
import java.util.Comparator;
103105
import java.util.Date;
104106
import java.util.HashMap;
105107
import java.util.List;
@@ -122,6 +124,10 @@ public final class DefaultApplications implements Applications {
122124

123125
private static final int CF_STAGING_NOT_FINISHED = 170002;
124126

127+
private static final Comparator<LogMessage> LOG_MESSAGE_COMPARATOR = (a, b) -> a.getTimestamp().compareTo(b.getTimestamp());
128+
129+
private static final Duration LOG_MESSAGE_TIMESPAN = Duration.ofMillis(500);
130+
125131
private static final int MAX_NUMBER_OF_RECENT_EVENTS = 50;
126132

127133
private static final String STARTED_STATE = "STARTED";
@@ -654,12 +660,13 @@ private static Flux<LogMessage> getLogs(Mono<DopplerClient> dopplerClient, Strin
654660
return requestLogsRecent(dopplerClient, applicationId)
655661
.filter(e -> EventType.LOG_MESSAGE == e.getEventType())
656662
.map(Envelope::getLogMessage)
657-
.collectSortedList((a, b) -> a.getTimestamp().compareTo(b.getTimestamp()))
663+
.collectSortedList(LOG_MESSAGE_COMPARATOR)
658664
.flatMapIterable(d -> d);
659665
} else {
660666
return requestLogsStream(dopplerClient, applicationId)
661667
.filter(e -> EventType.LOG_MESSAGE == e.getEventType())
662-
.map(Envelope::getLogMessage);
668+
.map(Envelope::getLogMessage)
669+
.compose(SortingUtils.timespan(LOG_MESSAGE_COMPARATOR, LOG_MESSAGE_TIMESPAN));
663670
}
664671
}
665672

cloudfoundry-operations/src/test/java/org/cloudfoundry/operations/applications/DefaultApplicationsTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2475,7 +2475,6 @@ public static final class LogsRecentNotSet extends AbstractOperationsApiTest<Log
24752475

24762476
private final DefaultApplications applications = new DefaultApplications(Mono.just(this.cloudFoundryClient), Mono.just(this.dopplerClient), Mono.just(TEST_SPACE_ID));
24772477

2478-
24792478
@Before
24802479
public void setUp() throws Exception {
24812480
requestApplications(this.cloudFoundryClient, "test-application-name", TEST_SPACE_ID, "test-metadata-id");

cloudfoundry-util/src/main/java/org/cloudfoundry/util/ResourceUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import org.cloudfoundry.client.v2.Resource;
2121
import reactor.core.publisher.Flux;
2222

23+
/**
24+
* Utilities for dealing with {@link Resource}s
25+
*/
2326
public final class ResourceUtils {
2427

2528
private ResourceUtils() {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2013-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.util;
18+
19+
import reactor.core.Cancellation;
20+
import reactor.core.publisher.DirectProcessor;
21+
import reactor.core.publisher.Flux;
22+
import reactor.util.function.Tuple2;
23+
24+
import java.time.Duration;
25+
import java.time.Instant;
26+
import java.util.ArrayList;
27+
import java.util.Comparator;
28+
import java.util.List;
29+
import java.util.PriorityQueue;
30+
import java.util.Queue;
31+
import java.util.function.Function;
32+
33+
/**
34+
* Utilities for sorting
35+
*/
36+
public final class SortingUtils {
37+
38+
private SortingUtils() {
39+
}
40+
41+
/**
42+
* Sorts the elements of a {@link Flux} within a sliding time window. This sorter should be used when element order may be scrambled, but that scrambling has a certain 'temporal locality' to it.
43+
* This assumption means that sorting can be limited to elements that arrive temporally close to one another without risking a latecomer being sorted incorrectly.
44+
*
45+
* @param comparator a {@link Comparator} to use when sorting the elements within the window
46+
* @param timespan the duration of the 'temporal locality'
47+
* @param <T> The type of the elements to be sorted
48+
* @return a {@link Flux} providing the sorted elements
49+
*/
50+
public static <T> Function<Flux<T>, Flux<T>> timespan(Comparator<T> comparator, Duration timespan) {
51+
return source -> {
52+
Queue<Tuple2<Long, T>> accumulator = new PriorityQueue<>((o1, o2) -> comparator.compare(o1.t2, o2.t2));
53+
Object monitor = new Object();
54+
55+
DirectProcessor<Void> d = DirectProcessor.create();
56+
57+
Cancellation cancellation = source
58+
.timestamp()
59+
.subscribe(e -> {
60+
synchronized (monitor) {
61+
accumulator.add(e);
62+
}
63+
}, d::onError, d::onComplete);
64+
65+
return Flux
66+
.interval(timespan)
67+
.takeUntilOther(d)
68+
.flatMap(n -> getItems(accumulator, comparator, timespan), null, () -> getItems(accumulator, comparator, Duration.ZERO))
69+
.doOnCancel(cancellation::dispose);
70+
};
71+
}
72+
73+
private static <T> Flux<T> getItems(Queue<Tuple2<Long, T>> accumulator, Object monitor, Duration timespan) {
74+
List<T> items = new ArrayList<>();
75+
76+
synchronized (monitor) {
77+
while (isBefore(accumulator.peek(), timespan)) {
78+
items.add(accumulator.remove().t2);
79+
}
80+
}
81+
82+
return Flux.fromIterable(items);
83+
}
84+
85+
private static <T> boolean isBefore(Tuple2<Long, T> candidate, Duration timespan) {
86+
return candidate != null && (Duration.ZERO == timespan || Instant.ofEpochMilli(candidate.t1).isBefore(Instant.now().minus(timespan)));
87+
}
88+
89+
}

0 commit comments

Comments
 (0)