From 6ed7a1edf0387615ae5f20945f119bc2b3694506 Mon Sep 17 00:00:00 2001 From: enbnt Date: Tue, 21 Mar 2023 14:33:19 -0700 Subject: [PATCH] Initial commit for thread-safe cb impl --- .../util/ConcurrentCircularBuffer.scala | 258 ++++++++++++++++++ .../util/SynchronizedCircularBuffer.scala | 250 +++++++++++++++++ 2 files changed, 508 insertions(+) create mode 100644 timeseries/timeseries4s/timeseries-core/src/main/scala/dev/enbnt/timeseries/util/ConcurrentCircularBuffer.scala create mode 100644 timeseries/timeseries4s/timeseries-core/src/main/scala/dev/enbnt/timeseries/util/SynchronizedCircularBuffer.scala diff --git a/timeseries/timeseries4s/timeseries-core/src/main/scala/dev/enbnt/timeseries/util/ConcurrentCircularBuffer.scala b/timeseries/timeseries4s/timeseries-core/src/main/scala/dev/enbnt/timeseries/util/ConcurrentCircularBuffer.scala new file mode 100644 index 0000000..5ebb249 --- /dev/null +++ b/timeseries/timeseries4s/timeseries-core/src/main/scala/dev/enbnt/timeseries/util/ConcurrentCircularBuffer.scala @@ -0,0 +1,258 @@ +package dev.enbnt.timeseries.util + +import java.util.concurrent.Semaphore +import java.util.concurrent.atomic.AtomicInteger +import scala.annotation.varargs +import scala.collection.IndexedSeqView +import scala.collection.IterableFactory +import scala.collection.IterableFactoryDefaults +import scala.collection.IterableOps +import scala.collection.StrictOptimizedIterableOps +import scala.collection.mutable +import scala.reflect.ClassTag + +/** + * A mutable bounded buffer implementation, which behaves as a circle/ring for + * sequential read/write access. The underlying buffer uses two pointers — + * a read-index and a write-index, which are used to determine the logical + * beginning, ending, and size of the unconsumed buffer. + * + * @example + * // format: off + * {{{ + * val ring = new ConcurrentCircularBuffer[String](2) // ring: [_, _] read: 0 write: 0 size: 0 + * ring.write("hello") // ring: ["hello", _] read: 0 write: 1 size: 1 + * ring.write("world") // ring: ["hello", "world"] read: 0 write: 0 size: 2 + * + * ring.read() // "hello" - ring: [_, "world"] read: 1 write: 0 size: 1 + * ring.read() // "world" - ring: [_, _] read: 0 write: 0 size: 0 + * + * // calling ring.read() again would throw an exception + * + * ring.write("good") // ring: ["good", _] read: 0 write: 1 size: 1 + * ring.write("bye") // ring: ["good", "bye"] read: 0 write: 0 size: 2 + * ring.write("sunshine") // ring: ["sunshine", "bye"] read: 1 write: 1 size: 2 + * + * ring.read() // "bye" - ring: ["sunshine", _] read: 0 write: 1 size: 1 + * ring.read() // "sunshine" - ring: [_, _] read: 1 write: 1 size: 0 + * }}} + * + * @note + * This implementation IS thread safe. + * + * @note + * The only methods that modify the the buffer's indices are [[read()]] and + * [[write()]]. Any other methods which access data in the buffer via an + * offset will not mutate the underlying buffer state. + * + * This is a **VERY IMPORTANT** detail, as access via iterators or collection + * methods (map, flatMap) will act over a view of the buffer data, but not + * modify its state directly. This allows the data in the buffer to be accessed + * *without* emptying or mutating the state of the buffer as a result. + * + * @see + * [[https://en.wikipedia.org/wiki/Circular_buffer]] + * @see + * [[https://docs.scala-lang.org/overviews/core/custom-collections.html]] + * + * @param capacity + * The maximum allowed size of the underlying buffer. + * @param elems + * The defined buffer elements. + * @param readIdx + * The starting offset of the read index + * @param writeIdx + * The starting offset of the write index + * @tparam T + * The type of elements of contained within the buffer + */ +private[timeseries] class ConcurrentCircularBuffer[T: ClassTag] private ( + val capacity: Int, + elems: Array[T], + val readIdx: AtomicInteger, + val writeIdx: AtomicInteger +) extends Iterable[T] + with IterableOps[T, ConcurrentCircularBuffer, ConcurrentCircularBuffer[T]] + with IterableFactoryDefaults[T, ConcurrentCircularBuffer] + with StrictOptimizedIterableOps[ + T, + ConcurrentCircularBuffer, + ConcurrentCircularBuffer[T] + ] { + self => + + require( + capacity > 0, + s"ConcurrentCircularBuffer capacity must be > 0, but received '$capacity'" + ) + + private[this] val readLatch = new Semaphore(readWriteDelta) + + private[this] def count: Int = + math.min(readWriteDelta, capacity) + + private[this] def readWriteDelta: Int = synchronized { + writeIdx.get - readIdx.get + } + 1 + + def this(capacity: Int, elems: Array[T], readIdx: Int, writeIdx: Int) = this( + capacity, + elems, + new AtomicInteger(readIdx), + new AtomicInteger(writeIdx) + ) + + def this(capacity: Int) = + this(capacity, elems = Array.ofDim[T](capacity), readIdx = 0, writeIdx = -1) + + /** + * Consume the value at the current read index, modifying the state of the + * buffer. The thread will block indefinitely until a value has been written. + */ + @throws[IllegalStateException] + def read(): T = { + readLatch.tryAcquire() + elems(readIdx.getAndIncrement() % capacity) + } + + /** + * Reads the value at the offset of the current read index position, + * accounting for loops in the buffer. An [[IllegalStateException]] will be + * thrown if the [[offset]] is too large for the state of the buffer. + * + * @note + * Calling this method does not modify the read index position or the state + * of the buffer. + * + * @param offset + * The logical index in the buffer to read from + * + * @return + * The element at logical index [[offset]] + */ + @throws[IllegalStateException] + def read(offset: Int): T = { + if (offset >= size) + throw new IllegalStateException( + s"Read offset '$offset' was >= buffer size of '$size'" + ) + elems((readIdx.get() + offset) % capacity) + } + + /** + * Set the value at the current write index, modifying the state of the + * buffer. + */ + def write(value: T): Unit = { + elems(writeIdx.incrementAndGet() % capacity) = value + if (writeIdx.get() - readIdx.get() >= capacity) { + readIdx.incrementAndGet() + } + readLatch.release() + } + + def write(offset: Int, value: T): Unit = { + if (offset >= size) + throw new IllegalStateException( + "Cannot write to an offset that hasn't been written to" + ) + elems((readIdx.get() + offset) % capacity) = value + } + + /** + * Write a sequence of values to the buffer, modifying the state of the + * buffer. + * @param values + * The values to be written to the buffer + */ + @varargs + def write(values: T*): Unit = { + values.foreach(write) + } + + /** + * @inheritdoc + * + * @note + * This method does not modify the underlying buffer state + */ + def apply(i: Int): T = read(i) + + /** + * @inheritdoc + * @note + * This method does not modify the underlying buffer state + */ + override def view: IndexedSeqView[T] = new IndexedSeqView[T] { + def length: Int = self.count + def apply(i: Int): T = self(i) + } + + /** @inheritdoc */ + override def knownSize: Int = count + + /** @inheritdoc */ + override def className = "ConcurrentCircularBuffer" + + /** @inheritdoc */ + override val iterableFactory: IterableFactory[ConcurrentCircularBuffer] = + new ConcurrentCircularBufferFactory(capacity) + + /** + * This method does not modify this underlying [[ConcurrentCircularBuffer]], + * but will return a copy with the new element appended. + * + * @param elem + * The element to append + * @tparam B + * The type of [[ConcurrentCircularBuffer]] element + * @return + * A deep copy of this buffer with a given element appended + */ + @`inline` def :+[B >: T](elem: B): ConcurrentCircularBuffer[B] = appended( + elem + ) + + private[this] def appended[B >: T](elem: B): ConcurrentCircularBuffer[B] = { + val newElems = Array.ofDim[Any](capacity) + Array.copy(elems, 0, newElems, 0, capacity) + + val cb = new ConcurrentCircularBuffer[Any]( + capacity, + elems = newElems, + readIdx = self.readIdx.get(), + writeIdx = self.writeIdx.get() + ) + val ret = cb.asInstanceOf[ConcurrentCircularBuffer[B]] + ret.write(elem) + ret + } + + /** + * @inheritdoc + * @note + * This method does not modify the underlying buffer state + */ + def iterator: Iterator[T] = view.iterator + +} + +private[timeseries] class ConcurrentCircularBufferFactory(capacity: Int) + extends IterableFactory[ConcurrentCircularBuffer] { + + def from[A](source: IterableOnce[A]): ConcurrentCircularBuffer[A] = + source match { + case cb: ConcurrentCircularBuffer[A] if cb.capacity == capacity => cb + case _ => (newBuilder[A] ++= source).result() + } + + def empty[A]: ConcurrentCircularBuffer[A] = + new ConcurrentCircularBuffer[Any](capacity) + .asInstanceOf[ConcurrentCircularBuffer[A]] + + def newBuilder[A]: mutable.Builder[A, ConcurrentCircularBuffer[A]] = + new mutable.ImmutableBuilder[A, ConcurrentCircularBuffer[A]](empty) { + def addOne(elem: A): this.type = { elems = elems :+ elem; this } + } + +} diff --git a/timeseries/timeseries4s/timeseries-core/src/main/scala/dev/enbnt/timeseries/util/SynchronizedCircularBuffer.scala b/timeseries/timeseries4s/timeseries-core/src/main/scala/dev/enbnt/timeseries/util/SynchronizedCircularBuffer.scala new file mode 100644 index 0000000..ad214b4 --- /dev/null +++ b/timeseries/timeseries4s/timeseries-core/src/main/scala/dev/enbnt/timeseries/util/SynchronizedCircularBuffer.scala @@ -0,0 +1,250 @@ +package dev.enbnt.timeseries.util + +import scala.annotation.varargs +import scala.collection.IndexedSeqView +import scala.collection.IterableFactory +import scala.collection.IterableFactoryDefaults +import scala.collection.IterableOps +import scala.collection.StrictOptimizedIterableOps +import scala.collection.mutable +import scala.reflect.ClassTag + +/** + * A mutable bounded buffer implementation, which behaves as a circle/ring for + * sequential read/write access. The underlying buffer uses two pointers — + * a read-index and a write-index, which are used to determine the logical + * beginning, ending, and size of the unconsumed buffer. + * + * @example + * // format: off + * {{{ + * val ring = new SynchronizedCircularBuffer[String](2) // ring: [_, _] read: 0 write: 0 size: 0 + * ring.write("hello") // ring: ["hello", _] read: 0 write: 1 size: 1 + * ring.write("world") // ring: ["hello", "world"] read: 0 write: 0 size: 2 + * + * ring.read() // "hello" - ring: [_, "world"] read: 1 write: 0 size: 1 + * ring.read() // "world" - ring: [_, _] read: 0 write: 0 size: 0 + * + * // calling ring.read() again would throw an exception + * + * ring.write("good") // ring: ["good", _] read: 0 write: 1 size: 1 + * ring.write("bye") // ring: ["good", "bye"] read: 0 write: 0 size: 2 + * ring.write("sunshine") // ring: ["sunshine", "bye"] read: 1 write: 1 size: 2 + * + * ring.read() // "bye" - ring: ["sunshine", _] read: 0 write: 1 size: 1 + * ring.read() // "sunshine" - ring: [_, _] read: 1 write: 1 size: 0 + * }}} + * + * @note + * This implementation IS thread safe. + * + * @note + * The only methods that modify the the buffer's indices are [[read()]] and + * [[write()]]. Any other methods which access data in the buffer via an + * offset will not mutate the underlying buffer state. + * + * This is a **VERY IMPORTANT** detail, as access via iterators or collection + * methods (map, flatMap) will act over a view of the buffer data, but not + * modify its state directly. This allows the data in the buffer to be accessed + * *without* emptying or mutating the state of the buffer as a result. + * + * @see + * [[https://en.wikipedia.org/wiki/Circular_buffer]] + * @see + * [[https://docs.scala-lang.org/overviews/core/custom-collections.html]] + * + * @param capacity + * The maximum allowed size of the underlying buffer. + * @param elems + * The defined buffer elements. + * @param readIdx + * The starting offset of the read index + * @param writeIdx + * The starting offset of the write index + * @tparam T + * The type of elements of contained within the buffer + */ +private[timeseries] class SynchronizedCircularBuffer[T: ClassTag] private ( + val capacity: Int, + elems: Array[T], + var readIdx: Int, + var writeIdx: Int +) extends Iterable[T] + with IterableOps[T, SynchronizedCircularBuffer, SynchronizedCircularBuffer[ + T + ]] + with IterableFactoryDefaults[T, SynchronizedCircularBuffer] + with StrictOptimizedIterableOps[ + T, + SynchronizedCircularBuffer, + SynchronizedCircularBuffer[T] + ] { + self => + + require( + capacity > 0, + s"SynchronizedCircularBuffer capacity must be > 0, but received '$capacity'" + ) + + private[this] def count: Int = synchronized { + math.min(writeIdx - readIdx + 1, capacity) + } + + def this(capacity: Int) = + this(capacity, elems = Array.ofDim[T](capacity), readIdx = 0, writeIdx = -1) + + /** + * Consume the value at the current read index, modifying the state of the + * buffer. An [[IllegalStateException]] will be thrown if the buffer is empty + * when a read is attempted. + */ + @throws[IllegalStateException] + def read(): T = synchronized { + if (isEmpty) + throw new IllegalStateException("Cannot read ahead of written value") + val v = elems(readIdx % capacity) + readIdx += 1 + v + } + + /** + * Reads the value at the offset of the current read index position, + * accounting for loops in the buffer. An [[IllegalStateException]] will be + * thrown if the [[offset]] is too large for the state of the buffer. + * + * @note + * Calling this method does not modify the read index position or the state + * of the buffer. + * + * @param offset + * The logical index in the buffer to read from + * + * @return + * The element at logical index [[offset]] + */ + @throws[IllegalStateException] + def read(offset: Int): T = synchronized { + if (offset >= size) + throw new IllegalStateException( + s"Read offset '$offset' was >= buffer size of '$size'" + ) + elems((readIdx + offset) % capacity) + } + + /** + * Set the value at the current write index, modifying the state of the + * buffer. + */ + def write(value: T): Unit = synchronized { + writeIdx += 1 + elems(writeIdx % capacity) = value + if (writeIdx - readIdx == capacity) { + readIdx += 1 + } + } + + def write(offset: Int, value: T): Unit = synchronized { + if (offset >= size) + throw new IllegalStateException( + "Cannot write to an offset that hasn't been written to" + ) + elems((readIdx + offset) % capacity) = value + } + + /** + * Write a sequence of values to the buffer, modifying the state of the + * buffer. + * @param values + * The values to be written to the buffer + */ + @varargs + def write(values: T*): Unit = synchronized { + values.foreach(write) + } + + /** + * @inheritdoc + * + * @note + * This method does not modify the underlying buffer state + */ + def apply(i: Int): T = read(i) + + /** + * @inheritdoc + * @note + * This method does not modify the underlying buffer state + */ + override def view: IndexedSeqView[T] = new IndexedSeqView[T] { + def length: Int = self.count + def apply(i: Int): T = self(i) + } + + /** @inheritdoc */ + override def knownSize: Int = count + + /** @inheritdoc */ + override def className = "SynchronizedCircularBuffer" + + /** @inheritdoc */ + override val iterableFactory: IterableFactory[SynchronizedCircularBuffer] = + new SynchronizedCircularBufferFactory(capacity) + + /** + * This method does not modify this underlying [[SynchronizedCircularBuffer]], + * but will return a copy with the new element appended. + * + * @param elem + * The element to append + * @tparam B + * The type of [[SynchronizedCircularBuffer]] element + * @return + * A deep copy of this buffer with a given element appended + */ + @`inline` def :+[B >: T](elem: B): SynchronizedCircularBuffer[B] = appended( + elem + ) + + private[this] def appended[B >: T](elem: B): SynchronizedCircularBuffer[B] = { + val newElems = Array.ofDim[Any](capacity) + Array.copy(elems, 0, newElems, 0, capacity) + + val cb = new SynchronizedCircularBuffer[Any]( + capacity, + elems = newElems, + readIdx = self.readIdx, + writeIdx = self.writeIdx + ) + val ret = cb.asInstanceOf[SynchronizedCircularBuffer[B]] + ret.write(elem) + ret + } + + /** + * @inheritdoc + * @note + * This method does not modify the underlying buffer state + */ + def iterator: Iterator[T] = view.iterator + +} + +private[timeseries] class SynchronizedCircularBufferFactory(capacity: Int) + extends IterableFactory[SynchronizedCircularBuffer] { + + def from[A](source: IterableOnce[A]): SynchronizedCircularBuffer[A] = + source match { + case cb: SynchronizedCircularBuffer[A] if cb.capacity == capacity => cb + case _ => (newBuilder[A] ++= source).result() + } + + def empty[A]: SynchronizedCircularBuffer[A] = + new SynchronizedCircularBuffer[Any](capacity) + .asInstanceOf[SynchronizedCircularBuffer[A]] + + def newBuilder[A]: mutable.Builder[A, SynchronizedCircularBuffer[A]] = + new mutable.ImmutableBuilder[A, SynchronizedCircularBuffer[A]](empty) { + def addOne(elem: A): this.type = { elems = elems :+ elem; this } + } + +}