Skip to content

Commit 1517e53

Browse files
authored
Add persist_mode to StreamConfig (#773)
Signed-off-by: Casper Beyer <[email protected]>
1 parent 95e7fac commit 1517e53

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

nats/src/nats/js/api.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,28 @@ class StoreCompression(str, Enum):
237237
S2 = "s2"
238238

239239

240+
class PersistMode(str, Enum):
241+
"""
242+
PersistMode defines the consistency and durability guarantees for stream persistence.
243+
244+
See ADR-56 for details: https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-56.md
245+
246+
Currently only applicable to R1 (single replica) streams.
247+
Introduced in nats-server 2.12.0.
248+
"""
249+
250+
# DEFAULT represents the strongest consistency guarantee.
251+
# Uses synchronous writes with fsync for maximum durability.
252+
# Server does not store this value - it's the implied default when unset.
253+
DEFAULT = "default"
254+
255+
# ASYNC enables asynchronous flushing of data to disk.
256+
# Returns PubAck before disk persistence occurs, batching writes in memory.
257+
# Provides significantly improved performance at the cost of potential data loss
258+
# during infrastructure failures. Incompatible with batch publishing.
259+
ASYNC = "async"
260+
261+
240262
@dataclass
241263
class RePublish(Base):
242264
"""
@@ -317,6 +339,10 @@ class StreamConfig(Base):
317339
# Allow batched publishing. Introduced in nats-server 2.12.0.
318340
allow_batched: Optional[bool] = None
319341

342+
# Persistence mode for stream. Only applicable to R1 streams.
343+
# Introduced in nats-server 2.12.0.
344+
persist_mode: Optional[PersistMode] = None
345+
320346
# Metadata are user defined string key/value pairs.
321347
metadata: Optional[Dict[str, str]] = None
322348

nats/tests/test_js.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4743,6 +4743,48 @@ async def test_stream_allow_atomic(self):
47434743

47444744
await nc.close()
47454745

4746+
@async_test
4747+
async def test_stream_persist_mode(self):
4748+
nc = await nats.connect()
4749+
4750+
server_version = nc.connected_server_version
4751+
if server_version.major == 2 and server_version.minor < 12:
4752+
pytest.skip("persist_mode requires nats-server v2.12.0 or later")
4753+
4754+
js = nc.jetstream()
4755+
4756+
# Test setting async consistency model on R1 stream
4757+
await js.add_stream(
4758+
name="ASYNC",
4759+
subjects=["test"],
4760+
num_replicas=1,
4761+
persist_mode=nats.js.api.PersistMode.ASYNC,
4762+
)
4763+
sinfo = await js.stream_info("ASYNC")
4764+
assert sinfo.config.persist_mode == nats.js.api.PersistMode.ASYNC
4765+
4766+
# Test that default consistency model works
4767+
await js.add_stream(
4768+
name="DEFAULT_CONSISTENCY",
4769+
subjects=["foo"],
4770+
num_replicas=1,
4771+
persist_mode=nats.js.api.PersistMode.DEFAULT,
4772+
)
4773+
sinfo = await js.stream_info("DEFAULT_CONSISTENCY")
4774+
# Server doesn't store default value, so it may be None
4775+
assert sinfo.config.persist_mode in [None, nats.js.api.PersistMode.DEFAULT]
4776+
4777+
# Test that it defaults to None when not set
4778+
await js.add_stream(
4779+
name="UNSET_CONSISTENCY",
4780+
subjects=["bar"],
4781+
num_replicas=1,
4782+
)
4783+
sinfo = await js.stream_info("UNSET_CONSISTENCY")
4784+
assert sinfo.config.persist_mode in [None, nats.js.api.PersistMode.DEFAULT]
4785+
4786+
await nc.close()
4787+
47464788
@async_test
47474789
async def test_fetch_pull_subscribe_bind(self):
47484790
nc = NATS()

0 commit comments

Comments
 (0)