Skip to content

Commit

Permalink
Cherry-picks for 2.10.26-RC.1 (#6462)
Browse files Browse the repository at this point in the history
Includes the following:

- #6406
- #6412
- #6408
- #6416
- #6425
- #6424
- #6438
- #6439
- #6446
- #6447
- #6448
- #6449
- #6450
- #6451
- #6452
- #6453
- #6456
- #6458
- #6457
- #6459
- #6460
- #6461

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander authored Feb 6, 2025
2 parents 006039e + 84474c6 commit 3aa13a9
Show file tree
Hide file tree
Showing 30 changed files with 1,010 additions and 245 deletions.
39 changes: 8 additions & 31 deletions .github/actions/nightly-release/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,6 @@ name: Nightly Docker Releaser
description: Builds nightly docker images

inputs:
go:
description: The version of go to build with
required: true

label:
description: The label to use for built images
required: true

hub_username:
description: Docker hub username
required: true
Expand All @@ -25,35 +17,20 @@ inputs:
runs:
using: composite
steps:
- name: Log in to Docker Hub
shell: bash
run: docker login -u "${{ inputs.hub_username }}" -p "${{ inputs.hub_password }}"

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: "${{ inputs.go }}"
go-version: "stable"

- name: goreleaser
- name: Build and push Docker images
# Use commit hash here to avoid a re-tagging attack, as this is a third-party action
# Commit 5742e2a039330cbb23ebf35f046f814d4c6ff811 = tag v5
uses: goreleaser/goreleaser-action@5742e2a039330cbb23ebf35f046f814d4c6ff811
with:
workdir: "${{ inputs.workdir }}"
version: latest
args: release --snapshot --config .goreleaser-nightly.yml

- name: images
shell: bash
run: docker images

- name: docker_login
shell: bash
run: docker login -u "${{ inputs.hub_username }}" -p "${{ inputs.hub_password }}"

- name: docker_push
shell: bash
run: |
NDATE=$(date +%Y%m%d)
docker tag synadia/nats-server:nightly-${NDATE} synadia/nats-server:${{ inputs.label }}-${NDATE}
docker tag synadia/nats-server:nightly-${NDATE} synadia/nats-server:${{ inputs.label }}
docker push synadia/nats-server:${{ inputs.label }}-${NDATE}
docker push synadia/nats-server:${{ inputs.label }}
version: ~> v2
args: release --skip=announce,validate --config .goreleaser-nightly.yml
6 changes: 3 additions & 3 deletions .github/workflows/nightly.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ on:
workflow_dispatch:
inputs:
target:
description: "Override image branch (optional)"
description: "Override source branch (optional)"
type: string
required: false

Expand All @@ -19,11 +19,11 @@ jobs:
with:
path: src/github.com/nats-io/nats-server
ref: ${{ inputs.target || 'main' }}
fetch-depth: 0
fetch-tags: true

- uses: ./src/github.com/nats-io/nats-server/.github/actions/nightly-release
with:
go: "1.21"
workdir: src/github.com/nats-io/nats-server
label: nightly
hub_username: "${{ secrets.DOCKER_USERNAME }}"
hub_password: "${{ secrets.DOCKER_PASSWORD }}"
13 changes: 8 additions & 5 deletions .goreleaser-nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ builds:
goarch:
- amd64

release:
disable: true

dockers:
- goos: linux
goarch: amd64
skip_push: true
dockerfile: docker/Dockerfile.nightly
skip_push: false
build_flag_templates:
- '--build-arg=VERSION={{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}'
- '--build-arg=VERSION={{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}'
image_templates:
- synadia/nats-server:{{.Version}}
- synadia/nats-server:{{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}
- synadia/nats-server:{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}
- synadia/nats-server:{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}
extra_files:
- docker/nats-server.conf

Expand All @@ -32,4 +35,4 @@ checksum:
algorithm: sha256

snapshot:
name_template: '{{ if index .Env "IMAGE_NAME" }}{{ .Env.IMAGE_NAME }}{{ else if not (eq .Branch "main" "dev" "") }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}'
version_template: '{{ if ne .Branch "main" }}{{ replace .Branch "/" "-" }}{{ else }}nightly{{ end }}-{{ time "20060102" }}'
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ language: go
go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.23.5"
- "1.22.11"
- "1.23.6"
- "1.22.12"

go_import_path: github.com/nats-io/nats-server

Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.nightly
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.21-alpine AS builder
FROM golang:alpine AS builder

ARG VERSION="nightly"

Expand Down
26 changes: 18 additions & 8 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4129,9 +4129,20 @@ func (c *client) setHeader(key, value string, msg []byte) []byte {
return bb.Bytes()
}

// Will return the value for the header denoted by key or nil if it does not exists.
// This function ignores errors and tries to achieve speed and no additional allocations.
// Will return a copy of the value for the header denoted by key or nil if it does not exist.
// If you know that it is safe to refer to the underlying hdr slice for the period that the
// return value is used, then sliceHeader() will be faster.
func getHeader(key string, hdr []byte) []byte {
v := sliceHeader(key, hdr)
if v == nil {
return nil
}
return append(make([]byte, 0, len(v)), v...)
}

// Will return the sliced value for the header denoted by key or nil if it does not exists.
// This function ignores errors and tries to achieve speed and no additional allocations.
func sliceHeader(key string, hdr []byte) []byte {
if len(hdr) == 0 {
return nil
}
Expand All @@ -4156,15 +4167,14 @@ func getHeader(key string, hdr []byte) []byte {
index++
}
// Collect together the rest of the value until we hit a CRLF.
var value []byte
start := index
for index < hdrLen {
if hdr[index] == '\r' && index < hdrLen-1 && hdr[index+1] == '\n' {
break
}
value = append(value, hdr[index])
index++
}
return value
return hdr[start:index:index]
}

// For bytes.HasPrefix below.
Expand Down Expand Up @@ -4278,7 +4288,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
var ci *ClientInfo
if hadPrevSi && c.pa.hdr >= 0 {
var cis ClientInfo
if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
if err := json.Unmarshal(sliceHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
ci = &cis
ci.Service = acc.Name
// Check if we are moving into a share details account from a non-shared
Expand All @@ -4287,7 +4297,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
c.addServerAndClusterInfo(ci)
}
}
} else if c.kind != LEAF || c.pa.hdr < 0 || len(getHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
} else if c.kind != LEAF || c.pa.hdr < 0 || len(sliceHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
ci = c.getClientInfo(share)
// If we did not share but the imports destination is the system account add in the server and cluster info.
if !share && isSysImport {
Expand Down Expand Up @@ -4846,7 +4856,7 @@ func (c *client) checkLeafClientInfoHeader(msg []byte) (dmsg []byte, setHdr bool
if c.pa.hdr < 0 || len(msg) < c.pa.hdr {
return msg, false
}
cir := getHeader(ClientInfoHdr, msg[:c.pa.hdr])
cir := sliceHeader(ClientInfoHdr, msg[:c.pa.hdr])
if len(cir) == 0 {
return msg, false
}
Expand Down
23 changes: 23 additions & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2964,6 +2964,29 @@ func TestRemoveHeaderIfPrefixPresent(t *testing.T) {
}
}

func TestSliceHeader(t *testing.T) {
hdr := []byte("NATS/1.0\r\n\r\n")

hdr = genHeader(hdr, "a", "1")
hdr = genHeader(hdr, JSExpectedStream, "my-stream")
hdr = genHeader(hdr, JSExpectedLastSeq, "22")
hdr = genHeader(hdr, "b", "2")
hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24")
hdr = genHeader(hdr, JSExpectedLastMsgId, "1")
hdr = genHeader(hdr, "c", "3")

sliced := sliceHeader(JSExpectedLastSubjSeq, hdr)
copied := getHeader(JSExpectedLastSubjSeq, hdr)

require_NotNil(t, sliced)
require_Equal(t, cap(sliced), 2)

require_NotNil(t, copied)
require_Equal(t, cap(copied), len(copied))

require_True(t, bytes.Equal(sliced, copied))
}

func TestClientFlushOutboundNoSlowConsumer(t *testing.T) {
opts := DefaultOptions()
opts.MaxPending = 1024 * 1024 * 140 // 140MB
Expand Down
26 changes: 16 additions & 10 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ type ConsumerInfo struct {
}

type ConsumerConfig struct {
// Durable is deprecated. All consumers should have names, picked by clients.
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
Expand Down Expand Up @@ -1583,6 +1582,12 @@ var (
// deleteNotActive must only be called from time.AfterFunc or in its own
// goroutine, as it can block on clean-up.
func (o *consumer) deleteNotActive() {
// Take a copy of these when the goroutine starts, mostly it avoids a
// race condition with tests that modify these consts, such as
// TestJetStreamClusterGhostEphemeralsAfterRestart.
cnaMax := consumerNotActiveMaxInterval
cnaStart := consumerNotActiveStartInterval

o.mu.Lock()
if o.mset == nil {
o.mu.Unlock()
Expand Down Expand Up @@ -1626,10 +1631,10 @@ func (o *consumer) deleteNotActive() {
if o.srv != nil {
qch = o.srv.quitCh
}
if o.js != nil {
cqch = o.js.clusterQuitC()
}
o.mu.Unlock()
if js != nil {
cqch = js.clusterQuitC()
}

// Useful for pprof.
setGoRoutineLabels(pprofLabels{
Expand Down Expand Up @@ -1663,8 +1668,8 @@ func (o *consumer) deleteNotActive() {
if ca != nil && cc != nil {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
interval := consumerNotActiveStartInterval + jitter
jitter := time.Duration(rand.Int63n(int64(cnaStart)))
interval := cnaStart + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
Expand All @@ -1686,7 +1691,7 @@ func (o *consumer) deleteNotActive() {
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
if interval < consumerNotActiveMaxInterval {
if interval < cnaMax {
interval *= 2
ticker.Reset(interval)
}
Expand Down Expand Up @@ -1859,9 +1864,6 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
if cfg.FlowControl != ncfg.FlowControl {
return errors.New("flow control can not be updated")
}
if cfg.MaxWaiting != ncfg.MaxWaiting {
return errors.New("max waiting can not be updated")
}

// Deliver Subject is conditional on if its bound.
if cfg.DeliverSubject != ncfg.DeliverSubject {
Expand All @@ -1876,6 +1878,10 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
}
}

if cfg.MaxWaiting != ncfg.MaxWaiting {
return errors.New("max waiting can not be updated")
}

// Check if BackOff is defined, MaxDeliver is within range.
if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && lbo > ncfg.MaxDeliver {
return NewJSConsumerMaxDeliverBackoffError()
Expand Down
34 changes: 34 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,14 @@ func (s *Server) initEventTracking() {
optz := &ExpvarzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.expvarz(optz), nil })
},
"IPQUEUESZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &IpqueueszEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.Ipqueuesz(&optz.IpqueueszOptions), nil })
},
"RAFTZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &RaftzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (any, error) { return s.Raftz(&optz.RaftzOptions), nil })
},
}
profilez := func(_ *subscription, c *client, _ *Account, _, rply string, rmsg []byte) {
hdr, msg := c.msgParts(rmsg)
Expand Down Expand Up @@ -1921,6 +1929,18 @@ type ExpvarzEventOptions struct {
EventFilterOptions
}

// In the context of system events, IpqueueszEventOptions are options passed to Ipqueuesz
type IpqueueszEventOptions struct {
EventFilterOptions
IpqueueszOptions
}

// In the context of system events, RaftzEventOptions are options passed to Raftz
type RaftzEventOptions struct {
EventFilterOptions
RaftzOptions
}

// returns true if the request does NOT apply to this server and can be ignored.
// DO NOT hold the server lock when
func (s *Server) filterRequest(fOpts *EventFilterOptions) bool {
Expand Down Expand Up @@ -2043,6 +2063,20 @@ type ServerAPIExpvarzResponse struct {
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIpqueueszResponse is the response type for ipqueuesz
type ServerAPIpqueueszResponse struct {
Server *ServerInfo `json:"server"`
Data *IpqueueszStatus `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// ServerAPIRaftzResponse is the response type for raftz
type ServerAPIRaftzResponse struct {
Server *ServerInfo `json:"server"`
Data *RaftzStatus `json:"data,omitempty"`
Error *ApiError `json:"error,omitempty"`
}

// statszReq is a request for us to respond with current statsz.
func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
if !s.EventsEnabled() {
Expand Down
2 changes: 1 addition & 1 deletion server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,7 @@ func TestSystemAccountWithGateways(t *testing.T) {

// If this tests fails with wrong number after 10 seconds we may have
// added a new initial subscription for the eventing system.
checkExpectedSubs(t, 58, sa)
checkExpectedSubs(t, 62, sa)

// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
Expand Down
Loading

0 comments on commit 3aa13a9

Please sign in to comment.