From de3bc7eb9930aa0f40c87e23278a13e79f73c504 Mon Sep 17 00:00:00 2001 From: "Florian, LEFEVRE" Date: Wed, 10 Sep 2025 16:56:40 +0200 Subject: [PATCH 1/2] feat: enable support for influxdb v2 as data source --- app/trends/archive-reader/pom.xml | 5 + .../reader/influx2/InfluxArchiveReader.java | 582 +++++++++++++++++ .../influx2/InfluxArchiveReaderFactory.java | 31 + .../influx2/InfluxDecimatedValueIterator.java | 339 ++++++++++ .../reader/influx2/InfluxPreferences.java | 23 + .../reader/influx2/InfluxValueIterator.java | 231 +++++++ .../reader/influx2/IteratorListener.java | 12 + ...us.archive.reader.spi.ArchiveReaderFactory | 1 + core/pom.xml | 1 + core/pv-influx2/pom.xml | 74 +++ .../phoebus/pv/influx2/InfluxDB_Context.java | 69 ++ .../phoebus/pv/influx2/InfluxDB_Helper.java | 64 ++ .../org/phoebus/pv/influx2/InfluxDB_PV.java | 257 ++++++++ .../pv/influx2/InfluxDB_PVFactory.java | 43 ++ .../pv/influx2/InfluxDB_PollingManager.java | 377 +++++++++++ .../pv/influx2/InfluxDB_Preferences.java | 158 +++++ .../services/org.phoebus.pv.PVFactory | 1 + .../org/phoebus/pv/influx2/InfluxPVTest.java | 614 ++++++++++++++++++ phoebus-product/pom.xml | 5 + pom.xml | 1 + 20 files changed, 2888 insertions(+) create mode 100644 app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxArchiveReader.java create mode 100644 app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxArchiveReaderFactory.java create mode 100644 app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxDecimatedValueIterator.java create mode 100644 app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxPreferences.java create mode 100644 app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxValueIterator.java create mode 100644 app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/IteratorListener.java create mode 100644 core/pv-influx2/pom.xml create mode 100644 core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Context.java create mode 100644 core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Helper.java create mode 100644 core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PV.java create mode 100644 core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PVFactory.java create mode 100644 core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PollingManager.java create mode 100644 core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Preferences.java create mode 100644 core/pv-influx2/src/main/resources/META-INF/services/org.phoebus.pv.PVFactory create mode 100644 core/pv-influx2/src/test/java/org/phoebus/pv/influx2/InfluxPVTest.java diff --git a/app/trends/archive-reader/pom.xml b/app/trends/archive-reader/pom.xml index e374e23b9c..e79b487a56 100644 --- a/app/trends/archive-reader/pom.xml +++ b/app/trends/archive-reader/pom.xml @@ -44,6 +44,11 @@ pbrawclient 0.0.10 + + com.influxdb + influxdb-client-java + ${influxdb2Client.version} + org.phoebus core-ui diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxArchiveReader.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxArchiveReader.java new file mode 100644 index 0000000000..1567d863cc --- /dev/null +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxArchiveReader.java @@ -0,0 +1,582 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.archive.reader.influx2; + +import com.influxdb.client.BucketsApi; +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.QueryApi; +import com.influxdb.client.domain.Bucket; +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.WeakHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.phoebus.archive.reader.ArchiveReader; +import org.phoebus.archive.reader.UnknownChannelException; +import org.phoebus.archive.reader.ValueIterator; +import org.phoebus.ui.text.RegExHelper; + +/** + * Archive reader for InfluxDB v2. + * Uses InfluxDB native capabilities for downsampling and aggregation. + * Supports automatic bucket detection and all PV name formats. + */ +public class InfluxArchiveReader implements ArchiveReader, IteratorListener { + private final String baseUrl; + private final String influxUrl; + private final InfluxDBClient influxClient; + protected final QueryApi queryApi; + + private final Map iterators = Collections.synchronizedMap( + new WeakHashMap<>()); + + private static final Logger logger = Logger.getLogger(InfluxArchiveReader.class.getName()); + + public InfluxArchiveReader() { + this.influxUrl = "influx://" + InfluxPreferences.ip + ":" + InfluxPreferences.port; + + boolean useHttps = InfluxPreferences.useHttps; + this.baseUrl = useHttps + ? influxUrl.replace("influx://", "https://") + : influxUrl.replace("influx://", "http://"); + + char[] token = InfluxPreferences.token.toCharArray(); + String org = InfluxPreferences.org; + this.influxClient = InfluxDBClientFactory.create(baseUrl, token, org); + this.queryApi = influxClient.getQueryApi(); + } + + public InfluxArchiveReader(final String url) { + this.influxUrl = url.endsWith("/") ? url.substring(0, url.length() - 1) : url; + + boolean useHttps = InfluxPreferences.useHttps; + this.baseUrl = useHttps + ? influxUrl.replace("influx://", "https://") + : influxUrl.replace("influx://", "http://"); + + char[] token = InfluxPreferences.token.toCharArray(); + String org = InfluxPreferences.org; + this.influxClient = InfluxDBClientFactory.create(baseUrl, token, org); + this.queryApi = influxClient.getQueryApi(); + } + + public InfluxArchiveReader(final String url, final String bucket) { + this(url); + InfluxPreferences.bucket = bucket; + } + + public String getURL() { + return influxUrl; + } + + @Override + public String getDescription() { + return "InfluxDB Archive Reader (Optimized)\n" + + "Server URL: " + baseUrl + "\n" + + "Organization: " + InfluxPreferences.org + "\n" + + "Bucket: " + InfluxPreferences.bucket + "\n"; + } + + @Override + public List getNamesByPattern(final String globPattern) throws Exception { + String regex = RegExHelper.fullRegexFromGlob(globPattern); + String bucket = resolveBucket(null); + + String flux = String.format( + """ + import "influxdata/influxdb/schema" + schema.measurements(bucket: "%s", start: 1970-01-01T00:00:00Z) + |> filter(fn: (r) => r._value =~ /%s/) + |> limit(n: 1000)""", + bucket, regex); + + List tables = queryApi.query(flux); + List results = new ArrayList<>(); + + for (FluxTable table : tables) { + for (FluxRecord record : table.getRecords()) { + Object value = record.getValueByKey("_value"); + if (value != null) { + results.add(value.toString()); + } + } + } + + return results; + } + + /** + * Checks if a bucket exists in InfluxDB. + * Used to disambiguate between bucket/measurement and measurement/field formats. + * + * @param bucketName the name of the bucket to check + * @return true if the bucket exists, false otherwise + */ + private boolean bucketExists(String bucketName) { + try { + BucketsApi bucketsApi = influxClient.getBucketsApi(); + List buckets = bucketsApi.findBuckets(); + + return buckets.stream() + .anyMatch(bucket -> bucket.getName().equals(bucketName)); + } catch (Exception e) { + logger.log(Level.WARNING, "Error checking bucket existence: " + bucketName, e); + return false; + } + } + + /** + * Gets the name of the last created bucket in InfluxDB. + * Queries the InfluxDB API for all buckets and sorts them by creation date + * to find the most recently created one. + * + * @return The name of the last created bucket, or null if no buckets are found + */ + public String getLastBucketName() { + try { + BucketsApi bucketsApi = influxClient.getBucketsApi(); + List buckets = bucketsApi.findBuckets(); + + if (buckets.isEmpty()) { + logger.log(Level.SEVERE, "No buckets found in InfluxDB"); + return null; + } + + Optional lastBucket = buckets.stream().max(Comparator.comparing(Bucket::getUpdatedAt)); + + return lastBucket.get().getName(); + } catch (Exception e) { + logger.log(Level.SEVERE, "Error retrieving buckets from InfluxDB", e); + return null; + } + } + + /** + * Resolves the bucket name to use, handling automatic last bucket detection. + * + * @param specifiedBucket the bucket specified in the PV name, or null for auto-detection + * @return the resolved bucket name + * @throws Exception if no bucket can be resolved + */ + private String resolveBucket(String specifiedBucket) throws Exception { + if (specifiedBucket != null && !specifiedBucket.isEmpty()) { + return specifiedBucket; + } + + String lastBucket = getLastBucketName(); + if (lastBucket == null) { + throw new Exception("No bucket available - no default bucket configured and cannot determine last bucket"); + } + + return lastBucket; + } + + /** + * Parses a PV name to extract measurement and field information. + * Supports automatic bucket detection for all PV name formats: + * - influx://measurement (uses last bucket, no specific field) + * - influx://measurement/field (uses last bucket, specific field) + * - influx://bucket/measurement (specific bucket, no specific field) + * - influx://bucket/measurement/field (specific bucket, specific field) + * + * @param name the PV name to parse + * @return a map containing bucket, measurement, and optionally field + * @throws Exception if the PV name format is invalid or bucket cannot be resolved + */ + protected Map parsePvName(String name) throws Exception { + Map result = new HashMap<>(); + name = name.replace("influx://", ""); + + if (name.endsWith("/")) { + name = name.substring(0, name.length() - 1); + } + String[] parts = name.split("/"); + + String resolvedBucket = null; + String resolvedMeasurement; + String resolvedField = null; + boolean needsLastBucket = false; + + switch (parts.length) { + case 1: + needsLastBucket = true; + resolvedMeasurement = parts[0]; + break; + case 2: + if (bucketExists(parts[0])) { + resolvedBucket = parts[0]; + resolvedMeasurement = parts[1]; + } else { + needsLastBucket = true; + resolvedMeasurement = parts[0]; + resolvedField = parts[1]; + } + break; + case 3: + resolvedBucket = parts[0]; + resolvedMeasurement = parts[1]; + resolvedField = Objects.equals(parts[2], "null") ? null : parts[2]; + break; + default: + throw new IllegalArgumentException("Invalid InfluxDB PV format: " + name); + } + + if (needsLastBucket || resolvedBucket == null) { + resolvedBucket = resolveBucket(null); + } else { + resolvedBucket = resolveBucket(resolvedBucket); + } + + result.put("bucket", resolvedBucket); + result.put("measurement", resolvedMeasurement); + if (resolvedField != null) { + result.put("field", resolvedField); + } + + return result; + } + + @Override + public ValueIterator getRawValues(String name, Instant start, Instant end) throws Exception { + return getRawValues(name, start, end, null); + } + + /** + * Gets raw values for a PV within the specified time range. + * + * @param name the PV name + * @param start the start time + * @param end the end time + * @param mean the mean window for aggregation, or null for raw data + * @return a ValueIterator for the requested data + * @throws Exception if the PV cannot be found or accessed + */ + public ValueIterator getRawValues(String name, Instant start, Instant end, String mean) throws Exception { + Map pvInfo = parsePvName(name); + String measurement = pvInfo.get("measurement"); + String field = pvInfo.get("field"); + String bucket = pvInfo.get("bucket"); + + String originalBucket = InfluxPreferences.bucket; + InfluxPreferences.bucket = bucket; + + InfluxValueIterator it; + try { + it = new InfluxValueIterator(this, measurement, field, start, end, this, mean); + } catch (Exception e) { + throw new UnknownChannelException(name); + } finally { + InfluxPreferences.bucket = originalBucket; + } + + iterators.put(it, this); + return it; + } + + /** + * Gets the last recorded point for a given PV. + * + * @param name the PV name + * @return the last FluxRecord, or null if not found + */ + public FluxRecord getLastPoint(String name) { + try { + Map pvInfo = parsePvName(name); + String bucket = pvInfo.get("bucket"); + String measurement = pvInfo.get("measurement"); + String field = pvInfo.get("field"); + + String fieldFilter = field != null ? + String.format(" and r._field == \"%s\"", field) : ""; + + String flux = String.format( + "from(bucket: \"%s\") " + + "|> range(start: 0) " + + "|> filter(fn: (r) => r._measurement == \"%s\"%s) " + + "|> last()", + bucket, measurement, fieldFilter); + + List tables = queryApi.query(flux); + for (FluxTable table : tables) { + for (FluxRecord record : table.getRecords()) { + return record; + } + } + } catch (Exception e) { + logger.log(Level.WARNING, "Error getting last point for " + name, e); + } + + return null; + } + + @Override + public ValueIterator getOptimizedValues(String name, Instant start, Instant end, int count) throws Exception { + Map pvInfo = parsePvName(name); + String measurement = pvInfo.get("measurement"); + String field = pvInfo.get("field"); + String bucket = pvInfo.get("bucket"); + + String originalBucket = InfluxPreferences.bucket; + InfluxPreferences.bucket = bucket; + + InfluxDecimatedValueIterator it; + try { + it = new InfluxDecimatedValueIterator(this, measurement, field, start, end, count, this); + } catch (Exception e) { + throw new UnknownChannelException(name); + } finally { + InfluxPreferences.bucket = originalBucket; + } + + iterators.put(it, this); + return it; + } + + /** + * Calculates statistics for a PV within the specified time range. + * + * @param name the PV name + * @param start the start time + * @param end the end time + * @return a map containing statistical values (count, sum, mean, stdDev, min, max) + */ + public Map getStatistics(String name, Instant start, Instant end) { + try { + Map pvInfo = parsePvName(name); + String measurement = pvInfo.get("measurement"); + String field = pvInfo.get("field"); + String bucket = pvInfo.get("bucket"); + + String fieldFilter = buildFieldFilter(bucket, measurement, field); + + if (fieldFilter == null) { + logger.log(Level.WARNING, "No valid field found for measurement: " + measurement); + return null; + } + + String quickCheckFlux = String.format( + """ + from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "%s" and %s) + |> limit(n: 1) + |> yield(name: "check") + """, + bucket, start.toString(), end.toString(), measurement, fieldFilter); + + List quickCheckTables; + try { + quickCheckTables = queryApi.query(quickCheckFlux); + } catch (Exception e) { + logger.log(Level.WARNING, "Quick check query failed for " + name + ": " + e.getMessage()); + return null; + } + + if (quickCheckTables.isEmpty() || quickCheckTables.get(0).getRecords().isEmpty()) { + logger.log(Level.FINE, "No data found for " + name + " in time range"); + return createEmptyStats(); + } + + FluxRecord firstRecord = quickCheckTables.get(0).getRecords().get(0); + Object value = firstRecord.getValueByKey("_value"); + + if (value instanceof String) { + logger.log(Level.FINE, "Skipping statistics for string data: " + name); + return null; + } + + String basicStatsFlux = buildBasicStatsFlux(bucket, start, end, measurement, fieldFilter); + + Map stats = createEmptyStats(); + + try { + List basicStatsTables = queryApi.query(basicStatsFlux); + processBasicStatsResults(basicStatsTables, stats); + } catch (Exception e) { + logger.log(Level.WARNING, "Basic statistics query failed for " + name + ": " + e.getMessage()); + return stats; + } + + try { + String medianFlux = buildMedianFlux(bucket, start, end, measurement, fieldFilter); + List medianTables = queryApi.query(medianFlux); + processMedianResults(medianTables, stats); + } catch (Exception medianEx) { + logger.log(Level.FINE, "Median calculation failed for " + name + ": " + medianEx.getMessage()); + } + + return stats; + + } catch (Exception e) { + logger.log(Level.WARNING, "Error calculating statistics for " + name + ": " + e.getMessage()); + return createEmptyStats(); + } + } + + private Map createEmptyStats() { + Map stats = new HashMap<>(); + stats.put("count", 0.0); + stats.put("sum", 0.0); + stats.put("mean", 0.0); + stats.put("stdDev", 0.0); + stats.put("min", 0.0); + stats.put("max", 0.0); + stats.put("median", 0.0); + return stats; + } + + private String buildBasicStatsFlux(String bucket, Instant start, Instant end, + String measurement, String fieldFilter) { + return String.format( + """ + import "types" + + data = from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "%s" and %s) + |> filter(fn: (r) => types.isType(v: r._value, type: "float") or types.isType(v: r._value, type: "int")) + |> limit(n: 10000) + + count = data |> count() |> map(fn: (r) => ({stat: "count", value: float(v: r._value)})) + sum = data |> sum() |> map(fn: (r) => ({stat: "sum", value: r._value})) + mean = data |> mean() |> map(fn: (r) => ({stat: "mean", value: r._value})) + stdDev = data |> stddev() |> map(fn: (r) => ({stat: "stdDev", value: r._value})) + min = data |> min() |> map(fn: (r) => ({stat: "min", value: r._value})) + max = data |> max() |> map(fn: (r) => ({stat: "max", value: r._value})) + + union(tables: [count, sum, mean, stdDev, min, max]) + |> yield(name: "stats") + """, + bucket, start, end, measurement, fieldFilter); + } + + private String buildMedianFlux(String bucket, Instant start, Instant end, + String measurement, String fieldFilter) { + return String.format( + """ + import "types" + + from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "%s" and %s) + |> filter(fn: (r) => types.isType(v: r._value, type: "float") or types.isType(v: r._value, type: "int")) + |> limit(n: 10000) + |> median(method: "exact_mean") + |> map(fn: (r) => ({stat: "median", value: r._value})) + |> yield(name: "median") + """, + bucket, start, end, measurement, fieldFilter); + } + + private void processBasicStatsResults(List tables, Map stats) { + for (FluxTable table : tables) { + for (FluxRecord record : table.getRecords()) { + String statName = (String) record.getValueByKey("stat"); + Object statValue = record.getValueByKey("value"); + + if (statName != null && statValue instanceof Number) { + stats.put(statName, ((Number) statValue).doubleValue()); + } + } + } + } + + private void processMedianResults(List tables, Map stats) { + for (FluxTable table : tables) { + for (FluxRecord record : table.getRecords()) { + String statName = (String) record.getValueByKey("stat"); + Object statValue = record.getValueByKey("value"); + + if ("median".equals(statName) && statValue instanceof Number) { + stats.put("median", ((Number) statValue).doubleValue()); + break; + } + } + } + } + + private String buildFieldFilter(String bucket, String measurement, String specifiedField) { + try { + if (specifiedField != null && !specifiedField.isEmpty()) { + return String.format("r._field == \"%s\"", specifiedField); + } + + String discoveryFlux = String.format( + """ + import "influxdata/influxdb/schema" + schema.fieldKeys(bucket: "%s", predicate: (r) => r._measurement == "%s", start: 1970-01-01T00:00:00Z) + """, + bucket, measurement); + + List fieldTables = queryApi.query(discoveryFlux); + List availableFields = new ArrayList<>(); + + for (FluxTable table : fieldTables) { + for (FluxRecord record : table.getRecords()) { + Object fieldName = record.getValueByKey("_value"); + if (fieldName != null) { + availableFields.add(fieldName.toString()); + } + } + } + + if (availableFields.contains("value")) { + return "r._field == \"value\""; + } else if (availableFields.contains("field")) { + return "r._field == \"field\""; + } else if (!availableFields.isEmpty()) { + String firstField = availableFields.get(0); + return String.format("r._field == \"%s\"", firstField); + } + + return "true"; + + } catch (Exception e) { + logger.log(Level.WARNING, "Error discovering fields for measurement " + measurement, e); + return "r._field == \"value\" or r._field == \"field\""; + } + } + + @Override + public void cancel() { + for (ValueIterator it : iterators.keySet().toArray(new ValueIterator[0])) { + try { + it.close(); + } catch (IOException e) { + logger.log(Level.WARNING, "Error closing iterator", e); + } + } + } + + @Override + public void close() { + cancel(); + if (influxClient != null) { + influxClient.close(); + logger.log(Level.INFO, "InfluxDB client closed"); + } + } + + @Override + public void finished(Object source) { + if (source instanceof ValueIterator) { + iterators.remove(source); + } + } +} \ No newline at end of file diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxArchiveReaderFactory.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxArchiveReaderFactory.java new file mode 100644 index 0000000000..6afbcf0d78 --- /dev/null +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxArchiveReaderFactory.java @@ -0,0 +1,31 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.archive.reader.influx2; + +import org.phoebus.archive.reader.ArchiveReader; +import org.phoebus.archive.reader.spi.ArchiveReaderFactory; + +/** + * SPI Factory for "influx:" archive URLs. + * This factory is discovered via Java's Service Provider Interface (SPI) and is responsible + * for creating the InfluxArchiveReader when a DataBrowser URL with the "influx:" scheme is used. + */ +public class InfluxArchiveReaderFactory implements ArchiveReaderFactory { + /** Unique prefix that identifies InfluxDB data source URLs */ + public static final String PREFIX = "influx2:"; + + @Override + public String getPrefix() { + return PREFIX; + } + + @Override + public ArchiveReader createReader(final String url) throws Exception { + return new InfluxArchiveReader(url); + } +} diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxDecimatedValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxDecimatedValueIterator.java new file mode 100644 index 0000000000..0799e9ff3a --- /dev/null +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxDecimatedValueIterator.java @@ -0,0 +1,339 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.archive.reader.influx2; + +import static org.phoebus.archive.reader.ArchiveReaders.logger; + +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import java.time.Duration; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.logging.Level; +import org.epics.vtype.Alarm; +import org.epics.vtype.AlarmSeverity; +import org.epics.vtype.AlarmStatus; +import org.epics.vtype.Display; +import org.epics.vtype.VNumber; +import org.epics.vtype.VType; +import org.phoebus.archive.reader.ValueIterator; +import org.phoebus.pv.TimeHelper; + +/** + * Value iterator for decimated numeric data from InfluxDB. + * Uses InfluxDB's native aggregateWindow function to decimate data on the server side. + * This approach is much more efficient for large datasets. + */ +public class InfluxDecimatedValueIterator implements ValueIterator { + private final InfluxArchiveReader reader; + private final String name; + private final String field; + private final Instant start; + private final Instant end; + private final int maxPoints; + private final IteratorListener listener; + private final Iterator valueIterator; + private boolean closed = false; + + private static final double MIN_EPSILON = 0.001; + private static final double MAX_EPSILON = 100.0; + private static final double BASE_EPSILON = 0.1; + private static final long ZOOM_THRESHOLD_SECONDS = 3600; + private static final double EPSILON_SCALE_FACTOR = 2.0; + + public InfluxDecimatedValueIterator(InfluxArchiveReader reader, String name, String field, + Instant start, Instant end, int maxPoints, + IteratorListener listener) { + this.reader = reader; + this.name = name; + this.field = field; + this.start = start; + this.end = end; + this.maxPoints = maxPoints; + this.listener = listener; + + List records = fetchDecimatedRecords(); + List values = convertRecordsToVTypes(records); + + this.valueIterator = values.iterator(); + } + + private List fetchDecimatedRecords() { + String startISO = DateTimeFormatter.ISO_INSTANT.format(start); + String endISO = DateTimeFormatter.ISO_INSTANT.format(end); + String bucket = InfluxPreferences.bucket; + + double epsilon = calculateDynamicEpsilon(); + + String flux = buildFluxQuery(bucket, startISO, endISO, name, field, epsilon); + + try { + List tables = reader.queryApi.query(flux); + + if (tables == null || tables.isEmpty()) { + logger.log(Level.INFO, String.format("No data returned from RDP query for %s with epsilon %.6f", name, epsilon)); + return new ArrayList<>(); + } + + List allRecords = new ArrayList<>(); + for (FluxTable table : tables) { + allRecords.addAll(table.getRecords()); + } + + allRecords.sort(Comparator.comparing(FluxRecord::getTime)); + + logger.log(Level.FINE, String.format("RDP decimation for %s: %d points with epsilon %.6f (time range: %s)", + name, allRecords.size(), epsilon, Duration.between(start, end))); + + return allRecords; + + } catch (Exception e) { + logger.log(Level.SEVERE, "Error querying InfluxDB for PV " + name, e); + return new ArrayList<>(); + } + } + + /** + * Calculates dynamic epsilon based on zoom level, data density, and value range + */ + private double calculateDynamicEpsilon() { + Duration timeRange = Duration.between(start, end); + long timeRangeSeconds = timeRange.getSeconds(); + + double zoomFactor = calculateZoomFactor(timeRangeSeconds); + double baseEpsilon = BASE_EPSILON * zoomFactor; + + double densityFactor = calculateDensityFactor(timeRangeSeconds); + double densityAdjustedEpsilon = baseEpsilon * densityFactor; + + double rangeAdjustedEpsilon = adjustEpsilonForDataRange(densityAdjustedEpsilon); + + double finalEpsilon = Math.max(MIN_EPSILON, Math.min(MAX_EPSILON, rangeAdjustedEpsilon)); + + logger.log(Level.FINE, String.format( + "Dynamic epsilon calculation for %s: timeRange=%ds, zoomFactor=%.3f, densityFactor=%.3f, finalEpsilon=%.6f", + name, timeRangeSeconds, zoomFactor, densityFactor, finalEpsilon)); + + return finalEpsilon; + } + + /** + * Calculates zoom factor based on time range + */ + private double calculateZoomFactor(long timeRangeSeconds) { + if (timeRangeSeconds <= 60) { + return 0.01; + } else if (timeRangeSeconds <= 300) { + return 0.05; + } else if (timeRangeSeconds <= 900) { + return 0.1; + } else if (timeRangeSeconds <= ZOOM_THRESHOLD_SECONDS) { + double ratio = (double) timeRangeSeconds / ZOOM_THRESHOLD_SECONDS; + return 0.1 + (0.9 * ratio); + } else if (timeRangeSeconds <= 86400) { + double ratio = (double) timeRangeSeconds / 86400.0; + return 1.0 + (ratio * EPSILON_SCALE_FACTOR); + } else if (timeRangeSeconds <= 604800) { + double ratio = (double) timeRangeSeconds / 604800.0; + return 3.0 + (ratio * EPSILON_SCALE_FACTOR * 2); + } else { + return Math.min(10.0, 7.0 + Math.log10(timeRangeSeconds / 604800.0)); + } + } + + /** + * Calculates density factor based on expected data points vs desired points + */ + private double calculateDensityFactor(long timeRangeSeconds) { + if (timeRangeSeconds <= maxPoints) { + return 0.5; + } else { + double ratio = (double) timeRangeSeconds / maxPoints; + return Math.min(5.0, Math.log(ratio) + 1.0); + } + } + + /** + * Adjusts epsilon based on data value range characteristics + * This method queries a sample of the data to understand value distribution + */ + private double adjustEpsilonForDataRange(double baseEpsilon) { + try { + String bucket = InfluxPreferences.bucket; + String startISO = DateTimeFormatter.ISO_INSTANT.format(start); + String endISO = DateTimeFormatter.ISO_INSTANT.format(end); + + String fieldFilter = field != null ? + String.format(" and r._field == \"%s\"", field) : ""; + + String rangeQuery = String.format( + """ + import "types" + + data = from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "%s"%s) + |> filter(fn: (r) => types.isType(v: r._value, type: "float") or types.isType(v: r._value, type: "int")) + |> sample(n: 1000) + + min_val = data |> min() |> map(fn: (r) => ({stat: "min", value: r._value})) + max_val = data |> max() |> map(fn: (r) => ({stat: "max", value: r._value})) + mean_val = data |> mean() |> map(fn: (r) => ({stat: "mean", value: r._value})) + stddev_val = data |> stddev() |> map(fn: (r) => ({stat: "stddev", value: r._value})) + + union(tables: [min_val, max_val, mean_val, stddev_val]) + """, + bucket, startISO, endISO, name, fieldFilter); + + List rangeTables = reader.queryApi.query(rangeQuery); + + double minValue = Double.NaN; + double maxValue = Double.NaN; + double meanValue = Double.NaN; + double stddevValue = Double.NaN; + + for (FluxTable table : rangeTables) { + for (FluxRecord record : table.getRecords()) { + String statName = (String) record.getValueByKey("stat"); + Object statValue = record.getValueByKey("value"); + + if (statValue instanceof Number number) { + switch (Objects.requireNonNull(statName)) { + case "min" -> minValue = number.doubleValue(); + case "max" -> maxValue = number.doubleValue(); + case "mean" -> meanValue = number.doubleValue(); + case "stddev" -> stddevValue = number.doubleValue(); + } + } + } + } + + if (!Double.isNaN(minValue) && !Double.isNaN(maxValue)) { + double valueRange = Math.abs(maxValue - minValue); + + if (valueRange > 0) { + double rangeBasedEpsilon = valueRange * 0.001; + + if (!Double.isNaN(stddevValue) && stddevValue > 0) { + double noiseBasedEpsilon = stddevValue * 0.1; + rangeBasedEpsilon = Math.min(rangeBasedEpsilon, noiseBasedEpsilon); + } + + double combinedEpsilon = Math.sqrt(baseEpsilon * rangeBasedEpsilon); + + logger.log(Level.FINE, String.format( + "Range-adjusted epsilon for %s: range=%.3f, stddev=%.3f, base=%.6f, range-based=%.6f, final=%.6f", + name, valueRange, stddevValue, baseEpsilon, rangeBasedEpsilon, combinedEpsilon)); + + return combinedEpsilon; + } + } + + } catch (Exception e) { + logger.log(Level.FINE, "Could not determine data range for epsilon adjustment: " + e.getMessage()); + } + + return baseEpsilon; + } + + /** + * Builds the Flux query with dynamic RDP epsilon + */ + private String buildFluxQuery(String bucket, String start, String stop, + String measurement, String field, double epsilon) { + + String fieldFilter = field != null ? + String.format(" and r._field == \"%s\"", field) : ""; + + String baseFilter = String.format( + "from(bucket: \"%s\") " + + "|> range(start: %s, stop: %s) " + + "|> filter(fn: (r) => r._measurement == \"%s\"%s)", + bucket, start, stop, measurement, fieldFilter); + + return buildRDPQuery(baseFilter, epsilon); + } + + /** + * Builds RDP query with specified epsilon + */ + private String buildRDPQuery(String baseFilter, double epsilon) { + return String.format( + "import \"experimental/polyline\" " + + "%s " + + "|> polyline.rdp(epsilon: %.6f)", + baseFilter, epsilon); + } + + /** + * Converts FluxRecords to VType objects + */ + private List convertRecordsToVTypes(List records) { + List values = new ArrayList<>(); + + for (FluxRecord record : records) { + VType value = createVType(record); + if (value != null) { + values.add(value); + } + } + + logger.log(Level.FINE, String.format("Converted %d FluxRecords to %d VTypes for %s", + records.size(), values.size(), name)); + + return values; + } + + /** + * Creates a VType from a FluxRecord + */ + private VType createVType(FluxRecord record) { + Object val = record.getValueByKey("_value"); + Instant timestamp = record.getTime(); + + if (timestamp == null) { + logger.log(Level.WARNING, "Record has null timestamp"); + return null; + } + + if (!(val instanceof Number)) { + logger.log(Level.FINE, "Non-numeric value: " + val); + return VNumber.of(0.0, + Alarm.of(AlarmSeverity.INVALID, AlarmStatus.CLIENT, "Invalid data"), + TimeHelper.fromInstant(timestamp), + Display.none()); + } + + return VNumber.of((Number) val, + Alarm.of(AlarmSeverity.NONE, AlarmStatus.NONE, "OK"), + TimeHelper.fromInstant(timestamp), + Display.none()); + } + + @Override + public boolean hasNext() { + return !closed && valueIterator.hasNext(); + } + + @Override + public VType next() { + if (!hasNext()) return null; + return valueIterator.next(); + } + + @Override + public void close() { + closed = true; + listener.finished(this); + } +} diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxPreferences.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxPreferences.java new file mode 100644 index 0000000000..b23cbf09eb --- /dev/null +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxPreferences.java @@ -0,0 +1,23 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.archive.reader.influx2; + +import org.phoebus.framework.preferences.AnnotatedPreferences; +import org.phoebus.framework.preferences.Preference; + +public class InfluxPreferences { + @Preference static String ip; + @Preference static String port; + @Preference static String bucket; + @Preference static String org; + @Preference static String token; + @Preference static boolean useHttps; + static { + AnnotatedPreferences.initialize(InfluxPreferences.class, "/influx_preferences.properties"); + } +} \ No newline at end of file diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxValueIterator.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxValueIterator.java new file mode 100644 index 0000000000..f85e1fa310 --- /dev/null +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/InfluxValueIterator.java @@ -0,0 +1,231 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.archive.reader.influx2; + +import static org.phoebus.archive.reader.ArchiveReaders.logger; + +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.logging.Level; +import org.epics.vtype.Alarm; +import org.epics.vtype.AlarmSeverity; +import org.epics.vtype.AlarmStatus; +import org.epics.vtype.Display; +import org.epics.vtype.VNumber; +import org.epics.vtype.VString; +import org.epics.vtype.VType; +import org.phoebus.archive.reader.ValueIterator; +import org.phoebus.pv.TimeHelper; + +/** + * Simplified iterator for InfluxDBv2 raw data. + */ +public class InfluxValueIterator implements ValueIterator { + private final InfluxArchiveReader reader; + private final String name; + private final String field; + private final Instant start; + private final Instant end; + private final IteratorListener listener; + private final Iterator valueIterator; + private final String meanWindow; + private boolean closed = false; + + public InfluxValueIterator(InfluxArchiveReader reader, String name, String field, + Instant start, Instant end, IteratorListener listener, + String meanWindow) throws Exception { + this.reader = reader; + this.name = name; + this.field = field; + this.start = start; + this.end = end; + this.listener = listener; + this.meanWindow = meanWindow; + + List values = fetchAndConvertRecords(); + this.valueIterator = values.iterator(); + } + + public InfluxValueIterator(InfluxArchiveReader reader, String name, String field, + Instant start, Instant end, IteratorListener listener) throws Exception { + this(reader, name, field, start, end, listener, null); + } + + private List fetchAndConvertRecords() throws Exception { + List records = fetchRecords(); + List values = new ArrayList<>(); + + for (FluxRecord record : records) { + VType value = createVType(record); + if (value != null) { + values.add(value); + } + } + + return values; + } + + private List fetchRecords() throws Exception { + String startISO = DateTimeFormatter.ISO_INSTANT.format(start); + String endISO = DateTimeFormatter.ISO_INSTANT.format(end); + String bucket = InfluxPreferences.bucket; + String flux = buildFluxQuery(bucket, startISO, endISO, name, field, meanWindow); + + try { + List tables = reader.queryApi.query(flux); + + if (tables == null || tables.isEmpty()) { + return new ArrayList<>(); + } + + List allRecords = new ArrayList<>(); + for (FluxTable table : tables) { + allRecords.addAll(table.getRecords()); + } + + allRecords.sort(Comparator.comparing(FluxRecord::getTime)); + + return allRecords; + } catch (Exception e) { + logger.log(Level.SEVERE, "Error querying InfluxDB for raw data: " + name, e); + throw new Exception("Failed to fetch raw data for " + name, e); + } + } + + private String buildFluxQuery(String bucket, String start, String stop, + String measurement, String field, String meanWindow) { + + String fieldFilter; + if (field != null && !field.isEmpty()) { + fieldFilter = String.format("r._field == \"%s\"", field); + } else { + fieldFilter = discoverFieldFilter(bucket, measurement); + } + + String baseQuery = String.format( + "from(bucket: \"%s\") " + + "|> range(start: %s, stop: %s) " + + "|> filter(fn: (r) => r._measurement == \"%s\" and %s)", + bucket, start, stop, measurement, fieldFilter); + + if (meanWindow != null && !meanWindow.trim().isEmpty()) { + baseQuery += String.format(" |> aggregateWindow(every: %s, fn: mean, createEmpty: false)", meanWindow); + } + + baseQuery += " |> sort(columns: [\"_time\"])"; + + return baseQuery; + } + + private String discoverFieldFilter(String bucket, String measurement) { + try { + String discoveryFlux = String.format( + """ + import "influxdata/influxdb/schema" + schema.fieldKeys(bucket: "%s", predicate: (r) => r._measurement == "%s", start: 1970-01-01T00:00:00Z) + """, + bucket, measurement); + + List fieldTables = reader.queryApi.query(discoveryFlux); + List availableFields = new ArrayList<>(); + + for (FluxTable table : fieldTables) { + for (FluxRecord record : table.getRecords()) { + Object fieldName = record.getValueByKey("_value"); + if (fieldName != null) { + availableFields.add(fieldName.toString()); + } + } + } + + if (availableFields.contains("value")) { + return "r._field == \"value\""; + } else if (availableFields.contains("field")) { + return "r._field == \"field\""; + } else if (!availableFields.isEmpty()) { + String firstField = availableFields.get(0); + return String.format("r._field == \"%s\"", firstField); + } + + return "true"; + + } catch (Exception e) { + logger.log(Level.WARNING, "Error discovering fields, using fallback", e); + return "r._field == \"value\" or r._field == \"field\""; + } + } + + public VType createVType(FluxRecord record) { + if (record == null) return null; + + Object val = record.getValueByKey("_value"); + Instant timestamp = record.getTime(); + + if (timestamp == null) { + logger.log(Level.WARNING, "Record has null timestamp for PV: " + name); + return null; + } + + if (val instanceof String) { + return VString.of(val.toString(), + Alarm.of(AlarmSeverity.NONE, AlarmStatus.NONE, "OK"), + TimeHelper.fromInstant(timestamp)); + } + + if (!(val instanceof Number)) { + if (val != null) { + logger.log(Level.FINE, String.format("Non-numeric value for PV %s: %s (%s)", + name, val, val.getClass().getSimpleName())); + } + return VNumber.of(0.0, + Alarm.of(AlarmSeverity.INVALID, AlarmStatus.CLIENT, "Invalid data"), + TimeHelper.fromInstant(timestamp), + Display.none()); + } + + double numVal = ((Number) val).doubleValue(); + + if (Double.isNaN(numVal) || Double.isInfinite(numVal)) { + return VNumber.of(0.0, + Alarm.of(AlarmSeverity.INVALID, AlarmStatus.CLIENT, "Invalid numeric value"), + TimeHelper.fromInstant(timestamp), + Display.none()); + } + + return VNumber.of(numVal, + Alarm.of(AlarmSeverity.NONE, AlarmStatus.NONE, "OK"), + TimeHelper.fromInstant(timestamp), + Display.none()); + } + + @Override + public synchronized boolean hasNext() { + return !closed && valueIterator.hasNext(); + } + + @Override + public synchronized VType next() { + if (!hasNext()) return null; + return valueIterator.next(); + } + + @Override + public synchronized void close() { + if (!closed) { + closed = true; + listener.finished(this); + logger.log(Level.FINE, "Closed iterator for PV: " + name); + } + } +} diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/IteratorListener.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/IteratorListener.java new file mode 100644 index 0000000000..449174ed68 --- /dev/null +++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/IteratorListener.java @@ -0,0 +1,12 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.archive.reader.influx2; + +public interface IteratorListener { + void finished(Object source); +} diff --git a/app/trends/archive-reader/src/main/resources/META-INF/services/org.phoebus.archive.reader.spi.ArchiveReaderFactory b/app/trends/archive-reader/src/main/resources/META-INF/services/org.phoebus.archive.reader.spi.ArchiveReaderFactory index 737fe3cf43..d9f661c22b 100644 --- a/app/trends/archive-reader/src/main/resources/META-INF/services/org.phoebus.archive.reader.spi.ArchiveReaderFactory +++ b/app/trends/archive-reader/src/main/resources/META-INF/services/org.phoebus.archive.reader.spi.ArchiveReaderFactory @@ -2,3 +2,4 @@ org.phoebus.archive.reader.appliance.ApplianceArchiveReaderFactory org.phoebus.archive.reader.rdb.RDBArchiveReaderFactory org.phoebus.archive.reader.channelarchiver.XMLRPCArchiveReaderFactory org.phoebus.archive.reader.channelarchiver.file.ArchiveFileReaderFactory +org.phoebus.archive.reader.influx2.InfluxArchiveReaderFactory diff --git a/core/pom.xml b/core/pom.xml index 234f4a7a53..4df3403eaa 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -10,6 +10,7 @@ pva pv pv-ca + pv-influx2 pv-jackie pv-mqtt pv-opva diff --git a/core/pv-influx2/pom.xml b/core/pv-influx2/pom.xml new file mode 100644 index 0000000000..9b1114b70a --- /dev/null +++ b/core/pv-influx2/pom.xml @@ -0,0 +1,74 @@ + + + 4.0.0 + + org.phoebus + parent + 5.0.3-SNAPSHOT + + + core-pv-influx2 + + + org.epics + epics-util + ${epics.util.version} + + + + org.epics + vtype + ${vtype.version} + + + + org.phoebus + core-pv + 5.0.3-SNAPSHOT + + + + org.phoebus + core-framework + 5.0.3-SNAPSHOT + + + + com.influxdb + influxdb-client-java + ${influxdb2Client.version} + + + org.testng + testng + RELEASE + test + + + org.junit.jupiter + junit-jupiter-api + 5.8.2 + test + + + org.mockito + mockito-core + 4.5.1 + test + + + org.mockito + mockito-junit-jupiter + 4.5.1 + test + + + org.mockito + mockito-inline + 5.2.0 + test + + + \ No newline at end of file diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Context.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Context.java new file mode 100644 index 0000000000..4c2090e358 --- /dev/null +++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Context.java @@ -0,0 +1,69 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.pv.influx2; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import java.util.logging.Level; +import java.util.logging.Logger; + + +/** + * Singleton context for managing the InfluxDB client instance. + *

+ * This class initializes and holds a single {@link InfluxDBClient} that can be + * accessed globally throughout the application. Connection settings such as host, + * port, token, and HTTPS usage are read from {@link InfluxDB_Preferences}. + * + */ +public class InfluxDB_Context { + /** Logger for logging initialization and configuration messages. */ + private static final Logger LOGGER = Logger.getLogger(InfluxDB_Context.class.getName()); + + /** The InfluxDB client used for querying or writing data. */ + private final InfluxDBClient client; + + /** + * Private constructor that creates an {@link InfluxDBClient} based on application preferences. + *

+ * Builds the client using host, port, token, and organization defined in the + * {@code pv_influx_preferences.properties} file. + */ + private InfluxDB_Context() { + InfluxDB_Preferences prefs = InfluxDB_Preferences.getInstance(); + String url = String.format("%s://%s:%d", + prefs.isUseHttps() ? "https" : "http", + prefs.getHost(), + prefs.getPort()); + + client = InfluxDBClientFactory.create(url, prefs.getToken().toCharArray(), prefs.getOrganization()); + LOGGER.log(Level.CONFIG, "InfluxDB client created with URL: {0}", url); + } + + private static class Holder { + private static final InfluxDB_Context INSTANCE = new InfluxDB_Context(); + } + + /** + * Returns the singleton instance of {@code InfluxDB_Context}. + * + * @return the singleton {@code InfluxDB_Context} instance + */ + public static InfluxDB_Context getInstance() { + return Holder.INSTANCE; + } + + /** + * Returns the underlying {@link InfluxDBClient} instance. + * + * @return the InfluxDB client + */ + public InfluxDBClient getClient() { + return client; + } +} diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Helper.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Helper.java new file mode 100644 index 0000000000..6c644907c7 --- /dev/null +++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Helper.java @@ -0,0 +1,64 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.pv.influx2; + +import com.influxdb.query.FluxRecord; +import java.time.Instant; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.epics.vtype.Alarm; +import org.epics.vtype.Display; +import org.epics.vtype.Time; +import org.epics.vtype.VDouble; +import org.epics.vtype.VString; +import org.epics.vtype.VType; +import org.phoebus.pv.TimeHelper; + +/** + * Utility class for converting InfluxDB {@link FluxRecord} values to EPICS {@link VType} values. + *

+ * Supports conversion of numeric values to {@link VDouble} and string values to {@link VString}. + * Unhandled types will cause an exception to be thrown. + * + */ +public class InfluxDB_Helper { + /** Logger for the helper class */ + private static final Logger LOGGER = Logger.getLogger(InfluxDB_Helper.class.getName()); + + /** + * Converts a single {@link FluxRecord} from InfluxDB into a corresponding {@link VType}. + * + * @param record the InfluxDB record to convert + * @return a {@link VType} representing the value in the record (e.g., {@link VDouble} or {@link VString}) + * @throws Exception if the record is null or contains an unsupported value type + */ + public static VType convertRecordToVType(FluxRecord record) throws Exception { + if (record == null) { + throw new Exception("FluxRecord is null"); + } + + Object value = record.getValue(); + + // Get the timestamp, default to now if missing + Instant instant = record.getTime() != null ? record.getTime() : Instant.now(); + Time time = TimeHelper.fromInstant(instant); + Alarm alarm = Alarm.none(); + Display display = Display.none(); + + if (value instanceof Number) { + double d = ((Number) value).doubleValue(); + return VDouble.of(d, alarm, time, display); + } else if (value instanceof String) { + return VString.of((String) value, alarm, time); + } else { + assert value != null; + LOGGER.log(Level.WARNING, "Unsupported type: " + value.getClass().getName()); + throw new Exception("Cannot handle type " + value.getClass().getName()); + } + } +} diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PV.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PV.java new file mode 100644 index 0000000000..f75228d722 --- /dev/null +++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PV.java @@ -0,0 +1,257 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.pv.influx2; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.QueryApi; +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import java.time.Instant; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.logging.Level; +import org.epics.vtype.Alarm; +import org.epics.vtype.AlarmSeverity; +import org.epics.vtype.AlarmStatus; +import org.epics.vtype.Display; +import org.epics.vtype.Time; +import org.epics.vtype.VString; +import org.epics.vtype.VType; +import org.phoebus.pv.PV; + +/** + * Represents a read-only Process Variable (PV) backed by InfluxDB. + *

+ * This class uses the InfluxDBPollingManager to efficiently poll InfluxDB + * by batching requests with similar periods and the same bucket. + *

+ * Write operations are not supported. + */ +public class InfluxDB_PV extends PV { + /** The name of the InfluxDB bucket */ + private final String bucket; + + /** The name of the measurement to read from */ + private final String measurement; + + /** The field to read from (can be null for default) */ + private final String field; + + /** Period of sampling in milliseconds */ + private final long period; + + /** The InfluxDB client used for querying */ + private final InfluxDBClient influxClient; + + /** Timestamp of PV last update. */ + private Instant lastUpdate; + + /** Object */ + private Object lastValue; + + /** + * Constructs an {@code InfluxDB_PV} with the specified name and base_name. + * The base_name is expected to be in one of the following forms: + * - {@code influx2://bucket/measurement} - specific bucket, no specific field + * - {@code influx2://bucket/measurement/field} - specific bucket, specific field + *

+ * The bucket must always be specified explicitly. + * + * @param name the name of the PV + * @param base_name the base URI specifying the InfluxDB bucket, measurement, optional field + * @throws IllegalArgumentException if the format is invalid or bucket is missing + */ + public InfluxDB_PV(String name, String base_name) { + super(name); + + String new_base_name = base_name.replace("influx2://", ""); + String[] parts = new_base_name.split("/"); + + influxClient = InfluxDB_Context.getInstance().getClient(); + lastUpdate = Instant.now(); + + switch (parts.length) { + case 2: + this.bucket = parts[0]; + this.measurement = parts[1]; + this.field = null; + break; + case 3: + this.bucket = parts[0]; + this.measurement = parts[1]; + this.field = Objects.equals(parts[2], "null") ? null : parts[2]; + break; + default: + throw new IllegalArgumentException( + "Invalid InfluxDB PV format: " + base_name + + ". Expected format: influx2://bucket/measurement or influx2://bucket/measurement/field" + ); + } + if (this.bucket == null || this.bucket.trim().isEmpty()) { + throw new IllegalArgumentException( + "Bucket must be specified in InfluxDB PV: " + base_name + ); + } + + InfluxDB_Preferences prefs = InfluxDB_Preferences.getInstance(); + period = prefs.getRefreshPeriod(); + + logger.log(Level.INFO, "Creating InfluxDB PV with bucket: {0}, measurement: {1}, field: {2}, period: {3}ms", + new Object[]{bucket, measurement, field != null ? field : "all fields", period}); + + InfluxDB_PollingManager.getInstance().registerPV(this); + } + + /** + * Get the bucket for this PV + * @return the bucket name + */ + public String getBucket() { + return bucket; + } + + /** + * Get the measurement for this PV + * @return the measurement name + */ + public String getMeasurement() { + return measurement; + } + + /** + * Get the field for this PV + * @return the field name or null if no specific field + */ + public String getField() { + return field; + } + + /** + * Get the polling period for this PV + * @return the period in milliseconds + */ + public long getPeriod() { + return period; + } + + protected void updateValue(VType value, Object plainValue) { + if (value != null && plainValue != null) { + lastUpdate = Instant.now(); + lastValue = plainValue; + } + + notifyListenersOfValue(value); + } + + /** + * Update the PV to a disconnected state with an alarm + */ + protected void updateToDisconnectedState() { + VType disconnected; + + if (lastValue == null) { + disconnected = VString.of( + "Disconnected", + Alarm.of(AlarmSeverity.INVALID, AlarmStatus.CLIENT, "Disconnected"), + Time.of(lastUpdate != null ? lastUpdate : Instant.now()) + ); + } else { + VType converted = VType.toVType(lastValue, Alarm.noValue(), Time.of(lastUpdate), Display.none()); + + disconnected = Objects.requireNonNullElseGet(converted, () -> VString.of( + "Disconnected: " + lastValue.toString(), + Alarm.of(AlarmSeverity.INVALID, AlarmStatus.CLIENT, "Disconnected"), + Time.of(lastUpdate) + )); + } + + notifyListenersOfValue(disconnected); + } + + /** + * Asynchronously queries InfluxDB for the most recent value of the configured measurement. + *

+ * This method can be used for on-demand reading, but normal updates come through + * the PollingManager. + * + * @return a future that completes with the latest {@link VType} value + */ + @Override + public CompletableFuture asyncRead() { + CompletableFuture future = new CompletableFuture<>(); + + String flux; + if (field != null) { + flux = String.format("from(bucket: \"%s\") |> range(start: -10s) " + + "|> filter(fn: (r) => r._measurement == \"%s\" and r._field == \"%s\") |> last()", + bucket, measurement, field); + } else { + flux = String.format("from(bucket: \"%s\") |> range(start: -10s) " + + "|> filter(fn: (r) => r._measurement == \"%s\") |> last()", + bucket, measurement); + } + + QueryApi queryApi = influxClient.getQueryApi(); + + try { + List tables = queryApi.query(flux); + + if (tables.isEmpty()) { + future.completeExceptionally(new Exception("No data returned from InfluxDB")); + } else { + FluxTable table = tables.get(0); + if (table.getRecords().isEmpty()) { + future.completeExceptionally(new Exception("No records in the result from InfluxDB")); + } else { + FluxRecord record = table.getRecords().get(0); + VType value = InfluxDB_Helper.convertRecordToVType(record); + notifyListenersOfValue(value); + future.complete(value); + } + } + } catch (Exception e) { + logger.log(Level.WARNING, "Query error for PV: " + getName(), e); + future.completeExceptionally(e); + } + + return future; + } + + /** + * Write operation is not supported for InfluxDB PVs. + * + * @param new_value the value to write (ignored) + * @throws UnsupportedOperationException always + */ + @Override + public void write(Object new_value) { + throw new UnsupportedOperationException("Write not supported for InfluxDB PV"); + } + + /** + * Asynchronous write is not supported for InfluxDB PVs. + * + * @param new_value the value to write (ignored) + * @return nothing, always throws exception + * @throws UnsupportedOperationException always + */ + @Override + public CompletableFuture asyncWrite(Object new_value) { + throw new UnsupportedOperationException("Async write not supported for InfluxDB PV"); + } + + /** + * Unregisters this PV from the polling manager. + * Called when the PV is being closed. + */ + @Override + protected void close() { + InfluxDB_PollingManager.getInstance().unregisterPV(this); + } +} diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PVFactory.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PVFactory.java new file mode 100644 index 0000000000..606490a98d --- /dev/null +++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PVFactory.java @@ -0,0 +1,43 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.pv.influx2; + +import org.phoebus.pv.PV; +import org.phoebus.pv.PVFactory; + +/** + * Factory class for creating {@link InfluxDB_PV} instances. + *

+ * This factory integrates with the Phoebus PV framework and registers the + * "influx" PV type so it can be discovered and instantiated dynamically + * when a PV URI starts with {@code influx2://}. + * + */ +public class InfluxDB_PVFactory implements PVFactory { + /** The PV type identifier used to register this factory. */ + public static final String TYPE = "influx2"; + + /** + * Returns the PV type string handled by this factory. + * This value is used to match PVs that should be created by this factory. + * + * @return the PV type string ("influx") + */ + @Override + public String getType() { return TYPE; } + + /** + * Creates a new {@link InfluxDB_PV} instance with the given name and base_name. + * + * @param name the full PV name + * @param base_name the base URI used to extract the InfluxDB bucket and measurement + * @return a new {@code InfluxDB_PV} instance + */ + @Override + public PV createPV(String name, String base_name) { return new InfluxDB_PV(name, base_name); } +} diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PollingManager.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PollingManager.java new file mode 100644 index 0000000000..07d347faab --- /dev/null +++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PollingManager.java @@ -0,0 +1,377 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.pv.influx2; + +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.ArrayList; +import java.util.Objects; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import java.util.stream.Collectors; + +import org.epics.vtype.VType; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.QueryApi; + +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; + +/** + * A manager class that optimizes InfluxDB polling by batching PV requests. + *

+ * This class groups PVs with similar polling periods and same buckets to reduce + * the total number of InfluxDB queries required. It also handles dynamic bucket + * changes for PVs that track the "last" bucket. + */ +public class InfluxDB_PollingManager { + /** Logger for this class */ + private static final Logger LOGGER = Logger.getLogger(InfluxDB_PollingManager.class.getName()); + + /** The InfluxDB client used for querying */ + private final InfluxDBClient influxClient; + + /** Shared executor for all polling tasks */ + private final ScheduledExecutorService scheduler; + + /** Maps bucket+period to its polling group */ + private final Map pollingGroups = new ConcurrentHashMap<>(); + + /** Maps a registered PV to its polling group key */ + private final Map pvToGroupKey = new ConcurrentHashMap<>(); + + /** Maximum number of fields to include in a single query */ + private static final int MAX_FIELDS_PER_QUERY = 100; + + /** + * Class representing a group of PVs that are polled together + * with the same bucket and polling period + */ + private class PollingGroup { + final String bucket; + final long periodMs; + final Set pvSet = Collections.synchronizedSet(new HashSet<>()); + ScheduledFuture scheduledTask; + + PollingGroup(String bucket, long periodMs) { + this.bucket = bucket; + this.periodMs = periodMs; + } + + void schedule() { + scheduledTask = scheduler.scheduleAtFixedRate( + this::pollGroup, + 0, + periodMs, + TimeUnit.MILLISECONDS + ); + } + + void pollGroup() { + if (pvSet.isEmpty()) { + return; + } + + Map> measurementGroups = new HashMap<>(); + + synchronized (pvSet) { + for (InfluxDB_PV pv : pvSet) { + if (Objects.equals(pv.getBucket(), this.bucket)) { + measurementGroups + .computeIfAbsent(pv.getMeasurement(), k -> new ArrayList<>()) + .add(pv); + } + } + } + + for (Map.Entry> entry : measurementGroups.entrySet()) { + String measurement = entry.getKey(); + List pvs = entry.getValue(); + + if (!pvs.isEmpty()) { + executeOptimizedQuery(bucket, measurement, pvs); + } + } + } + + void executeOptimizedQuery(String bucket, String measurement, List pvs) { + try { + Set uniqueFields = pvs.stream() + .map(InfluxDB_PV::getField) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + if (uniqueFields.isEmpty()) { + String flux = String.format("from(bucket: \"%s\") |> range(start: -10s) " + + "|> filter(fn: (r) => r._measurement == \"%s\") |> last()", + bucket, measurement); + + executeQueryAndDistributeResults(flux, measurement, pvs); + return; + } + + List fieldsList = new ArrayList<>(uniqueFields); + Map processedPVs = new HashMap<>(); + + for (int i = 0; i < fieldsList.size(); i += MAX_FIELDS_PER_QUERY) { + int end = Math.min(i + MAX_FIELDS_PER_QUERY, fieldsList.size()); + List batchFields = fieldsList.subList(i, end); + + String fieldFilter = batchFields.stream() + .map(field -> "r._field == \"" + field + "\"") + .collect(Collectors.joining(" or ")); + + String flux = String.format("from(bucket: \"%s\") |> range(start: -10s) " + + "|> filter(fn: (r) => r._measurement == \"%s\" and (%s)) |> last()", + bucket, measurement, fieldFilter); + + List batchPVs = pvs.stream() + .filter(pv -> { + String field = pv.getField(); + return field == null || batchFields.contains(field); + }) + .filter(pv -> !processedPVs.getOrDefault(pv, false)) + .collect(Collectors.toList()); + + if (!batchPVs.isEmpty()) { + executeQueryAndDistributeResults(flux, measurement, batchPVs); + + batchPVs.forEach(pv -> processedPVs.put(pv, true)); + } + } + + List unprocessedPVs = pvs.stream() + .filter(pv -> !processedPVs.getOrDefault(pv, false)) + .collect(Collectors.toList()); + + if (!unprocessedPVs.isEmpty()) { + String flux = String.format("from(bucket: \"%s\") |> range(start: -10s) " + + "|> filter(fn: (r) => r._measurement == \"%s\") |> last()", + bucket, measurement); + + executeQueryAndDistributeResults(flux, measurement, unprocessedPVs); + } + + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Error executing batch query", e); + } + } + + private void executeQueryAndDistributeResults(String flux, String measurement, List pvs) { + try { + QueryApi queryApi = influxClient.getQueryApi(); + List tables = queryApi.query(flux); + + if (!tables.isEmpty()) { + Map recordsByField = new HashMap<>(); + for (FluxTable table : tables) { + for (FluxRecord record : table.getRecords()) { + String field = record.getField(); + recordsByField.put(field, record); + } + } + + for (InfluxDB_PV pv : pvs) { + String field = pv.getField(); + long disconnectTimeoutMs = InfluxDB_Preferences.getInstance().getDisconnectTimeout(); + + if (field != null) { + FluxRecord record = recordsByField.get(field); + + if (record != null) { + Duration timeSinceUpdate = Duration.between(Objects.requireNonNull(record.getTime()), Instant.now()); + + if (timeSinceUpdate.toMillis() > disconnectTimeoutMs) { + pv.updateToDisconnectedState(); + } else { + VType value = InfluxDB_Helper.convertRecordToVType(record); + if (value != null) { + pv.updateValue(value, record.getValue()); + } + } + } else { + pv.updateToDisconnectedState(); + } + } else { + List matchingRecords = tables.stream() + .flatMap(table -> table.getRecords().stream()) + .filter(record -> measurement.equals(record.getMeasurement())).toList(); + + if (!matchingRecords.isEmpty()) { + FluxRecord record = matchingRecords.get(0); + Duration timeSinceUpdate = Duration.between(Objects.requireNonNull(record.getTime()), Instant.now()); + + if (timeSinceUpdate.toMillis() > disconnectTimeoutMs) { + pv.updateToDisconnectedState(); + } else { + VType value = InfluxDB_Helper.convertRecordToVType(record); + if (value != null) { + pv.updateValue(value, record.getValue()); + } + } + } else { + pv.updateToDisconnectedState(); + } + } + } + } else { + for (InfluxDB_PV pv : pvs) { + pv.updateToDisconnectedState(); + } + } + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Error executing batch query", e); + for (InfluxDB_PV pv : pvs) { + pv.updateToDisconnectedState(); + } + } + } + + void cancel() { + if (scheduledTask != null) { + scheduledTask.cancel(false); + scheduledTask = null; + } + } + } + + /** + * Private constructor to enforce singleton pattern + */ + private InfluxDB_PollingManager() { + influxClient = InfluxDB_Context.getInstance().getClient(); + scheduler = Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors(), + r -> { + Thread thread = new Thread(r, "InfluxDB-PV-BatchPoller"); + thread.setDaemon(true); + return thread; + } + ); + } + + private static class Holder { + private static final InfluxDB_PollingManager INSTANCE = new InfluxDB_PollingManager(); + } + + /** + * Gets the singleton instance of the polling manager + * + * @return the polling manager instance + */ + public static InfluxDB_PollingManager getInstance() { + return Holder.INSTANCE; + } + + /** + * Registers a PV to be polled by the manager + * + * @param pv the InfluxDB PV to register + */ + public void registerPV(InfluxDB_PV pv) { + registerPVInternal(pv); + } + + /** + * Internal method to register a PV (used for both initial registration and reorganization) + * + * @param pv the InfluxDB PV to register + */ + private void registerPVInternal(InfluxDB_PV pv) { + String bucket = pv.getBucket(); + if (bucket == null) { + LOGGER.log(Level.WARNING, "Cannot register PV {0} - no bucket available", pv.getName()); + return; + } + + String groupKey = createGroupKey(bucket, pv.getPeriod()); + + pollingGroups.computeIfAbsent(groupKey, k -> { + String[] parts = k.split(":"); + String bucketName = parts[0]; + long period = Long.parseLong(parts[1]); + + PollingGroup group = new PollingGroup(bucketName, period); + group.schedule(); + return group; + }).pvSet.add(pv); + + pvToGroupKey.put(pv, groupKey); + } + + /** + * Unregisters a PV from the manager + * + * @param pv the InfluxDB PV to unregister + */ + public void unregisterPV(InfluxDB_PV pv) { + String groupKey = pvToGroupKey.remove(pv); + if (groupKey != null) { + PollingGroup group = pollingGroups.get(groupKey); + if (group != null) { + group.pvSet.remove(pv); + if (group.pvSet.isEmpty()) { + group.cancel(); + pollingGroups.remove(groupKey); + } + } + } + } + + /** + * Creates a group key for the given bucket and period + * This will group similar periods together to reduce the number of scheduled tasks + * + * @param bucket the InfluxDB bucket + * @param periodMs the polling period in milliseconds + * @return a key string for grouping + */ + private String createGroupKey(String bucket, long periodMs) { + long bucketedPeriod; + + if (periodMs <= 100) { + bucketedPeriod = periodMs; + } else if (periodMs <= 1000) { + bucketedPeriod = Math.round(periodMs / 100.0) * 100; + } else if (periodMs <= 10000) { + bucketedPeriod = Math.round(periodMs / 1000.0) * 1000; + } else { + bucketedPeriod = Math.round(periodMs / 5000.0) * 5000; + } + + return bucket + ":" + bucketedPeriod; + } + + /** + * Shuts down the polling manager and cancels all polling tasks + */ + public void shutdown() { + for (PollingGroup group : pollingGroups.values()) { + group.cancel(); + } + + pollingGroups.clear(); + pvToGroupKey.clear(); + scheduler.shutdownNow(); + } +} diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Preferences.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Preferences.java new file mode 100644 index 0000000000..3c80f23d2c --- /dev/null +++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Preferences.java @@ -0,0 +1,158 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.pv.influx2; + +import java.lang.reflect.Field; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.phoebus.framework.preferences.PreferencesReader; + +/** + * Singleton class responsible for loading and providing access to InfluxDB connection preferences. + *

+ * Preferences are read from a properties file named {@code pv_influx_preferences.properties} + * using the {@link PreferencesReader} utility. This class centralizes configuration details + * such as host, port, organization, authentication token, and HTTPS usage flag. + * + */ +public class InfluxDB_Preferences { + /** Logger used for logging information and errors related to InfluxDB preferences. */ + private static final Logger LOGGER = Logger.getLogger(InfluxDB_Preferences.class.getName()); + + private static final String HOST = "influx_host"; + private static final String PORT = "influx_port"; + private static final String ORGANIZATION = "influx_organization"; + private static final String TOKEN = "influx_token"; + private static final String USE_HTTPS = "influx_useHttps"; + private static final String DISCONNECT_TIMEOUT = "influx_disconnectTimeoutMs"; + + /** The singleton instance of this class. */ + private static final InfluxDB_Preferences INSTANCE = new InfluxDB_Preferences(); + + private static String host; + private static int port; + private String organization; + private String token; + private boolean useHttps; + private long refreshPeriod; + private long disconnectTimeout; + + /** + * Private constructor that loads preferences upon instantiation. + * This is part of the singleton pattern. + */ + private InfluxDB_Preferences() { + try { + installPreferences(); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Error while loading InfluxDB preferences", e); + } + } + + /** + * Reads the InfluxDB preferences from the properties file. + * Uses the {@link PreferencesReader} to retrieve values. + */ + private void installPreferences() { + PreferencesReader prefs = new PreferencesReader(InfluxDB_Preferences.class, "/pv_influx_preferences.properties"); + + try { + Class prefsClass = Class.forName("org.csstudio.display.builder.runtime.Preferences"); + Field field = prefsClass.getField("update_throttle_ms"); + refreshPeriod = (Integer) field.get(null); + } catch (Exception e) { + refreshPeriod = 250; + } + + host = prefs.get(HOST); + port = prefs.getInt(PORT); + organization = prefs.get(ORGANIZATION); + token = prefs.get(TOKEN); + useHttps = prefs.getBoolean(USE_HTTPS); + disconnectTimeout = prefs.getLong(DISCONNECT_TIMEOUT); + + if (disconnectTimeout == 0) { + disconnectTimeout = 3000; + } + } + + /** + * Returns the singleton instance of {@code InfluxDB_Preferences}. + * + * @return the singleton instance + */ + public static InfluxDB_Preferences getInstance() { + return INSTANCE; + } + + /** + * Returns the configured InfluxDB host name or IP address. + * + * @return the InfluxDB host + */ + public static String getHost() { + return host; + } + + /** + * Returns the configured port number for connecting to InfluxDB. + * + * @return the InfluxDB port + */ + public static int getPort() { + return port; + } + + /** + * Returns the name of the InfluxDB organization. + * + * @return the InfluxDB organization + */ + public String getOrganization() { + return organization; + } + + /** + * Returns the authentication token used for accessing InfluxDB. + * + * @return the InfluxDB token + */ + public String getToken() { + return token; + } + + /** + * Indicates whether HTTPS should be used when connecting to InfluxDB. + * + * @return {@code true} if HTTPS is enabled; {@code false} otherwise + */ + public boolean isUseHttps() { + return useHttps; + } + + /** + * Retuns the refresh period of a PV in milliseconds. + * Default: 1000ms + * + * @return the refrehs period + */ + public long getRefreshPeriod() { + return refreshPeriod; + } + + /** + * Returns the timeout before a PV is considered as disconnected in milliseconds. + * Disconnected state is triggered when a PV isn't updated for a defined amount of time. + * Default: 3000 ms + * + * @return timeout disconnected + */ + public long getDisconnectTimeout() { + return disconnectTimeout; + } +} diff --git a/core/pv-influx2/src/main/resources/META-INF/services/org.phoebus.pv.PVFactory b/core/pv-influx2/src/main/resources/META-INF/services/org.phoebus.pv.PVFactory new file mode 100644 index 0000000000..75980746b3 --- /dev/null +++ b/core/pv-influx2/src/main/resources/META-INF/services/org.phoebus.pv.PVFactory @@ -0,0 +1 @@ +org.phoebus.pv.influx2.InfluxDB_PVFactory \ No newline at end of file diff --git a/core/pv-influx2/src/test/java/org/phoebus/pv/influx2/InfluxPVTest.java b/core/pv-influx2/src/test/java/org/phoebus/pv/influx2/InfluxPVTest.java new file mode 100644 index 0000000000..31d4fcda19 --- /dev/null +++ b/core/pv-influx2/src/test/java/org/phoebus/pv/influx2/InfluxPVTest.java @@ -0,0 +1,614 @@ +/******************************************************************************* + * Copyright (C) 2025 Thales. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + *******************************************************************************/ +package org.phoebus.pv.influx2; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.QueryApi; +import com.influxdb.client.WriteApiBlocking; +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import org.epics.vtype.VDouble; +import org.epics.vtype.VString; +import org.epics.vtype.VType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.phoebus.pv.PV; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.testng.annotations.AfterClass; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mockStatic; + +/** + * Comprehensive test suite for InfluxDB Process Variable (PV) functionality. + * This test class validates the complete InfluxDB PV system including: + * - Basic PV creation and configuration with explicit bucket specification + * - Asynchronous read operations + * - Connection and error handling + * - Data type conversion between InfluxDB and EPICS VTypes + * - Performance testing with large numbers of PVs + * - Factory pattern implementation + * - Polling manager stress testing + * The tests use Mockito to simulate InfluxDB client interactions without + * requiring an actual InfluxDB instance, ensuring fast and reliable unit testing. + */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class InfluxPVTest { + + @Mock + private InfluxDBClient mockInfluxClient; + + @Mock + private QueryApi mockQueryApi; + + @Mock + private WriteApiBlocking mockWriteApi; + + private MockedStatic mockContext; + private MockedStatic mockPreferences; + + private InfluxDB_Context mockContextInstance; + private InfluxDB_Preferences mockPreferencesInstance; + private InfluxDB_PollingManager pollingManager; + + private static final String TEST_BUCKET = "test_bucket"; + private static final String TEST_MEASUREMENT = "test_measurement"; + private static final String TEST_FIELD = "value"; + private static final long TEST_PERIOD = 100L; + + /** + * Sets up the test environment before each test method. + * Initializes all mock objects including InfluxDB client, APIs, context, + * and preferences. Configures default behaviors for common operations. + */ + @BeforeEach + void setUp() { + mockContextInstance = mock(InfluxDB_Context.class); + when(mockContextInstance.getClient()).thenReturn(mockInfluxClient); + mockContext = mockStatic(InfluxDB_Context.class); + mockContext.when(InfluxDB_Context::getInstance).thenReturn(mockContextInstance); + + mockPreferencesInstance = mock(InfluxDB_Preferences.class); + when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(TEST_PERIOD); + when(mockPreferencesInstance.getDisconnectTimeout()).thenReturn(3000L); + when(mockPreferencesInstance.isUseHttps()).thenReturn(false); + mockPreferences = mockStatic(InfluxDB_Preferences.class); + mockPreferences.when(InfluxDB_Preferences::getInstance).thenReturn(mockPreferencesInstance); + + when(mockInfluxClient.getQueryApi()).thenReturn(mockQueryApi); + when(mockInfluxClient.getWriteApiBlocking()).thenReturn(mockWriteApi); + + pollingManager = InfluxDB_PollingManager.getInstance(); + } + + /** + * Cleans up mock static objects after each test method to prevent interference + * between tests. + */ + @AfterEach + void tearDownTest() { + if (mockContext != null) { + mockContext.close(); + } + if (mockPreferences != null) { + mockPreferences.close(); + } + } + + /** + * Performs final cleanup of the polling manager after all tests complete. + * Ensures proper shutdown of background threads and resources. + */ + @AfterClass + void tearDown() { + if (pollingManager != null) { + pollingManager.shutdown(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Tests basic InfluxDB PV creation with explicit bucket specification. + * Verifies that a PV can be created with a specific bucket, measurement, + * and field, and that all properties are correctly initialized. + */ + @Test + @Order(1) + @DisplayName("Test basic InfluxDB PV creation with bucket/measurement/field") + void testBasicPVCreationWithField() { + InfluxDB_PV pv = new InfluxDB_PV( + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD, + "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD + ); + + assertNotNull(pv); + assertEquals(TEST_BUCKET, pv.getBucket()); + assertEquals(TEST_MEASUREMENT, pv.getMeasurement()); + assertEquals(TEST_FIELD, pv.getField()); + assertEquals(100L, pv.getPeriod()); + + pv.close(); + } + + /** + * Tests PV creation without specific field specification. + * Verifies that a PV can be created with only bucket and measurement, + * allowing the system to read all fields from the measurement. + */ + @Test + @Order(2) + @DisplayName("Test PV creation with bucket/measurement only") + void testPVCreationWithoutField() { + InfluxDB_PV pv = new InfluxDB_PV( + TEST_BUCKET + "/" + TEST_MEASUREMENT, + "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + ); + + assertNotNull(pv); + assertEquals(TEST_BUCKET, pv.getBucket()); + assertEquals(TEST_MEASUREMENT, pv.getMeasurement()); + assertNull(pv.getField()); + assertEquals(100L, pv.getPeriod()); + + pv.close(); + } + + /** + * Tests that PV creation fails when bucket is not specified. + * The system should throw an IllegalArgumentException when attempting + * to create a PV without a bucket specification. + */ + @Test + @Order(3) + @DisplayName("Test PV creation fails without bucket") + void testPVCreationFailsWithoutBucket() { + assertThrows(IllegalArgumentException.class, () -> new InfluxDB_PV(TEST_MEASUREMENT, "influx2://" + TEST_MEASUREMENT)); + } + + /** + * Tests that PV creation fails with invalid format. + * The system should throw an IllegalArgumentException when the format + * doesn't match the expected patterns. + */ + @Test + @Order(4) + @DisplayName("Test PV creation fails with invalid format") + void testPVCreationFailsWithInvalidFormat() { + assertThrows(IllegalArgumentException.class, () -> new InfluxDB_PV("test", "influx2://bucket/measurement/field/extra")); + assertThrows(IllegalArgumentException.class, () -> new InfluxDB_PV("test", "influx2://measurement")); + } + + /** + * Tests asynchronous read operations from InfluxDB. + * Verifies that the PV can successfully execute asynchronous queries + * against InfluxDB and convert the results to appropriate VType objects. + */ + @Test + @Order(5) + @DisplayName("Test asynchronous read") + void testAsyncRead() throws Exception { + when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(10000L); + + FluxRecord mockRecord = mock(FluxRecord.class); + when(mockRecord.getValue()).thenReturn(42.0); + when(mockRecord.getTime()).thenReturn(Instant.now()); + when(mockRecord.getField()).thenReturn(TEST_FIELD); + + FluxTable mockTable = mock(FluxTable.class); + when(mockTable.getRecords()).thenReturn(List.of(mockRecord)); + + when(mockQueryApi.query(anyString())).thenReturn(List.of(mockTable)); + + InfluxDB_PV pv = new InfluxDB_PV( + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD, + "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD + ); + + CompletableFuture future = pv.asyncRead(); + VType result = future.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertInstanceOf(VDouble.class, result); + assertEquals(42.0, ((VDouble) result).getValue(), 0.001); + + pv.close(); + } + + /** + * Tests proper handling of connection errors during read operations. + * When the InfluxDB connection fails, the PV should propagate the error + * appropriately through the CompletableFuture mechanism. + */ + @Test + @Order(6) + @DisplayName("Test connection error handling") + void testConnectionErrorHandling() { + when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(10000L); + + when(mockQueryApi.query(anyString())).thenThrow(new RuntimeException("Connection failed")); + + InfluxDB_PV pv = new InfluxDB_PV( + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD, + "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD + ); + + CompletableFuture future = pv.asyncRead(); + assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)); + + pv.close(); + } + + /** + * Tests PV disconnection scenarios. + * Verifies that PVs can handle disconnection events gracefully, + * typically triggered by stale data or connection timeouts. + */ + @Test + @Order(7) + @DisplayName("Test disconnection handling") + void testDisconnectionHandling() throws InterruptedException { + FluxRecord oldRecord = mock(FluxRecord.class); + when(oldRecord.getValue()).thenReturn(42.0); + when(oldRecord.getTime()).thenReturn(Instant.now().minus(10, ChronoUnit.SECONDS)); + when(oldRecord.getField()).thenReturn(TEST_FIELD); + + FluxTable mockTable = mock(FluxTable.class); + when(mockTable.getRecords()).thenReturn(List.of(oldRecord)); + + when(mockQueryApi.query(anyString())).thenReturn(List.of(mockTable)); + + InfluxDB_PV pv = new InfluxDB_PV( + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD, + "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD + ); + + Thread.sleep(500); + + pv.close(); + } + + /** + * Tests data type conversion from InfluxDB records to EPICS VTypes. + * Verifies that the helper utilities can correctly convert different + * data types from InfluxDB FluxRecords to their corresponding VType representations. + */ + @Test + @Order(8) + @DisplayName("Test data type conversion") + void testDataTypeConversion() throws Exception { + FluxRecord doubleRecord = mock(FluxRecord.class); + when(doubleRecord.getValue()).thenReturn(3.14159); + when(doubleRecord.getTime()).thenReturn(Instant.now()); + + VType doubleResult = InfluxDB_Helper.convertRecordToVType(doubleRecord); + assertInstanceOf(VDouble.class, doubleResult); + assertEquals(3.14159, ((VDouble) doubleResult).getValue(), 0.00001); + + FluxRecord stringRecord = mock(FluxRecord.class); + when(stringRecord.getValue()).thenReturn("test-string"); + when(stringRecord.getTime()).thenReturn(Instant.now()); + + VType stringResult = InfluxDB_Helper.convertRecordToVType(stringRecord); + assertInstanceOf(VString.class, stringResult); + assertEquals("test-string", ((VString) stringResult).getValue()); + + FluxRecord intRecord = mock(FluxRecord.class); + when(intRecord.getValue()).thenReturn(42); + when(intRecord.getTime()).thenReturn(Instant.now()); + + VType intResult = InfluxDB_Helper.convertRecordToVType(intRecord); + assertInstanceOf(VDouble.class, intResult); + assertEquals(42.0, ((VDouble) intResult).getValue(), 0.001); + } + + /** + * Tests PV creation without specifying a specific field name. + * When no field is specified, the PV should be able to read from + * the measurement using default field resolution. + */ + @Test + @Order(9) + @DisplayName("Test PV without specific field") + void testPVWithoutSpecificField() throws Exception { + when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(10000L); + + FluxRecord mockRecord = mock(FluxRecord.class); + when(mockRecord.getValue()).thenReturn(99.9); + when(mockRecord.getTime()).thenReturn(Instant.now()); + when(mockRecord.getMeasurement()).thenReturn(TEST_MEASUREMENT); + + FluxTable mockTable = mock(FluxTable.class); + when(mockTable.getRecords()).thenReturn(List.of(mockRecord)); + + when(mockQueryApi.query(anyString())).thenReturn(List.of(mockTable)); + + InfluxDB_PV pv = new InfluxDB_PV( + TEST_BUCKET + "/" + TEST_MEASUREMENT, + "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + ); + + CompletableFuture future = pv.asyncRead(); + VType result = future.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertNull(pv.getField()); + assertInstanceOf(VDouble.class, result); + + pv.close(); + } + + /** + * Performance test with 10,000 PVs to validate system scalability. + * This test creates a large number of PVs to validate that the system + * can handle high-volume scenarios. + */ + @Test + @Order(10) + @DisplayName("Performance test with 10000 PVs") + @Timeout(value = 60, unit = TimeUnit.SECONDS) + void testPerformanceWith10000PVs() throws Exception { + when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(5000L); + + final int PV_COUNT = 10000; + final int BUCKET_COUNT = 10; + + FluxRecord mockRecord = mock(FluxRecord.class); + when(mockRecord.getValue()).thenReturn(42.0); + when(mockRecord.getTime()).thenReturn(Instant.now()); + when(mockRecord.getField()).thenReturn(""); + + FluxTable mockTable = mock(FluxTable.class); + when(mockTable.getRecords()).thenReturn(List.of(mockRecord)); + when(mockQueryApi.query(anyString())).thenReturn(List.of(mockTable)); + + AtomicLong creationTime = new AtomicLong(0); + AtomicInteger successfulCreations = new AtomicInteger(0); + + List pvs = new ArrayList<>(PV_COUNT); + + long startTime = System.nanoTime(); + + for (int i = 0; i < PV_COUNT; i++) { + long pvStartTime = System.nanoTime(); + + InfluxDB_PV pv = mock(InfluxDB_PV.class); + when(pv.getBucket()).thenReturn("bucket-" + (i % BUCKET_COUNT)); + when(pv.getMeasurement()).thenReturn("measurement-" + i); + + when(pv.asyncRead()).thenReturn(CompletableFuture.completedFuture(new VType() { + @Override + public String toString() { + return "mocked_value"; + } + })); + + pvs.add(pv); + + long pvEndTime = System.nanoTime(); + creationTime.addAndGet(pvEndTime - pvStartTime); + successfulCreations.incrementAndGet(); + } + + long endTime = System.nanoTime(); + long totalCreationTime = endTime - startTime; + + Thread.sleep(1000); + + AtomicInteger successfulReads = new AtomicInteger(0); + AtomicLong readTime = new AtomicLong(0); + + for (InfluxDB_PV pv : pvs) { + try { + long readStart = System.nanoTime(); + CompletableFuture future = pv.asyncRead(); + VType result = future.get(5, TimeUnit.SECONDS); + long readEnd = System.nanoTime(); + + if (result != null) { + successfulReads.incrementAndGet(); + readTime.addAndGet(readEnd - readStart); + } + } catch (Exception e) { + System.err.println("Read failed: " + e.getMessage()); + } + } + + System.out.println("\n=== Performance Test Results ==="); + System.out.println("Total PVs created: " + successfulCreations.get() + "/" + PV_COUNT); + System.out.println("Total creation time: " + (totalCreationTime / 1_000_000) + " ms"); + System.out.println("Average creation time per PV: " + (creationTime.get() / PV_COUNT / 1_000_000) + " ms"); + System.out.println("Successful reads: " + successfulReads.get() + "/" + PV_COUNT); + if (successfulReads.get() > 0) { + System.out.println("Average read time: " + (readTime.get() / successfulReads.get() / 1_000_000) + " ms"); + } + + assertTrue(successfulCreations.get() >= PV_COUNT * 0.95, + "Should create at least 95% of PVs successfully"); + assertTrue(totalCreationTime < 30_000_000_000L, + "Total creation time should be less than 30 seconds"); + assertTrue(successfulReads.get() >= 80, + "Should complete at least 80% of read tests successfully"); + + System.gc(); + Thread.sleep(100); + long finalMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.out.println("Final memory usage: " + finalMemory / 1024 / 1024 + " MB"); + + long cleanupStart = System.nanoTime(); + for (InfluxDB_PV pv : pvs) { + try { + pv.close(); + } catch (Exception ignore) { + } + } + long cleanupEnd = System.nanoTime(); + System.out.println("Cleanup time: " + ((cleanupEnd - cleanupStart) / 1_000_000) + " ms"); + System.out.println("=== End Performance Test ===\n"); + } + + /** + * Tests that unsupported operations throw appropriate exceptions. + * InfluxDB PVs are read-only by design. + */ + @Test + @Order(11) + @DisplayName("Test unsupported operations") + void testUnsupportedOperations() { + when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(10000L); + + InfluxDB_PV pv = new InfluxDB_PV( + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD, + "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD + ); + + assertThrows(UnsupportedOperationException.class, () -> pv.write(42.0)); + assertThrows(UnsupportedOperationException.class, () -> pv.asyncWrite(42.0)); + + pv.close(); + } + + /** + * Tests the InfluxDB PV factory implementation. + * Verifies that the factory correctly identifies itself with the "influx2" + * type and can create InfluxDB_PV instances when requested. + */ + @Test + @Order(12) + @DisplayName("Test InfluxDB PV Factory") + void testPVFactory() { + InfluxDB_PVFactory factory = new InfluxDB_PVFactory(); + + assertEquals("influx2", factory.getType()); + + PV pv = factory.createPV( + TEST_BUCKET + "/" + TEST_MEASUREMENT, + "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + ); + assertNotNull(pv); + assertInstanceOf(InfluxDB_PV.class, pv); + } + + /** + * Stress tests the polling manager with a large number of PVs. + * This test creates 100,000 PVs to validate that the polling manager + * can handle extreme loads without crashing or causing memory issues. + */ + @Test + @Order(13) + @DisplayName("Test stress polling manager") + void testPollingManagerStress() throws InterruptedException { + final int STRESS_PV_COUNT = 100_000; + List stressPVs = Collections.synchronizedList(new ArrayList<>()); + + try { + when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(5000L); + + FluxRecord mockRecord = mock(FluxRecord.class); + when(mockRecord.getValue()).thenReturn(123.45); + when(mockRecord.getTime()).thenReturn(Instant.now()); + when(mockRecord.getField()).thenReturn("mockField"); + + FluxTable mockTable = mock(FluxTable.class); + when(mockTable.getRecords()).thenReturn(List.of(mockRecord)); + when(mockQueryApi.query(anyString())).thenReturn(List.of(mockTable)); + + for (int i = 0; i < STRESS_PV_COUNT; i++) { + String pvName = "stress_pv" + i; + String bucketName = "bucket_" + (i % 5); + String measurementName = "measurement_" + (i % 20); + String fieldName = "field_" + (i % 10); + + InfluxDB_PV pv = new InfluxDB_PV(pvName, + String.format("influx2://%s/%s/%s", bucketName, measurementName, fieldName)); + stressPVs.add(pv); + } + + Thread.sleep(2000); + + assertTrue(true, "Stress test completed without major issues"); + } finally { + for (InfluxDB_PV pv : stressPVs) { + try { + pv.close(); + } catch (Exception e) { + System.err.println("Error closing PV: " + e.getMessage()); + } + } + } + } + + /** + * Tests PV creation with null field (should be treated as no specific field). + * Verifies that passing "null" as field name results in no specific field selection. + */ + @Test + @Order(14) + @DisplayName("Test PV creation with 'null' field") + void testPVCreationWithNullField() { + InfluxDB_PV pv = new InfluxDB_PV( + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/null", + "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/null" + ); + + assertNotNull(pv); + assertEquals(TEST_BUCKET, pv.getBucket()); + assertEquals(TEST_MEASUREMENT, pv.getMeasurement()); + assertNull(pv.getField()); + + pv.close(); + } + + /** + * Tests that empty bucket name is rejected. + * The system should throw an IllegalArgumentException for empty bucket names. + */ + @Test + @Order(15) + @DisplayName("Test empty bucket name is rejected") + void testEmptyBucketRejected() { + assertThrows(IllegalArgumentException.class, () -> new InfluxDB_PV("test", "influx2:///measurement")); + + assertThrows(IllegalArgumentException.class, () -> new InfluxDB_PV("test", "influx2:// /measurement")); + } +} diff --git a/phoebus-product/pom.xml b/phoebus-product/pom.xml index e4747da801..2cc74f6a33 100644 --- a/phoebus-product/pom.xml +++ b/phoebus-product/pom.xml @@ -22,6 +22,11 @@ core-pv-mqtt 5.0.3-SNAPSHOT + + org.phoebus + core-pv-influx2 + 5.0.3-SNAPSHOT + org.phoebus core-pv-opva diff --git a/pom.xml b/pom.xml index 2aaff63f99..aff761a480 100644 --- a/pom.xml +++ b/pom.xml @@ -92,6 +92,7 @@ 17 5.18.4 1.26.1 + 7.2.0 From 503833374341ba29bb0c878510890f36d1415752 Mon Sep 17 00:00:00 2001 From: flef Date: Fri, 12 Sep 2025 11:25:10 +0200 Subject: [PATCH 2/2] Update pom.xml fix: wrong parent for module pv-influx2 --- core/pv-influx2/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/pv-influx2/pom.xml b/core/pv-influx2/pom.xml index 9b1114b70a..f7badab50b 100644 --- a/core/pv-influx2/pom.xml +++ b/core/pv-influx2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 org.phoebus - parent + core 5.0.3-SNAPSHOT @@ -71,4 +71,4 @@ test - \ No newline at end of file +