Skip to content

Commit 76c50a7

Browse files
authored
Merge pull request #179 from conorschofield/timeoutAndOverride
Timeout and override post support as property values
2 parents 053418f + 1ff89ed commit 76c50a7

File tree

7 files changed

+418
-52
lines changed

7 files changed

+418
-52
lines changed

src/main/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessor.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.phoebus.channelfinder.processors.ChannelProcessor;
88
import org.springframework.beans.factory.annotation.Value;
99
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.beans.factory.annotation.Autowired;
1011

1112
import java.util.ArrayList;
1213
import java.util.Arrays;
@@ -51,6 +52,7 @@ public class AAChannelProcessor implements ChannelProcessor {
5152
@Value("${aa.auto_pause:}")
5253
private List<String> autoPauseOptions;
5354

55+
@Autowired
5456
private final ArchiverClient archiverClient = new ArchiverClient();
5557

5658
@Override
@@ -195,24 +197,16 @@ private Map<ArchiveAction, List<ArchivePVOptions>> getArchiveActions(
195197
if (archivePVS.isEmpty()) {
196198
return result;
197199
}
198-
199-
try {
200-
List<Map<String, String>> statuses = archiverClient.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.version());
201-
statuses
202-
.forEach(archivePVStatusJsonMap -> {
203-
String archiveStatus = archivePVStatusJsonMap.get("status");
204-
String pvName = archivePVStatusJsonMap.get("pvName");
205-
String pvStatus = archivePVS.get(pvName).getPvStatus();
206-
ArchiveAction action = pickArchiveAction(archiveStatus, pvStatus);
207-
result.get(action).add(archivePVS.get(pvName));
208-
});
209-
return result;
210-
211-
} catch (JsonProcessingException e) {
212-
// problem collecting policies from AA, so warn and return empty list
213-
logger.log(Level.WARNING, () -> "Could not get AA pv Status list: " + e.getMessage());
214-
return result;
215-
}
200+
List<Map<String, String>> statuses = archiverClient.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias());
201+
statuses
202+
.forEach(archivePVStatusJsonMap -> {
203+
String archiveStatus = archivePVStatusJsonMap.get("status");
204+
String pvName = archivePVStatusJsonMap.get("pvName");
205+
String pvStatus = archivePVS.get(pvName).getPvStatus();
206+
ArchiveAction action = pickArchiveAction(archiveStatus, pvStatus);
207+
result.get(action).add(archivePVS.get(pvName));
208+
});
209+
return result;
216210
}
217211

