Skip to content

Commit

Permalink
Exposing Republish field for streams CRD
Browse files Browse the repository at this point in the history
Co-authored-by: Lesley Funwie <[email protected]>
Co-authored-by: Joseph Woodward <[email protected]>
  • Loading branch information
3 people committed Oct 12, 2022
1 parent eeb57ee commit 7f16d46
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 23 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
/nats-boot-config
/nats-boot-config.docker
/tools
/.idea
7 changes: 7 additions & 0 deletions controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
return nil
})

if spec.Republish != nil {
opts = append(opts, jsm.Republish(&jsmapi.SubjectMapping{
Source: spec.Republish.Source,
Destination: spec.Republish.Destination,
}))
}

_, err = c.NewStream(ctx, spec.Name, opts)
return err
}
Expand Down
10 changes: 10 additions & 0 deletions deploy/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ spec:
description: Name of the account to which the Stream belongs.
type: string
pattern: '^[^.*>]*$'
republish:
description: Republish configuration of the stream.
type: object
properties:
destination:
type: string
description: Messages will be additionally published to that subject.
source:
type: string
description: Messages will be published from that subject to the destination subject.
status:
type: object
properties:
Expand Down
52 changes: 29 additions & 23 deletions pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,30 @@ func (s *Stream) GetSpec() interface{} {

// StreamSpec is the spec for a Stream resource
type StreamSpec struct {
Account string `json:"account"`
Creds string `json:"creds"`
Description string `json:"description"`
Discard string `json:"discard"`
DuplicateWindow string `json:"duplicateWindow"`
MaxAge string `json:"maxAge"`
MaxBytes int `json:"maxBytes"`
MaxConsumers int `json:"maxConsumers"`
MaxMsgs int `json:"maxMsgs"`
MaxMsgSize int `json:"maxMsgSize"`
MaxMsgsPerSubject int `json:"maxMsgsPerSubject"`
Mirror *StreamSource `json:"mirror"`
Name string `json:"name"`
Nkey string `json:"nkey"`
NoAck bool `json:"noAck"`
Placement *StreamPlacement `json:"placement"`
Replicas int `json:"replicas"`
Retention string `json:"retention"`
Servers []string `json:"servers"`
Sources []*StreamSource `json:"sources"`
Storage string `json:"storage"`
Subjects []string `json:"subjects"`
TLS TLS `json:"tls"`
Account string `json:"account"`
Creds string `json:"creds"`
Description string `json:"description"`
Discard string `json:"discard"`
DuplicateWindow string `json:"duplicateWindow"`
MaxAge string `json:"maxAge"`
MaxBytes int `json:"maxBytes"`
MaxConsumers int `json:"maxConsumers"`
MaxMsgs int `json:"maxMsgs"`
MaxMsgSize int `json:"maxMsgSize"`
MaxMsgsPerSubject int `json:"maxMsgsPerSubject"`
Mirror *StreamSource `json:"mirror"`
Name string `json:"name"`
Nkey string `json:"nkey"`
NoAck bool `json:"noAck"`
Placement *StreamPlacement `json:"placement"`
Replicas int `json:"replicas"`
Republish *StreamSubjectMapping `json:"republish"`
Retention string `json:"retention"`
Servers []string `json:"servers"`
Sources []*StreamSource `json:"sources"`
Storage string `json:"storage"`
Subjects []string `json:"subjects"`
TLS TLS `json:"tls"`
}

type StreamPlacement struct {
Expand All @@ -62,6 +63,11 @@ type StreamSource struct {
ExternalDeliverPrefix string `json:"externalDeliverPrefix"`
}

type StreamSubjectMapping struct {
Source string `json:"source"`
Destination string `json:"destination"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// StreamList is a list of Stream resources
Expand Down

0 comments on commit 7f16d46

Please sign in to comment.