Skip to content

Commit

Permalink
Add initial object store types. Update Stream config and reorg to kee…
Browse files Browse the repository at this point in the history
…p in-line with nats client.
  • Loading branch information
samuelattwood committed Jan 7, 2025
1 parent c690327 commit ec7aca3
Show file tree
Hide file tree
Showing 33 changed files with 1,755 additions and 598 deletions.
14 changes: 7 additions & 7 deletions controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,10 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
return nil
})

if spec.Republish != nil {
if spec.RePublish != nil {
opts = append(opts, jsm.Republish(&jsmapi.RePublish{
Source: spec.Republish.Source,
Destination: spec.Republish.Destination,
Source: spec.RePublish.Source,
Destination: spec.RePublish.Destination,
}))
}

Expand Down Expand Up @@ -495,11 +495,11 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
FirstSeq: spec.FirstSequence,
SubjectTransform: subjectTransform,
}
if spec.Republish != nil {
if spec.RePublish != nil {
config.RePublish = &jsmapi.RePublish{
Source: spec.Republish.Source,
Destination: spec.Republish.Destination,
HeadersOnly: spec.Republish.HeadersOnly,
Source: spec.RePublish.Source,
Destination: spec.RePublish.Destination,
HeadersOnly: spec.RePublish.HeadersOnly,
}
}
if spec.Mirror != nil {
Expand Down
235 changes: 126 additions & 109 deletions deploy/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ spec:
type: string
pattern: '^[^.*>]*$'
minLength: 1
description:
description: The description of the stream.
type: string
subjects:
description: A list of subjects to consume, supports wildcards.
type: array
Expand Down Expand Up @@ -58,10 +61,25 @@ spec:
type: integer
minimum: -1
default: -1
discard:
description: When a Stream reach it's limits either old messages are deleted or new ones are denied.
type: string
enum:
- old
- new
default: old
discardPerSubject:
description: Allows to discard messages on a subject basis.
type: boolean
default: false
maxAge:
description: Maximum age of any message in the stream, expressed in Go's time.Duration format. Empty for unlimited.
type: string
default: ''
maxMsgsPerSubject:
description: The maximum of messages per subject.
type: integer
default: 0
maxMsgSize:
description: The largest message that will be accepted by the Stream. -1 for unlimited.
type: integer
Expand All @@ -83,23 +101,19 @@ spec:
description: Disables acknowledging messages that are received by the Stream.
type: boolean
default: false
discard:
description: When a Stream reach it's limits either old messages are deleted or new ones are denied.
type: string
enum:
- old
- new
default: old
duplicateWindow:
description: The duration window to track duplicate messages for.
type: string
description:
description: The description of the stream.
type: string
maxMsgsPerSubject:
description: The maximum of messages per subject.
type: integer
default: 0
placement:
description: A stream's placement.
type: object
properties:
cluster:
type: string
tags:
type: array
items:
type: string
mirror:
description: A stream mirror.
type: object
Expand Down Expand Up @@ -130,16 +144,6 @@ spec:
dest:
description: Destination subject.
type: string
placement:
description: A stream's placement.
type: object
properties:
cluster:
type: string
tags:
type: array
items:
type: string
sources:
description: A stream's sources.
type: array
Expand Down Expand Up @@ -172,58 +176,22 @@ spec:
dest:
description: Destination subject.
type: string
metadata:
description: Additional Stream metadata.
type: object
additionalProperties:
type: string
servers:
description: A list of servers for creating stream
type: array
items:
type: string
default: []
creds:
description: NATS user credentials for connecting to servers. Please make sure your controller has mounted the cerds on its path.
type: string
default: ''
nkey:
description: NATS user NKey for connecting to servers.
type: string
default: ''
tls:
description: A client's TLS certs and keys.
type: object
properties:
clientCert:
description: A client's cert filepath. Should be mounted.
type: string
clientKey:
description: A client's key filepath. Should be mounted.
type: string
rootCas:
description: A list of filepaths to CAs. Should be mounted.
type: array
items:
type: string
account:
description: Name of the account to which the Stream belongs.
type: string
pattern: '^[^.*>]*$'
republish:
description: Republish configuration of the stream.
type: object
properties:
destination:
type: string
description: Messages will be additionally published to that subject.
source:
type: string
description: Messages will be published from that subject to the destination subject.
firstSequence:
description: Sequence number from which the Stream will start.
type: number
default: 0
sealed:
description: Seal an existing stream so no new messages can be added.
type: boolean
default: false
denyDelete:
description: When true, restricts the ability to delete messages from a stream via the API. Cannot be changed once set to true.
type: boolean
default: false
denyPurge:
description: When true, restricts the ability to purge a stream via the API. Cannot be changed once set to true.
type: boolean
default: false
allowRollup:
description: When true, allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message.
type: boolean
default: false
compression:
description: Stream specific compression.
type: string
Expand All @@ -232,6 +200,10 @@ spec:
- none
- ''
default: ''
firstSequence:
description: Sequence number from which the Stream will start.
type: number
default: 0
subjectTransform:
description: SubjectTransform is for applying a subject transform (to matching messages) when a new message is received
type: object
Expand All @@ -242,34 +214,79 @@ spec:
dest:
type: string
description: Destination subject to transform into
preventDelete:
description: When true, the managed Stream will not be deleted when the resource is deleted
type: boolean
default: false
preventUpdate:
description: When true, the managed Stream will not be updated when the resource is updated
type: boolean
default: false
republish:
description: Republish configuration of the stream.
type: object
properties:
destination:
type: string
description: Messages will be additionally published to that subject.
source:
type: string
description: Messages will be published from that subject to the destination subject.
allowDirect:
description: When true, allow higher performance, direct access to get individual messages
type: boolean
default: false
allowRollup:
description: When true, allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message.
type: boolean
default: false
denyDelete:
description: When true, restricts the ability to delete messages from a stream via the API. Cannot be changed once set to true.
mirrorDirect:
description: When true, enables direct access to messages from the origin stream
type: boolean
default: false
denyPurge:
description: When true, restricts the ability to purge a stream via the API. Cannot be changed once set to true.
consumerLimits:
type: object
properties:
inactiveThreshold:
description: The duration of inactivity after which a consumer is considered inactive.
type: string
maxAckPending:
description: Maximum number of outstanding unacknowledged messages.
type: integer
metadata:
description: Additional Stream metadata.
type: object
additionalProperties:
type: string
account:
description: Name of the account to which the Stream belongs.
type: string
pattern: '^[^.*>]*$'
creds:
description: NATS user credentials for connecting to servers. Please make sure your controller has mounted the cerds on its path.
type: string
default: ''
nkey:
description: NATS user NKey for connecting to servers.
type: string
default: ''
preventDelete:
description: When true, the managed Stream will not be deleted when the resource is deleted
type: boolean
default: false
discardPerSubject:
description: Allows to discard messages on a subject basis.
preventUpdate:
description: When true, the managed Stream will not be updated when the resource is updated
type: boolean
default: false
servers:
description: A list of servers for creating stream
type: array
items:
type: string
default: []
tls:
description: A client's TLS certs and keys.
type: object
properties:
clientCert:
description: A client's cert filepath. Should be mounted.
type: string
clientKey:
description: A client's key filepath. Should be mounted.
type: string
rootCas:
description: A list of filepaths to CAs. Should be mounted.
type: array
items:
type: string
status:
type: object
properties:
Expand Down Expand Up @@ -1040,11 +1057,11 @@ spec:
spec:
type: object
properties:
name:
description: A unique name for the Key/Value store.
bucket:
description: A unique name for the KV Bucket.
type: string
description:
description: The description of the Key/Value store.
description: The description of the KV Bucket.
type: string
maxValueSize:
description: The maximum size of a value in bytes.
Expand All @@ -1056,21 +1073,21 @@ spec:
description: The time expiry for keys.
type: string
maxBytes:
description: The maximum size of the Key/Value store in bytes.
description: The maximum size of the KV Bucket in bytes.
type: integer
storage:
description: The storage backend to use for the Key/Value store.
description: The storage backend to use for the KV Bucket.
type: string
enum:
- file
- memory
replicas:
description: The number of replicas to keep for the Key/Value store in clustered JetStream.
description: The number of replicas to keep for the KV Bucket in clustered JetStream.
type: integer
minimum: 1
maximum: 5
placement:
description: The Key/Value store placement via tags or cluster name.
description: The KV Bucket placement via tags or cluster name.
type: object
properties:
cluster:
Expand All @@ -1080,17 +1097,17 @@ spec:
items:
type: string
republish:
description: Republish configuration for the Key/Value store.
description: Republish configuration for the KV Bucket.
type: object
properties:
destination:
type: string
description: Messages will be additionally published to this subject after store.
description: Messages will be additionally published to this subject after Bucket.
source:
type: string
description: Messages will be published from this subject to the destination subject.
mirror:
description: A Key/Value store mirror.
description: A KV Bucket mirror.
type: object
properties:
name:
Expand Down Expand Up @@ -1120,10 +1137,10 @@ spec:
description: Destination subject.
type: string
compression:
description: Key/Value store compression.
description: KV Bucket compression.
type: boolean
sources:
description: A Key/Value store's sources.
description: A KV Bucket's sources.
type: array
items:
type: object
Expand Down Expand Up @@ -1218,9 +1235,9 @@ spec:
additionalPrinterColumns:
- name: State
type: string
description: The current state of the Key/Value store.
description: The current state of the KV Bucket.
jsonPath: .status.conditions[?(@.type == 'Ready')].reason
- name: Key/Value Store Name
- name: KV Bucket Name
type: string
description: The name of the Key/Value store.
description: The name of the KV Bucket.
jsonPath: .spec.name
2 changes: 1 addition & 1 deletion internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger, consumer *api.Consumer) error {
// Set status to not false
// Set status to false
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.")
if err := r.Status().Update(ctx, consumer); err != nil {
return fmt.Errorf("update ready condition: %w", err)
Expand Down
Loading

0 comments on commit ec7aca3

Please sign in to comment.