availableFields = new ArrayList<>();
+
+ for (FluxTable table : fieldTables) {
+ for (FluxRecord record : table.getRecords()) {
+ Object fieldName = record.getValueByKey("_value");
+ if (fieldName != null) {
+ availableFields.add(fieldName.toString());
+ }
+ }
+ }
+
+ if (availableFields.contains("value")) {
+ return "r._field == \"value\"";
+ } else if (availableFields.contains("field")) {
+ return "r._field == \"field\"";
+ } else if (!availableFields.isEmpty()) {
+ String firstField = availableFields.get(0);
+ return String.format("r._field == \"%s\"", firstField);
+ }
+
+ return "true";
+
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Error discovering fields, using fallback", e);
+ return "r._field == \"value\" or r._field == \"field\"";
+ }
+ }
+
+ public VType createVType(FluxRecord record) {
+ if (record == null) return null;
+
+ Object val = record.getValueByKey("_value");
+ Instant timestamp = record.getTime();
+
+ if (timestamp == null) {
+ logger.log(Level.WARNING, "Record has null timestamp for PV: " + name);
+ return null;
+ }
+
+ if (val instanceof String) {
+ return VString.of(val.toString(),
+ Alarm.of(AlarmSeverity.NONE, AlarmStatus.NONE, "OK"),
+ TimeHelper.fromInstant(timestamp));
+ }
+
+ if (!(val instanceof Number)) {
+ if (val != null) {
+ logger.log(Level.FINE, String.format("Non-numeric value for PV %s: %s (%s)",
+ name, val, val.getClass().getSimpleName()));
+ }
+ return VNumber.of(0.0,
+ Alarm.of(AlarmSeverity.INVALID, AlarmStatus.CLIENT, "Invalid data"),
+ TimeHelper.fromInstant(timestamp),
+ Display.none());
+ }
+
+ double numVal = ((Number) val).doubleValue();
+
+ if (Double.isNaN(numVal) || Double.isInfinite(numVal)) {
+ return VNumber.of(0.0,
+ Alarm.of(AlarmSeverity.INVALID, AlarmStatus.CLIENT, "Invalid numeric value"),
+ TimeHelper.fromInstant(timestamp),
+ Display.none());
+ }
+
+ return VNumber.of(numVal,
+ Alarm.of(AlarmSeverity.NONE, AlarmStatus.NONE, "OK"),
+ TimeHelper.fromInstant(timestamp),
+ Display.none());
+ }
+
+ @Override
+ public synchronized boolean hasNext() {
+ return !closed && valueIterator.hasNext();
+ }
+
+ @Override
+ public synchronized VType next() {
+ if (!hasNext()) return null;
+ return valueIterator.next();
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+ listener.finished(this);
+ logger.log(Level.FINE, "Closed iterator for PV: " + name);
+ }
+ }
+}
diff --git a/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/IteratorListener.java b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/IteratorListener.java
new file mode 100644
index 0000000000..449174ed68
--- /dev/null
+++ b/app/trends/archive-reader/src/main/java/org/phoebus/archive/reader/influx2/IteratorListener.java
@@ -0,0 +1,12 @@
+/*******************************************************************************
+ * Copyright (C) 2025 Thales.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *******************************************************************************/
+package org.phoebus.archive.reader.influx2;
+
+public interface IteratorListener {
+ void finished(Object source);
+}
diff --git a/app/trends/archive-reader/src/main/resources/META-INF/services/org.phoebus.archive.reader.spi.ArchiveReaderFactory b/app/trends/archive-reader/src/main/resources/META-INF/services/org.phoebus.archive.reader.spi.ArchiveReaderFactory
index 737fe3cf43..d9f661c22b 100644
--- a/app/trends/archive-reader/src/main/resources/META-INF/services/org.phoebus.archive.reader.spi.ArchiveReaderFactory
+++ b/app/trends/archive-reader/src/main/resources/META-INF/services/org.phoebus.archive.reader.spi.ArchiveReaderFactory
@@ -2,3 +2,4 @@ org.phoebus.archive.reader.appliance.ApplianceArchiveReaderFactory
org.phoebus.archive.reader.rdb.RDBArchiveReaderFactory
org.phoebus.archive.reader.channelarchiver.XMLRPCArchiveReaderFactory
org.phoebus.archive.reader.channelarchiver.file.ArchiveFileReaderFactory
+org.phoebus.archive.reader.influx2.InfluxArchiveReaderFactory
diff --git a/core/pom.xml b/core/pom.xml
index 234f4a7a53..4df3403eaa 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -10,6 +10,7 @@
pva
pv
pv-ca
+ pv-influx2
pv-jackie
pv-mqtt
pv-opva
diff --git a/core/pv-influx2/pom.xml b/core/pv-influx2/pom.xml
new file mode 100644
index 0000000000..f7badab50b
--- /dev/null
+++ b/core/pv-influx2/pom.xml
@@ -0,0 +1,74 @@
+
+
+ 4.0.0
+
+ org.phoebus
+ core
+ 5.0.3-SNAPSHOT
+
+
+ core-pv-influx2
+
+
+ org.epics
+ epics-util
+ ${epics.util.version}
+
+
+
+ org.epics
+ vtype
+ ${vtype.version}
+
+
+
+ org.phoebus
+ core-pv
+ 5.0.3-SNAPSHOT
+
+
+
+ org.phoebus
+ core-framework
+ 5.0.3-SNAPSHOT
+
+
+
+ com.influxdb
+ influxdb-client-java
+ ${influxdb2Client.version}
+
+
+ org.testng
+ testng
+ RELEASE
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.8.2
+ test
+
+
+ org.mockito
+ mockito-core
+ 4.5.1
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ 4.5.1
+ test
+
+
+ org.mockito
+ mockito-inline
+ 5.2.0
+ test
+
+
+
diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Context.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Context.java
new file mode 100644
index 0000000000..4c2090e358
--- /dev/null
+++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Context.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Copyright (C) 2025 Thales.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *******************************************************************************/
+package org.phoebus.pv.influx2;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * Singleton context for managing the InfluxDB client instance.
+ *
+ * This class initializes and holds a single {@link InfluxDBClient} that can be
+ * accessed globally throughout the application. Connection settings such as host,
+ * port, token, and HTTPS usage are read from {@link InfluxDB_Preferences}.
+ *
+ */
+public class InfluxDB_Context {
+ /** Logger for logging initialization and configuration messages. */
+ private static final Logger LOGGER = Logger.getLogger(InfluxDB_Context.class.getName());
+
+ /** The InfluxDB client used for querying or writing data. */
+ private final InfluxDBClient client;
+
+ /**
+ * Private constructor that creates an {@link InfluxDBClient} based on application preferences.
+ *
+ * Builds the client using host, port, token, and organization defined in the
+ * {@code pv_influx_preferences.properties} file.
+ */
+ private InfluxDB_Context() {
+ InfluxDB_Preferences prefs = InfluxDB_Preferences.getInstance();
+ String url = String.format("%s://%s:%d",
+ prefs.isUseHttps() ? "https" : "http",
+ prefs.getHost(),
+ prefs.getPort());
+
+ client = InfluxDBClientFactory.create(url, prefs.getToken().toCharArray(), prefs.getOrganization());
+ LOGGER.log(Level.CONFIG, "InfluxDB client created with URL: {0}", url);
+ }
+
+ private static class Holder {
+ private static final InfluxDB_Context INSTANCE = new InfluxDB_Context();
+ }
+
+ /**
+ * Returns the singleton instance of {@code InfluxDB_Context}.
+ *
+ * @return the singleton {@code InfluxDB_Context} instance
+ */
+ public static InfluxDB_Context getInstance() {
+ return Holder.INSTANCE;
+ }
+
+ /**
+ * Returns the underlying {@link InfluxDBClient} instance.
+ *
+ * @return the InfluxDB client
+ */
+ public InfluxDBClient getClient() {
+ return client;
+ }
+}
diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Helper.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Helper.java
new file mode 100644
index 0000000000..6c644907c7
--- /dev/null
+++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Helper.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Copyright (C) 2025 Thales.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *******************************************************************************/
+package org.phoebus.pv.influx2;
+
+import com.influxdb.query.FluxRecord;
+import java.time.Instant;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.epics.vtype.Alarm;
+import org.epics.vtype.Display;
+import org.epics.vtype.Time;
+import org.epics.vtype.VDouble;
+import org.epics.vtype.VString;
+import org.epics.vtype.VType;
+import org.phoebus.pv.TimeHelper;
+
+/**
+ * Utility class for converting InfluxDB {@link FluxRecord} values to EPICS {@link VType} values.
+ *
+ * Supports conversion of numeric values to {@link VDouble} and string values to {@link VString}.
+ * Unhandled types will cause an exception to be thrown.
+ *
+ */
+public class InfluxDB_Helper {
+ /** Logger for the helper class */
+ private static final Logger LOGGER = Logger.getLogger(InfluxDB_Helper.class.getName());
+
+ /**
+ * Converts a single {@link FluxRecord} from InfluxDB into a corresponding {@link VType}.
+ *
+ * @param record the InfluxDB record to convert
+ * @return a {@link VType} representing the value in the record (e.g., {@link VDouble} or {@link VString})
+ * @throws Exception if the record is null or contains an unsupported value type
+ */
+ public static VType convertRecordToVType(FluxRecord record) throws Exception {
+ if (record == null) {
+ throw new Exception("FluxRecord is null");
+ }
+
+ Object value = record.getValue();
+
+ // Get the timestamp, default to now if missing
+ Instant instant = record.getTime() != null ? record.getTime() : Instant.now();
+ Time time = TimeHelper.fromInstant(instant);
+ Alarm alarm = Alarm.none();
+ Display display = Display.none();
+
+ if (value instanceof Number) {
+ double d = ((Number) value).doubleValue();
+ return VDouble.of(d, alarm, time, display);
+ } else if (value instanceof String) {
+ return VString.of((String) value, alarm, time);
+ } else {
+ assert value != null;
+ LOGGER.log(Level.WARNING, "Unsupported type: " + value.getClass().getName());
+ throw new Exception("Cannot handle type " + value.getClass().getName());
+ }
+ }
+}
diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PV.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PV.java
new file mode 100644
index 0000000000..f75228d722
--- /dev/null
+++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PV.java
@@ -0,0 +1,257 @@
+/*******************************************************************************
+ * Copyright (C) 2025 Thales.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *******************************************************************************/
+package org.phoebus.pv.influx2;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.QueryApi;
+import com.influxdb.query.FluxRecord;
+import com.influxdb.query.FluxTable;
+import java.time.Instant;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.logging.Level;
+import org.epics.vtype.Alarm;
+import org.epics.vtype.AlarmSeverity;
+import org.epics.vtype.AlarmStatus;
+import org.epics.vtype.Display;
+import org.epics.vtype.Time;
+import org.epics.vtype.VString;
+import org.epics.vtype.VType;
+import org.phoebus.pv.PV;
+
+/**
+ * Represents a read-only Process Variable (PV) backed by InfluxDB.
+ *
+ * This class uses the InfluxDBPollingManager to efficiently poll InfluxDB
+ * by batching requests with similar periods and the same bucket.
+ *
+ * Write operations are not supported.
+ */
+public class InfluxDB_PV extends PV {
+ /** The name of the InfluxDB bucket */
+ private final String bucket;
+
+ /** The name of the measurement to read from */
+ private final String measurement;
+
+ /** The field to read from (can be null for default) */
+ private final String field;
+
+ /** Period of sampling in milliseconds */
+ private final long period;
+
+ /** The InfluxDB client used for querying */
+ private final InfluxDBClient influxClient;
+
+ /** Timestamp of PV last update. */
+ private Instant lastUpdate;
+
+ /** Object */
+ private Object lastValue;
+
+ /**
+ * Constructs an {@code InfluxDB_PV} with the specified name and base_name.
+ * The base_name is expected to be in one of the following forms:
+ * - {@code influx2://bucket/measurement} - specific bucket, no specific field
+ * - {@code influx2://bucket/measurement/field} - specific bucket, specific field
+ *
+ * The bucket must always be specified explicitly.
+ *
+ * @param name the name of the PV
+ * @param base_name the base URI specifying the InfluxDB bucket, measurement, optional field
+ * @throws IllegalArgumentException if the format is invalid or bucket is missing
+ */
+ public InfluxDB_PV(String name, String base_name) {
+ super(name);
+
+ String new_base_name = base_name.replace("influx2://", "");
+ String[] parts = new_base_name.split("/");
+
+ influxClient = InfluxDB_Context.getInstance().getClient();
+ lastUpdate = Instant.now();
+
+ switch (parts.length) {
+ case 2:
+ this.bucket = parts[0];
+ this.measurement = parts[1];
+ this.field = null;
+ break;
+ case 3:
+ this.bucket = parts[0];
+ this.measurement = parts[1];
+ this.field = Objects.equals(parts[2], "null") ? null : parts[2];
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Invalid InfluxDB PV format: " + base_name +
+ ". Expected format: influx2://bucket/measurement or influx2://bucket/measurement/field"
+ );
+ }
+ if (this.bucket == null || this.bucket.trim().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Bucket must be specified in InfluxDB PV: " + base_name
+ );
+ }
+
+ InfluxDB_Preferences prefs = InfluxDB_Preferences.getInstance();
+ period = prefs.getRefreshPeriod();
+
+ logger.log(Level.INFO, "Creating InfluxDB PV with bucket: {0}, measurement: {1}, field: {2}, period: {3}ms",
+ new Object[]{bucket, measurement, field != null ? field : "all fields", period});
+
+ InfluxDB_PollingManager.getInstance().registerPV(this);
+ }
+
+ /**
+ * Get the bucket for this PV
+ * @return the bucket name
+ */
+ public String getBucket() {
+ return bucket;
+ }
+
+ /**
+ * Get the measurement for this PV
+ * @return the measurement name
+ */
+ public String getMeasurement() {
+ return measurement;
+ }
+
+ /**
+ * Get the field for this PV
+ * @return the field name or null if no specific field
+ */
+ public String getField() {
+ return field;
+ }
+
+ /**
+ * Get the polling period for this PV
+ * @return the period in milliseconds
+ */
+ public long getPeriod() {
+ return period;
+ }
+
+ protected void updateValue(VType value, Object plainValue) {
+ if (value != null && plainValue != null) {
+ lastUpdate = Instant.now();
+ lastValue = plainValue;
+ }
+
+ notifyListenersOfValue(value);
+ }
+
+ /**
+ * Update the PV to a disconnected state with an alarm
+ */
+ protected void updateToDisconnectedState() {
+ VType disconnected;
+
+ if (lastValue == null) {
+ disconnected = VString.of(
+ "Disconnected",
+ Alarm.of(AlarmSeverity.INVALID, AlarmStatus.CLIENT, "Disconnected"),
+ Time.of(lastUpdate != null ? lastUpdate : Instant.now())
+ );
+ } else {
+ VType converted = VType.toVType(lastValue, Alarm.noValue(), Time.of(lastUpdate), Display.none());
+
+ disconnected = Objects.requireNonNullElseGet(converted, () -> VString.of(
+ "Disconnected: " + lastValue.toString(),
+ Alarm.of(AlarmSeverity.INVALID, AlarmStatus.CLIENT, "Disconnected"),
+ Time.of(lastUpdate)
+ ));
+ }
+
+ notifyListenersOfValue(disconnected);
+ }
+
+ /**
+ * Asynchronously queries InfluxDB for the most recent value of the configured measurement.
+ *
+ * This method can be used for on-demand reading, but normal updates come through
+ * the PollingManager.
+ *
+ * @return a future that completes with the latest {@link VType} value
+ */
+ @Override
+ public CompletableFuture asyncRead() {
+ CompletableFuture future = new CompletableFuture<>();
+
+ String flux;
+ if (field != null) {
+ flux = String.format("from(bucket: \"%s\") |> range(start: -10s) " +
+ "|> filter(fn: (r) => r._measurement == \"%s\" and r._field == \"%s\") |> last()",
+ bucket, measurement, field);
+ } else {
+ flux = String.format("from(bucket: \"%s\") |> range(start: -10s) " +
+ "|> filter(fn: (r) => r._measurement == \"%s\") |> last()",
+ bucket, measurement);
+ }
+
+ QueryApi queryApi = influxClient.getQueryApi();
+
+ try {
+ List tables = queryApi.query(flux);
+
+ if (tables.isEmpty()) {
+ future.completeExceptionally(new Exception("No data returned from InfluxDB"));
+ } else {
+ FluxTable table = tables.get(0);
+ if (table.getRecords().isEmpty()) {
+ future.completeExceptionally(new Exception("No records in the result from InfluxDB"));
+ } else {
+ FluxRecord record = table.getRecords().get(0);
+ VType value = InfluxDB_Helper.convertRecordToVType(record);
+ notifyListenersOfValue(value);
+ future.complete(value);
+ }
+ }
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Query error for PV: " + getName(), e);
+ future.completeExceptionally(e);
+ }
+
+ return future;
+ }
+
+ /**
+ * Write operation is not supported for InfluxDB PVs.
+ *
+ * @param new_value the value to write (ignored)
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void write(Object new_value) {
+ throw new UnsupportedOperationException("Write not supported for InfluxDB PV");
+ }
+
+ /**
+ * Asynchronous write is not supported for InfluxDB PVs.
+ *
+ * @param new_value the value to write (ignored)
+ * @return nothing, always throws exception
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public CompletableFuture> asyncWrite(Object new_value) {
+ throw new UnsupportedOperationException("Async write not supported for InfluxDB PV");
+ }
+
+ /**
+ * Unregisters this PV from the polling manager.
+ * Called when the PV is being closed.
+ */
+ @Override
+ protected void close() {
+ InfluxDB_PollingManager.getInstance().unregisterPV(this);
+ }
+}
diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PVFactory.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PVFactory.java
new file mode 100644
index 0000000000..606490a98d
--- /dev/null
+++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PVFactory.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * Copyright (C) 2025 Thales.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *******************************************************************************/
+package org.phoebus.pv.influx2;
+
+import org.phoebus.pv.PV;
+import org.phoebus.pv.PVFactory;
+
+/**
+ * Factory class for creating {@link InfluxDB_PV} instances.
+ *
+ * This factory integrates with the Phoebus PV framework and registers the
+ * "influx" PV type so it can be discovered and instantiated dynamically
+ * when a PV URI starts with {@code influx2://}.
+ *
+ */
+public class InfluxDB_PVFactory implements PVFactory {
+ /** The PV type identifier used to register this factory. */
+ public static final String TYPE = "influx2";
+
+ /**
+ * Returns the PV type string handled by this factory.
+ * This value is used to match PVs that should be created by this factory.
+ *
+ * @return the PV type string ("influx")
+ */
+ @Override
+ public String getType() { return TYPE; }
+
+ /**
+ * Creates a new {@link InfluxDB_PV} instance with the given name and base_name.
+ *
+ * @param name the full PV name
+ * @param base_name the base URI used to extract the InfluxDB bucket and measurement
+ * @return a new {@code InfluxDB_PV} instance
+ */
+ @Override
+ public PV createPV(String name, String base_name) { return new InfluxDB_PV(name, base_name); }
+}
diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PollingManager.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PollingManager.java
new file mode 100644
index 0000000000..07d347faab
--- /dev/null
+++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_PollingManager.java
@@ -0,0 +1,377 @@
+/*******************************************************************************
+ * Copyright (C) 2025 Thales.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *******************************************************************************/
+package org.phoebus.pv.influx2;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.Objects;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import java.util.stream.Collectors;
+
+import org.epics.vtype.VType;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.QueryApi;
+
+import com.influxdb.query.FluxRecord;
+import com.influxdb.query.FluxTable;
+
+/**
+ * A manager class that optimizes InfluxDB polling by batching PV requests.
+ *
+ * This class groups PVs with similar polling periods and same buckets to reduce
+ * the total number of InfluxDB queries required. It also handles dynamic bucket
+ * changes for PVs that track the "last" bucket.
+ */
+public class InfluxDB_PollingManager {
+ /** Logger for this class */
+ private static final Logger LOGGER = Logger.getLogger(InfluxDB_PollingManager.class.getName());
+
+ /** The InfluxDB client used for querying */
+ private final InfluxDBClient influxClient;
+
+ /** Shared executor for all polling tasks */
+ private final ScheduledExecutorService scheduler;
+
+ /** Maps bucket+period to its polling group */
+ private final Map pollingGroups = new ConcurrentHashMap<>();
+
+ /** Maps a registered PV to its polling group key */
+ private final Map pvToGroupKey = new ConcurrentHashMap<>();
+
+ /** Maximum number of fields to include in a single query */
+ private static final int MAX_FIELDS_PER_QUERY = 100;
+
+ /**
+ * Class representing a group of PVs that are polled together
+ * with the same bucket and polling period
+ */
+ private class PollingGroup {
+ final String bucket;
+ final long periodMs;
+ final Set pvSet = Collections.synchronizedSet(new HashSet<>());
+ ScheduledFuture> scheduledTask;
+
+ PollingGroup(String bucket, long periodMs) {
+ this.bucket = bucket;
+ this.periodMs = periodMs;
+ }
+
+ void schedule() {
+ scheduledTask = scheduler.scheduleAtFixedRate(
+ this::pollGroup,
+ 0,
+ periodMs,
+ TimeUnit.MILLISECONDS
+ );
+ }
+
+ void pollGroup() {
+ if (pvSet.isEmpty()) {
+ return;
+ }
+
+ Map> measurementGroups = new HashMap<>();
+
+ synchronized (pvSet) {
+ for (InfluxDB_PV pv : pvSet) {
+ if (Objects.equals(pv.getBucket(), this.bucket)) {
+ measurementGroups
+ .computeIfAbsent(pv.getMeasurement(), k -> new ArrayList<>())
+ .add(pv);
+ }
+ }
+ }
+
+ for (Map.Entry> entry : measurementGroups.entrySet()) {
+ String measurement = entry.getKey();
+ List pvs = entry.getValue();
+
+ if (!pvs.isEmpty()) {
+ executeOptimizedQuery(bucket, measurement, pvs);
+ }
+ }
+ }
+
+ void executeOptimizedQuery(String bucket, String measurement, List pvs) {
+ try {
+ Set uniqueFields = pvs.stream()
+ .map(InfluxDB_PV::getField)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+
+ if (uniqueFields.isEmpty()) {
+ String flux = String.format("from(bucket: \"%s\") |> range(start: -10s) " +
+ "|> filter(fn: (r) => r._measurement == \"%s\") |> last()",
+ bucket, measurement);
+
+ executeQueryAndDistributeResults(flux, measurement, pvs);
+ return;
+ }
+
+ List fieldsList = new ArrayList<>(uniqueFields);
+ Map processedPVs = new HashMap<>();
+
+ for (int i = 0; i < fieldsList.size(); i += MAX_FIELDS_PER_QUERY) {
+ int end = Math.min(i + MAX_FIELDS_PER_QUERY, fieldsList.size());
+ List batchFields = fieldsList.subList(i, end);
+
+ String fieldFilter = batchFields.stream()
+ .map(field -> "r._field == \"" + field + "\"")
+ .collect(Collectors.joining(" or "));
+
+ String flux = String.format("from(bucket: \"%s\") |> range(start: -10s) " +
+ "|> filter(fn: (r) => r._measurement == \"%s\" and (%s)) |> last()",
+ bucket, measurement, fieldFilter);
+
+ List batchPVs = pvs.stream()
+ .filter(pv -> {
+ String field = pv.getField();
+ return field == null || batchFields.contains(field);
+ })
+ .filter(pv -> !processedPVs.getOrDefault(pv, false))
+ .collect(Collectors.toList());
+
+ if (!batchPVs.isEmpty()) {
+ executeQueryAndDistributeResults(flux, measurement, batchPVs);
+
+ batchPVs.forEach(pv -> processedPVs.put(pv, true));
+ }
+ }
+
+ List unprocessedPVs = pvs.stream()
+ .filter(pv -> !processedPVs.getOrDefault(pv, false))
+ .collect(Collectors.toList());
+
+ if (!unprocessedPVs.isEmpty()) {
+ String flux = String.format("from(bucket: \"%s\") |> range(start: -10s) " +
+ "|> filter(fn: (r) => r._measurement == \"%s\") |> last()",
+ bucket, measurement);
+
+ executeQueryAndDistributeResults(flux, measurement, unprocessedPVs);
+ }
+
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Error executing batch query", e);
+ }
+ }
+
+ private void executeQueryAndDistributeResults(String flux, String measurement, List pvs) {
+ try {
+ QueryApi queryApi = influxClient.getQueryApi();
+ List tables = queryApi.query(flux);
+
+ if (!tables.isEmpty()) {
+ Map recordsByField = new HashMap<>();
+ for (FluxTable table : tables) {
+ for (FluxRecord record : table.getRecords()) {
+ String field = record.getField();
+ recordsByField.put(field, record);
+ }
+ }
+
+ for (InfluxDB_PV pv : pvs) {
+ String field = pv.getField();
+ long disconnectTimeoutMs = InfluxDB_Preferences.getInstance().getDisconnectTimeout();
+
+ if (field != null) {
+ FluxRecord record = recordsByField.get(field);
+
+ if (record != null) {
+ Duration timeSinceUpdate = Duration.between(Objects.requireNonNull(record.getTime()), Instant.now());
+
+ if (timeSinceUpdate.toMillis() > disconnectTimeoutMs) {
+ pv.updateToDisconnectedState();
+ } else {
+ VType value = InfluxDB_Helper.convertRecordToVType(record);
+ if (value != null) {
+ pv.updateValue(value, record.getValue());
+ }
+ }
+ } else {
+ pv.updateToDisconnectedState();
+ }
+ } else {
+ List matchingRecords = tables.stream()
+ .flatMap(table -> table.getRecords().stream())
+ .filter(record -> measurement.equals(record.getMeasurement())).toList();
+
+ if (!matchingRecords.isEmpty()) {
+ FluxRecord record = matchingRecords.get(0);
+ Duration timeSinceUpdate = Duration.between(Objects.requireNonNull(record.getTime()), Instant.now());
+
+ if (timeSinceUpdate.toMillis() > disconnectTimeoutMs) {
+ pv.updateToDisconnectedState();
+ } else {
+ VType value = InfluxDB_Helper.convertRecordToVType(record);
+ if (value != null) {
+ pv.updateValue(value, record.getValue());
+ }
+ }
+ } else {
+ pv.updateToDisconnectedState();
+ }
+ }
+ }
+ } else {
+ for (InfluxDB_PV pv : pvs) {
+ pv.updateToDisconnectedState();
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Error executing batch query", e);
+ for (InfluxDB_PV pv : pvs) {
+ pv.updateToDisconnectedState();
+ }
+ }
+ }
+
+ void cancel() {
+ if (scheduledTask != null) {
+ scheduledTask.cancel(false);
+ scheduledTask = null;
+ }
+ }
+ }
+
+ /**
+ * Private constructor to enforce singleton pattern
+ */
+ private InfluxDB_PollingManager() {
+ influxClient = InfluxDB_Context.getInstance().getClient();
+ scheduler = Executors.newScheduledThreadPool(
+ Runtime.getRuntime().availableProcessors(),
+ r -> {
+ Thread thread = new Thread(r, "InfluxDB-PV-BatchPoller");
+ thread.setDaemon(true);
+ return thread;
+ }
+ );
+ }
+
+ private static class Holder {
+ private static final InfluxDB_PollingManager INSTANCE = new InfluxDB_PollingManager();
+ }
+
+ /**
+ * Gets the singleton instance of the polling manager
+ *
+ * @return the polling manager instance
+ */
+ public static InfluxDB_PollingManager getInstance() {
+ return Holder.INSTANCE;
+ }
+
+ /**
+ * Registers a PV to be polled by the manager
+ *
+ * @param pv the InfluxDB PV to register
+ */
+ public void registerPV(InfluxDB_PV pv) {
+ registerPVInternal(pv);
+ }
+
+ /**
+ * Internal method to register a PV (used for both initial registration and reorganization)
+ *
+ * @param pv the InfluxDB PV to register
+ */
+ private void registerPVInternal(InfluxDB_PV pv) {
+ String bucket = pv.getBucket();
+ if (bucket == null) {
+ LOGGER.log(Level.WARNING, "Cannot register PV {0} - no bucket available", pv.getName());
+ return;
+ }
+
+ String groupKey = createGroupKey(bucket, pv.getPeriod());
+
+ pollingGroups.computeIfAbsent(groupKey, k -> {
+ String[] parts = k.split(":");
+ String bucketName = parts[0];
+ long period = Long.parseLong(parts[1]);
+
+ PollingGroup group = new PollingGroup(bucketName, period);
+ group.schedule();
+ return group;
+ }).pvSet.add(pv);
+
+ pvToGroupKey.put(pv, groupKey);
+ }
+
+ /**
+ * Unregisters a PV from the manager
+ *
+ * @param pv the InfluxDB PV to unregister
+ */
+ public void unregisterPV(InfluxDB_PV pv) {
+ String groupKey = pvToGroupKey.remove(pv);
+ if (groupKey != null) {
+ PollingGroup group = pollingGroups.get(groupKey);
+ if (group != null) {
+ group.pvSet.remove(pv);
+ if (group.pvSet.isEmpty()) {
+ group.cancel();
+ pollingGroups.remove(groupKey);
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a group key for the given bucket and period
+ * This will group similar periods together to reduce the number of scheduled tasks
+ *
+ * @param bucket the InfluxDB bucket
+ * @param periodMs the polling period in milliseconds
+ * @return a key string for grouping
+ */
+ private String createGroupKey(String bucket, long periodMs) {
+ long bucketedPeriod;
+
+ if (periodMs <= 100) {
+ bucketedPeriod = periodMs;
+ } else if (periodMs <= 1000) {
+ bucketedPeriod = Math.round(periodMs / 100.0) * 100;
+ } else if (periodMs <= 10000) {
+ bucketedPeriod = Math.round(periodMs / 1000.0) * 1000;
+ } else {
+ bucketedPeriod = Math.round(periodMs / 5000.0) * 5000;
+ }
+
+ return bucket + ":" + bucketedPeriod;
+ }
+
+ /**
+ * Shuts down the polling manager and cancels all polling tasks
+ */
+ public void shutdown() {
+ for (PollingGroup group : pollingGroups.values()) {
+ group.cancel();
+ }
+
+ pollingGroups.clear();
+ pvToGroupKey.clear();
+ scheduler.shutdownNow();
+ }
+}
diff --git a/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Preferences.java b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Preferences.java
new file mode 100644
index 0000000000..3c80f23d2c
--- /dev/null
+++ b/core/pv-influx2/src/main/java/org/phoebus/pv/influx2/InfluxDB_Preferences.java
@@ -0,0 +1,158 @@
+/*******************************************************************************
+ * Copyright (C) 2025 Thales.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *******************************************************************************/
+package org.phoebus.pv.influx2;
+
+import java.lang.reflect.Field;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.phoebus.framework.preferences.PreferencesReader;
+
+/**
+ * Singleton class responsible for loading and providing access to InfluxDB connection preferences.
+ *
+ * Preferences are read from a properties file named {@code pv_influx_preferences.properties}
+ * using the {@link PreferencesReader} utility. This class centralizes configuration details
+ * such as host, port, organization, authentication token, and HTTPS usage flag.
+ *
+ */
+public class InfluxDB_Preferences {
+ /** Logger used for logging information and errors related to InfluxDB preferences. */
+ private static final Logger LOGGER = Logger.getLogger(InfluxDB_Preferences.class.getName());
+
+ private static final String HOST = "influx_host";
+ private static final String PORT = "influx_port";
+ private static final String ORGANIZATION = "influx_organization";
+ private static final String TOKEN = "influx_token";
+ private static final String USE_HTTPS = "influx_useHttps";
+ private static final String DISCONNECT_TIMEOUT = "influx_disconnectTimeoutMs";
+
+ /** The singleton instance of this class. */
+ private static final InfluxDB_Preferences INSTANCE = new InfluxDB_Preferences();
+
+ private static String host;
+ private static int port;
+ private String organization;
+ private String token;
+ private boolean useHttps;
+ private long refreshPeriod;
+ private long disconnectTimeout;
+
+ /**
+ * Private constructor that loads preferences upon instantiation.
+ * This is part of the singleton pattern.
+ */
+ private InfluxDB_Preferences() {
+ try {
+ installPreferences();
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Error while loading InfluxDB preferences", e);
+ }
+ }
+
+ /**
+ * Reads the InfluxDB preferences from the properties file.
+ * Uses the {@link PreferencesReader} to retrieve values.
+ */
+ private void installPreferences() {
+ PreferencesReader prefs = new PreferencesReader(InfluxDB_Preferences.class, "/pv_influx_preferences.properties");
+
+ try {
+ Class> prefsClass = Class.forName("org.csstudio.display.builder.runtime.Preferences");
+ Field field = prefsClass.getField("update_throttle_ms");
+ refreshPeriod = (Integer) field.get(null);
+ } catch (Exception e) {
+ refreshPeriod = 250;
+ }
+
+ host = prefs.get(HOST);
+ port = prefs.getInt(PORT);
+ organization = prefs.get(ORGANIZATION);
+ token = prefs.get(TOKEN);
+ useHttps = prefs.getBoolean(USE_HTTPS);
+ disconnectTimeout = prefs.getLong(DISCONNECT_TIMEOUT);
+
+ if (disconnectTimeout == 0) {
+ disconnectTimeout = 3000;
+ }
+ }
+
+ /**
+ * Returns the singleton instance of {@code InfluxDB_Preferences}.
+ *
+ * @return the singleton instance
+ */
+ public static InfluxDB_Preferences getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Returns the configured InfluxDB host name or IP address.
+ *
+ * @return the InfluxDB host
+ */
+ public static String getHost() {
+ return host;
+ }
+
+ /**
+ * Returns the configured port number for connecting to InfluxDB.
+ *
+ * @return the InfluxDB port
+ */
+ public static int getPort() {
+ return port;
+ }
+
+ /**
+ * Returns the name of the InfluxDB organization.
+ *
+ * @return the InfluxDB organization
+ */
+ public String getOrganization() {
+ return organization;
+ }
+
+ /**
+ * Returns the authentication token used for accessing InfluxDB.
+ *
+ * @return the InfluxDB token
+ */
+ public String getToken() {
+ return token;
+ }
+
+ /**
+ * Indicates whether HTTPS should be used when connecting to InfluxDB.
+ *
+ * @return {@code true} if HTTPS is enabled; {@code false} otherwise
+ */
+ public boolean isUseHttps() {
+ return useHttps;
+ }
+
+ /**
+ * Retuns the refresh period of a PV in milliseconds.
+ * Default: 1000ms
+ *
+ * @return the refrehs period
+ */
+ public long getRefreshPeriod() {
+ return refreshPeriod;
+ }
+
+ /**
+ * Returns the timeout before a PV is considered as disconnected in milliseconds.
+ * Disconnected state is triggered when a PV isn't updated for a defined amount of time.
+ * Default: 3000 ms
+ *
+ * @return timeout disconnected
+ */
+ public long getDisconnectTimeout() {
+ return disconnectTimeout;
+ }
+}
diff --git a/core/pv-influx2/src/main/resources/META-INF/services/org.phoebus.pv.PVFactory b/core/pv-influx2/src/main/resources/META-INF/services/org.phoebus.pv.PVFactory
new file mode 100644
index 0000000000..75980746b3
--- /dev/null
+++ b/core/pv-influx2/src/main/resources/META-INF/services/org.phoebus.pv.PVFactory
@@ -0,0 +1 @@
+org.phoebus.pv.influx2.InfluxDB_PVFactory
\ No newline at end of file
diff --git a/core/pv-influx2/src/test/java/org/phoebus/pv/influx2/InfluxPVTest.java b/core/pv-influx2/src/test/java/org/phoebus/pv/influx2/InfluxPVTest.java
new file mode 100644
index 0000000000..31d4fcda19
--- /dev/null
+++ b/core/pv-influx2/src/test/java/org/phoebus/pv/influx2/InfluxPVTest.java
@@ -0,0 +1,614 @@
+/*******************************************************************************
+ * Copyright (C) 2025 Thales.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *******************************************************************************/
+package org.phoebus.pv.influx2;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.QueryApi;
+import com.influxdb.client.WriteApiBlocking;
+import com.influxdb.query.FluxRecord;
+import com.influxdb.query.FluxTable;
+import org.epics.vtype.VDouble;
+import org.epics.vtype.VString;
+import org.epics.vtype.VType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.phoebus.pv.PV;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.testng.annotations.AfterClass;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mockStatic;
+
+/**
+ * Comprehensive test suite for InfluxDB Process Variable (PV) functionality.
+ * This test class validates the complete InfluxDB PV system including:
+ * - Basic PV creation and configuration with explicit bucket specification
+ * - Asynchronous read operations
+ * - Connection and error handling
+ * - Data type conversion between InfluxDB and EPICS VTypes
+ * - Performance testing with large numbers of PVs
+ * - Factory pattern implementation
+ * - Polling manager stress testing
+ * The tests use Mockito to simulate InfluxDB client interactions without
+ * requiring an actual InfluxDB instance, ensuring fast and reliable unit testing.
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class InfluxPVTest {
+
+ @Mock
+ private InfluxDBClient mockInfluxClient;
+
+ @Mock
+ private QueryApi mockQueryApi;
+
+ @Mock
+ private WriteApiBlocking mockWriteApi;
+
+ private MockedStatic mockContext;
+ private MockedStatic mockPreferences;
+
+ private InfluxDB_Context mockContextInstance;
+ private InfluxDB_Preferences mockPreferencesInstance;
+ private InfluxDB_PollingManager pollingManager;
+
+ private static final String TEST_BUCKET = "test_bucket";
+ private static final String TEST_MEASUREMENT = "test_measurement";
+ private static final String TEST_FIELD = "value";
+ private static final long TEST_PERIOD = 100L;
+
+ /**
+ * Sets up the test environment before each test method.
+ * Initializes all mock objects including InfluxDB client, APIs, context,
+ * and preferences. Configures default behaviors for common operations.
+ */
+ @BeforeEach
+ void setUp() {
+ mockContextInstance = mock(InfluxDB_Context.class);
+ when(mockContextInstance.getClient()).thenReturn(mockInfluxClient);
+ mockContext = mockStatic(InfluxDB_Context.class);
+ mockContext.when(InfluxDB_Context::getInstance).thenReturn(mockContextInstance);
+
+ mockPreferencesInstance = mock(InfluxDB_Preferences.class);
+ when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(TEST_PERIOD);
+ when(mockPreferencesInstance.getDisconnectTimeout()).thenReturn(3000L);
+ when(mockPreferencesInstance.isUseHttps()).thenReturn(false);
+ mockPreferences = mockStatic(InfluxDB_Preferences.class);
+ mockPreferences.when(InfluxDB_Preferences::getInstance).thenReturn(mockPreferencesInstance);
+
+ when(mockInfluxClient.getQueryApi()).thenReturn(mockQueryApi);
+ when(mockInfluxClient.getWriteApiBlocking()).thenReturn(mockWriteApi);
+
+ pollingManager = InfluxDB_PollingManager.getInstance();
+ }
+
+ /**
+ * Cleans up mock static objects after each test method to prevent interference
+ * between tests.
+ */
+ @AfterEach
+ void tearDownTest() {
+ if (mockContext != null) {
+ mockContext.close();
+ }
+ if (mockPreferences != null) {
+ mockPreferences.close();
+ }
+ }
+
+ /**
+ * Performs final cleanup of the polling manager after all tests complete.
+ * Ensures proper shutdown of background threads and resources.
+ */
+ @AfterClass
+ void tearDown() {
+ if (pollingManager != null) {
+ pollingManager.shutdown();
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Tests basic InfluxDB PV creation with explicit bucket specification.
+ * Verifies that a PV can be created with a specific bucket, measurement,
+ * and field, and that all properties are correctly initialized.
+ */
+ @Test
+ @Order(1)
+ @DisplayName("Test basic InfluxDB PV creation with bucket/measurement/field")
+ void testBasicPVCreationWithField() {
+ InfluxDB_PV pv = new InfluxDB_PV(
+ TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD,
+ "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD
+ );
+
+ assertNotNull(pv);
+ assertEquals(TEST_BUCKET, pv.getBucket());
+ assertEquals(TEST_MEASUREMENT, pv.getMeasurement());
+ assertEquals(TEST_FIELD, pv.getField());
+ assertEquals(100L, pv.getPeriod());
+
+ pv.close();
+ }
+
+ /**
+ * Tests PV creation without specific field specification.
+ * Verifies that a PV can be created with only bucket and measurement,
+ * allowing the system to read all fields from the measurement.
+ */
+ @Test
+ @Order(2)
+ @DisplayName("Test PV creation with bucket/measurement only")
+ void testPVCreationWithoutField() {
+ InfluxDB_PV pv = new InfluxDB_PV(
+ TEST_BUCKET + "/" + TEST_MEASUREMENT,
+ "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT
+ );
+
+ assertNotNull(pv);
+ assertEquals(TEST_BUCKET, pv.getBucket());
+ assertEquals(TEST_MEASUREMENT, pv.getMeasurement());
+ assertNull(pv.getField());
+ assertEquals(100L, pv.getPeriod());
+
+ pv.close();
+ }
+
+ /**
+ * Tests that PV creation fails when bucket is not specified.
+ * The system should throw an IllegalArgumentException when attempting
+ * to create a PV without a bucket specification.
+ */
+ @Test
+ @Order(3)
+ @DisplayName("Test PV creation fails without bucket")
+ void testPVCreationFailsWithoutBucket() {
+ assertThrows(IllegalArgumentException.class, () -> new InfluxDB_PV(TEST_MEASUREMENT, "influx2://" + TEST_MEASUREMENT));
+ }
+
+ /**
+ * Tests that PV creation fails with invalid format.
+ * The system should throw an IllegalArgumentException when the format
+ * doesn't match the expected patterns.
+ */
+ @Test
+ @Order(4)
+ @DisplayName("Test PV creation fails with invalid format")
+ void testPVCreationFailsWithInvalidFormat() {
+ assertThrows(IllegalArgumentException.class, () -> new InfluxDB_PV("test", "influx2://bucket/measurement/field/extra"));
+ assertThrows(IllegalArgumentException.class, () -> new InfluxDB_PV("test", "influx2://measurement"));
+ }
+
+ /**
+ * Tests asynchronous read operations from InfluxDB.
+ * Verifies that the PV can successfully execute asynchronous queries
+ * against InfluxDB and convert the results to appropriate VType objects.
+ */
+ @Test
+ @Order(5)
+ @DisplayName("Test asynchronous read")
+ void testAsyncRead() throws Exception {
+ when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(10000L);
+
+ FluxRecord mockRecord = mock(FluxRecord.class);
+ when(mockRecord.getValue()).thenReturn(42.0);
+ when(mockRecord.getTime()).thenReturn(Instant.now());
+ when(mockRecord.getField()).thenReturn(TEST_FIELD);
+
+ FluxTable mockTable = mock(FluxTable.class);
+ when(mockTable.getRecords()).thenReturn(List.of(mockRecord));
+
+ when(mockQueryApi.query(anyString())).thenReturn(List.of(mockTable));
+
+ InfluxDB_PV pv = new InfluxDB_PV(
+ TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD,
+ "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD
+ );
+
+ CompletableFuture future = pv.asyncRead();
+ VType result = future.get(5, TimeUnit.SECONDS);
+
+ assertNotNull(result);
+ assertInstanceOf(VDouble.class, result);
+ assertEquals(42.0, ((VDouble) result).getValue(), 0.001);
+
+ pv.close();
+ }
+
+ /**
+ * Tests proper handling of connection errors during read operations.
+ * When the InfluxDB connection fails, the PV should propagate the error
+ * appropriately through the CompletableFuture mechanism.
+ */
+ @Test
+ @Order(6)
+ @DisplayName("Test connection error handling")
+ void testConnectionErrorHandling() {
+ when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(10000L);
+
+ when(mockQueryApi.query(anyString())).thenThrow(new RuntimeException("Connection failed"));
+
+ InfluxDB_PV pv = new InfluxDB_PV(
+ TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD,
+ "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD
+ );
+
+ CompletableFuture future = pv.asyncRead();
+ assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS));
+
+ pv.close();
+ }
+
+ /**
+ * Tests PV disconnection scenarios.
+ * Verifies that PVs can handle disconnection events gracefully,
+ * typically triggered by stale data or connection timeouts.
+ */
+ @Test
+ @Order(7)
+ @DisplayName("Test disconnection handling")
+ void testDisconnectionHandling() throws InterruptedException {
+ FluxRecord oldRecord = mock(FluxRecord.class);
+ when(oldRecord.getValue()).thenReturn(42.0);
+ when(oldRecord.getTime()).thenReturn(Instant.now().minus(10, ChronoUnit.SECONDS));
+ when(oldRecord.getField()).thenReturn(TEST_FIELD);
+
+ FluxTable mockTable = mock(FluxTable.class);
+ when(mockTable.getRecords()).thenReturn(List.of(oldRecord));
+
+ when(mockQueryApi.query(anyString())).thenReturn(List.of(mockTable));
+
+ InfluxDB_PV pv = new InfluxDB_PV(
+ TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD,
+ "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD
+ );
+
+ Thread.sleep(500);
+
+ pv.close();
+ }
+
+ /**
+ * Tests data type conversion from InfluxDB records to EPICS VTypes.
+ * Verifies that the helper utilities can correctly convert different
+ * data types from InfluxDB FluxRecords to their corresponding VType representations.
+ */
+ @Test
+ @Order(8)
+ @DisplayName("Test data type conversion")
+ void testDataTypeConversion() throws Exception {
+ FluxRecord doubleRecord = mock(FluxRecord.class);
+ when(doubleRecord.getValue()).thenReturn(3.14159);
+ when(doubleRecord.getTime()).thenReturn(Instant.now());
+
+ VType doubleResult = InfluxDB_Helper.convertRecordToVType(doubleRecord);
+ assertInstanceOf(VDouble.class, doubleResult);
+ assertEquals(3.14159, ((VDouble) doubleResult).getValue(), 0.00001);
+
+ FluxRecord stringRecord = mock(FluxRecord.class);
+ when(stringRecord.getValue()).thenReturn("test-string");
+ when(stringRecord.getTime()).thenReturn(Instant.now());
+
+ VType stringResult = InfluxDB_Helper.convertRecordToVType(stringRecord);
+ assertInstanceOf(VString.class, stringResult);
+ assertEquals("test-string", ((VString) stringResult).getValue());
+
+ FluxRecord intRecord = mock(FluxRecord.class);
+ when(intRecord.getValue()).thenReturn(42);
+ when(intRecord.getTime()).thenReturn(Instant.now());
+
+ VType intResult = InfluxDB_Helper.convertRecordToVType(intRecord);
+ assertInstanceOf(VDouble.class, intResult);
+ assertEquals(42.0, ((VDouble) intResult).getValue(), 0.001);
+ }
+
+ /**
+ * Tests PV creation without specifying a specific field name.
+ * When no field is specified, the PV should be able to read from
+ * the measurement using default field resolution.
+ */
+ @Test
+ @Order(9)
+ @DisplayName("Test PV without specific field")
+ void testPVWithoutSpecificField() throws Exception {
+ when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(10000L);
+
+ FluxRecord mockRecord = mock(FluxRecord.class);
+ when(mockRecord.getValue()).thenReturn(99.9);
+ when(mockRecord.getTime()).thenReturn(Instant.now());
+ when(mockRecord.getMeasurement()).thenReturn(TEST_MEASUREMENT);
+
+ FluxTable mockTable = mock(FluxTable.class);
+ when(mockTable.getRecords()).thenReturn(List.of(mockRecord));
+
+ when(mockQueryApi.query(anyString())).thenReturn(List.of(mockTable));
+
+ InfluxDB_PV pv = new InfluxDB_PV(
+ TEST_BUCKET + "/" + TEST_MEASUREMENT,
+ "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT
+ );
+
+ CompletableFuture future = pv.asyncRead();
+ VType result = future.get(5, TimeUnit.SECONDS);
+
+ assertNotNull(result);
+ assertNull(pv.getField());
+ assertInstanceOf(VDouble.class, result);
+
+ pv.close();
+ }
+
+ /**
+ * Performance test with 10,000 PVs to validate system scalability.
+ * This test creates a large number of PVs to validate that the system
+ * can handle high-volume scenarios.
+ */
+ @Test
+ @Order(10)
+ @DisplayName("Performance test with 10000 PVs")
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ void testPerformanceWith10000PVs() throws Exception {
+ when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(5000L);
+
+ final int PV_COUNT = 10000;
+ final int BUCKET_COUNT = 10;
+
+ FluxRecord mockRecord = mock(FluxRecord.class);
+ when(mockRecord.getValue()).thenReturn(42.0);
+ when(mockRecord.getTime()).thenReturn(Instant.now());
+ when(mockRecord.getField()).thenReturn("");
+
+ FluxTable mockTable = mock(FluxTable.class);
+ when(mockTable.getRecords()).thenReturn(List.of(mockRecord));
+ when(mockQueryApi.query(anyString())).thenReturn(List.of(mockTable));
+
+ AtomicLong creationTime = new AtomicLong(0);
+ AtomicInteger successfulCreations = new AtomicInteger(0);
+
+ List pvs = new ArrayList<>(PV_COUNT);
+
+ long startTime = System.nanoTime();
+
+ for (int i = 0; i < PV_COUNT; i++) {
+ long pvStartTime = System.nanoTime();
+
+ InfluxDB_PV pv = mock(InfluxDB_PV.class);
+ when(pv.getBucket()).thenReturn("bucket-" + (i % BUCKET_COUNT));
+ when(pv.getMeasurement()).thenReturn("measurement-" + i);
+
+ when(pv.asyncRead()).thenReturn(CompletableFuture.completedFuture(new VType() {
+ @Override
+ public String toString() {
+ return "mocked_value";
+ }
+ }));
+
+ pvs.add(pv);
+
+ long pvEndTime = System.nanoTime();
+ creationTime.addAndGet(pvEndTime - pvStartTime);
+ successfulCreations.incrementAndGet();
+ }
+
+ long endTime = System.nanoTime();
+ long totalCreationTime = endTime - startTime;
+
+ Thread.sleep(1000);
+
+ AtomicInteger successfulReads = new AtomicInteger(0);
+ AtomicLong readTime = new AtomicLong(0);
+
+ for (InfluxDB_PV pv : pvs) {
+ try {
+ long readStart = System.nanoTime();
+ CompletableFuture future = pv.asyncRead();
+ VType result = future.get(5, TimeUnit.SECONDS);
+ long readEnd = System.nanoTime();
+
+ if (result != null) {
+ successfulReads.incrementAndGet();
+ readTime.addAndGet(readEnd - readStart);
+ }
+ } catch (Exception e) {
+ System.err.println("Read failed: " + e.getMessage());
+ }
+ }
+
+ System.out.println("\n=== Performance Test Results ===");
+ System.out.println("Total PVs created: " + successfulCreations.get() + "/" + PV_COUNT);
+ System.out.println("Total creation time: " + (totalCreationTime / 1_000_000) + " ms");
+ System.out.println("Average creation time per PV: " + (creationTime.get() / PV_COUNT / 1_000_000) + " ms");
+ System.out.println("Successful reads: " + successfulReads.get() + "/" + PV_COUNT);
+ if (successfulReads.get() > 0) {
+ System.out.println("Average read time: " + (readTime.get() / successfulReads.get() / 1_000_000) + " ms");
+ }
+
+ assertTrue(successfulCreations.get() >= PV_COUNT * 0.95,
+ "Should create at least 95% of PVs successfully");
+ assertTrue(totalCreationTime < 30_000_000_000L,
+ "Total creation time should be less than 30 seconds");
+ assertTrue(successfulReads.get() >= 80,
+ "Should complete at least 80% of read tests successfully");
+
+ System.gc();
+ Thread.sleep(100);
+ long finalMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ System.out.println("Final memory usage: " + finalMemory / 1024 / 1024 + " MB");
+
+ long cleanupStart = System.nanoTime();
+ for (InfluxDB_PV pv : pvs) {
+ try {
+ pv.close();
+ } catch (Exception ignore) {
+ }
+ }
+ long cleanupEnd = System.nanoTime();
+ System.out.println("Cleanup time: " + ((cleanupEnd - cleanupStart) / 1_000_000) + " ms");
+ System.out.println("=== End Performance Test ===\n");
+ }
+
+ /**
+ * Tests that unsupported operations throw appropriate exceptions.
+ * InfluxDB PVs are read-only by design.
+ */
+ @Test
+ @Order(11)
+ @DisplayName("Test unsupported operations")
+ void testUnsupportedOperations() {
+ when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(10000L);
+
+ InfluxDB_PV pv = new InfluxDB_PV(
+ TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD,
+ "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/" + TEST_FIELD
+ );
+
+ assertThrows(UnsupportedOperationException.class, () -> pv.write(42.0));
+ assertThrows(UnsupportedOperationException.class, () -> pv.asyncWrite(42.0));
+
+ pv.close();
+ }
+
+ /**
+ * Tests the InfluxDB PV factory implementation.
+ * Verifies that the factory correctly identifies itself with the "influx2"
+ * type and can create InfluxDB_PV instances when requested.
+ */
+ @Test
+ @Order(12)
+ @DisplayName("Test InfluxDB PV Factory")
+ void testPVFactory() {
+ InfluxDB_PVFactory factory = new InfluxDB_PVFactory();
+
+ assertEquals("influx2", factory.getType());
+
+ PV pv = factory.createPV(
+ TEST_BUCKET + "/" + TEST_MEASUREMENT,
+ "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT
+ );
+ assertNotNull(pv);
+ assertInstanceOf(InfluxDB_PV.class, pv);
+ }
+
+ /**
+ * Stress tests the polling manager with a large number of PVs.
+ * This test creates 100,000 PVs to validate that the polling manager
+ * can handle extreme loads without crashing or causing memory issues.
+ */
+ @Test
+ @Order(13)
+ @DisplayName("Test stress polling manager")
+ void testPollingManagerStress() throws InterruptedException {
+ final int STRESS_PV_COUNT = 100_000;
+ List stressPVs = Collections.synchronizedList(new ArrayList<>());
+
+ try {
+ when(mockPreferencesInstance.getRefreshPeriod()).thenReturn(5000L);
+
+ FluxRecord mockRecord = mock(FluxRecord.class);
+ when(mockRecord.getValue()).thenReturn(123.45);
+ when(mockRecord.getTime()).thenReturn(Instant.now());
+ when(mockRecord.getField()).thenReturn("mockField");
+
+ FluxTable mockTable = mock(FluxTable.class);
+ when(mockTable.getRecords()).thenReturn(List.of(mockRecord));
+ when(mockQueryApi.query(anyString())).thenReturn(List.of(mockTable));
+
+ for (int i = 0; i < STRESS_PV_COUNT; i++) {
+ String pvName = "stress_pv" + i;
+ String bucketName = "bucket_" + (i % 5);
+ String measurementName = "measurement_" + (i % 20);
+ String fieldName = "field_" + (i % 10);
+
+ InfluxDB_PV pv = new InfluxDB_PV(pvName,
+ String.format("influx2://%s/%s/%s", bucketName, measurementName, fieldName));
+ stressPVs.add(pv);
+ }
+
+ Thread.sleep(2000);
+
+ assertTrue(true, "Stress test completed without major issues");
+ } finally {
+ for (InfluxDB_PV pv : stressPVs) {
+ try {
+ pv.close();
+ } catch (Exception e) {
+ System.err.println("Error closing PV: " + e.getMessage());
+ }
+ }
+ }
+ }
+
+ /**
+ * Tests PV creation with null field (should be treated as no specific field).
+ * Verifies that passing "null" as field name results in no specific field selection.
+ */
+ @Test
+ @Order(14)
+ @DisplayName("Test PV creation with 'null' field")
+ void testPVCreationWithNullField() {
+ InfluxDB_PV pv = new InfluxDB_PV(
+ TEST_BUCKET + "/" + TEST_MEASUREMENT + "/null",
+ "influx2://" + TEST_BUCKET + "/" + TEST_MEASUREMENT + "/null"
+ );
+
+ assertNotNull(pv);
+ assertEquals(TEST_BUCKET, pv.getBucket());
+ assertEquals(TEST_MEASUREMENT, pv.getMeasurement());
+ assertNull(pv.getField());
+
+ pv.close();
+ }
+
+ /**
+ * Tests that empty bucket name is rejected.
+ * The system should throw an IllegalArgumentException for empty bucket names.
+ */
+ @Test
+ @Order(15)
+ @DisplayName("Test empty bucket name is rejected")
+ void testEmptyBucketRejected() {
+ assertThrows(IllegalArgumentException.class, () -> new InfluxDB_PV("test", "influx2:///measurement"));
+
+ assertThrows(IllegalArgumentException.class, () -> new InfluxDB_PV("test", "influx2:// /measurement"));
+ }
+}
diff --git a/phoebus-product/pom.xml b/phoebus-product/pom.xml
index e4747da801..2cc74f6a33 100644
--- a/phoebus-product/pom.xml
+++ b/phoebus-product/pom.xml
@@ -22,6 +22,11 @@
core-pv-mqtt
5.0.3-SNAPSHOT