218212
private ArchivePVOptions createArchivePV(

src/main/java/org/phoebus/channelfinder/processors/aa/ArchiverClient.java

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.springframework.stereotype.Component;
99
import org.springframework.web.reactive.function.client.WebClient;
1010
import org.springframework.web.util.UriComponentsBuilder;
11+
import org.springframework.beans.factory.annotation.Value;
1112
import reactor.core.publisher.Mono;
1213

1314
import java.net.URI;
@@ -27,26 +28,6 @@
2728
public class ArchiverClient {
2829
private static final Logger logger = Logger.getLogger(ArchiverClient.class.getName());
2930
private static final int STATUS_BATCH_SIZE = 100; // Limit comes from tomcat server maxHttpHeaderSize which by default is a header of size 8k
30-
private static final List<String> AA_STATUS_ENDPOINT_ONLY_SUPPORT_QUERY_VERSION = List.of("1.1.0",
31-
"Before_JDK_12_Upgrade",
32-
"v0.0.1_SNAPSHOT_03-November-2015",
33-
"v0.0.1_SNAPSHOT_09-Oct-2018",
34-
"v0.0.1_SNAPSHOT_10-June-2017",
35-
"v0.0.1_SNAPSHOT_10-Sep-2015",
36-
"v0.0.1_SNAPSHOT_12-May-2016",
37-
"v0.0.1_SNAPSHOT_12-Oct-2016",
38-
"v0.0.1_SNAPSHOT_13-Nov-2019",
39-
"v0.0.1_SNAPSHOT_14-Jun-2018",
40-
"v0.0.1_SNAPSHOT_15-Nov-2018",
41-
"v0.0.1_SNAPSHOT_20-Sept-2016",
42-
"v0.0.1_SNAPSHOT_22-June-2016",
43-
"v0.0.1_SNAPSHOT_22-June-2017",
44-
"v0.0.1_SNAPSHOT_23-Sep-2015",
45-
"v0.0.1_SNAPSHOT_26-January-2016",
46-
"v0.0.1_SNAPSHOT_27-Nov-2017",
47-
"v0.0.1_SNAPSHOT_29-July-2015",
48-
"v0.0.1_SNAPSHOT_30-March-2016",
49-
"v0.0.1_SNAPSHOT_30-September-2021");
5031

5132
private final WebClient client = WebClient.create();
5233

@@ -55,23 +36,32 @@ public class ArchiverClient {
5536
private static final String PV_STATUS_RESOURCE = MGMT_RESOURCE + "/getPVStatus";
5637
private static final String ARCHIVER_VERSIONS_RESOURCE = MGMT_RESOURCE + "/getVersions";
5738
private static final ObjectMapper objectMapper = new ObjectMapper();
58-
private static final int TIMEOUT_SECONDS = 15;
39+
40+
@Value("${aa.timeout_seconds:15}")
41+
private int timeoutSeconds;
42+
@Value("${aa.post_support:}")
43+
private List<String> postSupportArchivers;
5944

6045
private Stream<List<String>> partitionSet(Set<String> pvSet, int pageSize) {
6146
List<String> list = new ArrayList<>(pvSet);
6247
return IntStream.range(0, (list.size() + pageSize - 1) / pageSize)
6348
.mapToObj(i -> list.subList(i * pageSize, Math.min(pageSize * (i + 1), list.size())));
6449
}
6550

66-
List<Map<String, String>> getStatuses(Map<String, ArchivePVOptions> archivePVS, String archiverURL, String archiverVersion) throws JsonProcessingException {
51+
List<Map<String, String>> getStatuses(Map<String, ArchivePVOptions> archivePVS, String archiverURL, String archiverAlias) {
6752
Set<String> pvs = archivePVS.keySet();
68-
if (AA_STATUS_ENDPOINT_ONLY_SUPPORT_QUERY_VERSION.contains(archiverVersion)) {
69-
53+
Boolean postSupportOverride = postSupportArchivers.contains(archiverAlias);
54+
logger.log(Level.INFO, "Archiver Alias: {0}", archiverAlias);
55+
logger.log(Level.INFO, "Post Support Override Archivers: {0}", postSupportArchivers);
56+
57+
if (Boolean.TRUE.equals(postSupportOverride)) {
58+
logger.log(Level.INFO, "Post Support");
59+
return getStatusesFromPvListBody(archiverURL, pvs.stream().toList());
60+
} else {
61+
logger.log(Level.INFO, "Query Support");
7062
Stream<List<String>> stream = partitionSet(pvs, STATUS_BATCH_SIZE);
7163

7264
return stream.map(pvList -> getStatusesFromPvListQuery(archiverURL, pvList)).flatMap(List::stream).toList();
73-
} else {
74-
return getStatusesFromPvListBody(archiverURL, pvs.stream().toList());
7565
}
7666
}
7767

@@ -86,7 +76,7 @@ private List<Map<String, String>> getStatusesFromPvListQuery(String archiverURL,
8676
.uri(pvStatusURI)
8777
.retrieve()
8878
.bodyToMono(String.class)
89-
.timeout(Duration.of(TIMEOUT_SECONDS, ChronoUnit.SECONDS))
79+
.timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
9080
.onErrorResume(e -> showError(uriString, e))
9181
.block();
9282

@@ -110,7 +100,7 @@ private List<Map<String, String>> getStatusesFromPvListBody(String archiverURL,
110100
.bodyValue(pvs)
111101
.retrieve()
112102
.bodyToMono(String.class)
113-
.timeout(Duration.of(TIMEOUT_SECONDS, ChronoUnit.SECONDS))
103+
.timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
114104
.onErrorResume(e -> showError(uriString, e))
115105
.block();
116106

@@ -125,7 +115,7 @@ private List<Map<String, String>> getStatusesFromPvListBody(String archiverURL,
125115
} catch (JsonProcessingException e) {
126116
logger.log(Level.WARNING, "Could not parse pv status response: " + e.getMessage());
127117
} catch (Exception e) {
128-
logger.log(Level.WARNING, String.format("Error when trying to get status from pv list query: %s", e.getMessage()));
118+
logger.log(Level.WARNING, String.format("Error when trying to get status from pv list body: %s", e.getMessage()));
129119
}
130120
return List.of();
131121
}
@@ -139,7 +129,7 @@ private void submitAction(String values, String endpoint, String aaURL) {
139129
.bodyValue(values)
140130
.retrieve()
141131
.bodyToMono(String.class)
142-
.timeout(Duration.of(TIMEOUT_SECONDS, ChronoUnit.SECONDS))
132+
.timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
143133
.onErrorResume(e -> showError(uriString, e))
144134
.block();
145135
logger.log(Level.FINE, () -> response);
@@ -227,7 +217,7 @@ String getVersion(String archiverURL) {
227217
.uri(URI.create(uriString))
228218
.retrieve()
229219
.bodyToMono(String.class)
230-
.timeout(Duration.of(TIMEOUT_SECONDS, ChronoUnit.SECONDS))
220+
.timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS))
231221
.onErrorResume(e -> showError(uriString, e))
232222
.block();
233223
Map<String, String> versionMap = objectMapper.readValue(response, Map.class);

src/main/resources/application.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ aa.enabled=true
122122
aa.pva=false
123123
aa.archive_property_name=archive
124124
aa.archiver_property_name=archiver
125+
aa.timeout_seconds=15
126+
127+
# Comma-separated list of archivers to use post support
128+
aa.post_support=
125129

126130
# Set the auto pause behaviour
127131
#

src/site/sphinx/aa_processor.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ A list of archiver appliance URLs and aliases. ::
1111

1212
aa.urls={'default': 'http://archiver-01.example.com:17665', 'neutron-controls': 'http://archiver-02.example.com:17665'}
1313

14+
By default the listed archivers will get statuses by pv query.
15+
To set the archivers to use get statuses by pv list body, set the property :ref:`aa.post_support` to a comma separated list of archiver aliases. ::
16+
17+
aa.post_support=default, neutron-controls
18+
1419
To set the choice of default archiver appliance, set the property :ref:`aa.default_alias` to the alias of the default archiver appliance. This setting can also be a comma-separated list if you want multiple default archivers.
1520

1621
To pass the PV as "pva://PVNAME" to the archiver appliance, set the property :ref:`aa.pva` to **true**.

0 commit comments

Comments
 (0)