Skip to content

Commit

Permalink
Allow for pubs to be non-blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
antoniomika committed Oct 3, 2024
1 parent 90d6149 commit d92d74f
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 76 deletions.
2 changes: 2 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"io"
"iter"
"log/slog"
"sync"
"time"

Expand All @@ -24,6 +25,7 @@ type Broker interface {

type BaseBroker struct {
Channels *syncmap.Map[string, *Channel]
Logger *slog.Logger
}

func (b *BaseBroker) Cleanup() {
Expand Down
6 changes: 3 additions & 3 deletions cmd/authorized_keys/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func PubSubMiddleware(broker pubsub.PubSub, logger *slog.Logger) wish.Middleware

clientID := uuid.NewString()

err := errors.Join(broker.Sub(sesh.Context(), clientID, sesh, chans, args[len(args)-1] == "keepalive"))
err := errors.Join(broker.Sub(sesh.Context(), clientID, sesh, chans, strings.ToLower(args[len(args)-1]) == "keepalive"))
if err != nil {
logger.Error("error during pub", slog.Any("error", err), slog.String("client", clientID))
}
Expand All @@ -71,7 +71,7 @@ func PubSubMiddleware(broker pubsub.PubSub, logger *slog.Logger) wish.Middleware

clientID := uuid.NewString()

err := errors.Join(broker.Pub(sesh.Context(), clientID, sesh, chans))
err := errors.Join(broker.Pub(sesh.Context(), clientID, sesh, chans, strings.ToLower(args[len(args)-1]) == "blockwrite"))
if err != nil {
logger.Error("error during pub", slog.Any("error", err), slog.String("client", clientID))
}
Expand All @@ -84,7 +84,7 @@ func PubSubMiddleware(broker pubsub.PubSub, logger *slog.Logger) wish.Middleware

clientID := uuid.NewString()

err := errors.Join(broker.Pipe(sesh.Context(), clientID, sesh, chans, args[len(args)-1] == "replay"))
err := errors.Join(broker.Pipe(sesh.Context(), clientID, sesh, chans, strings.ToLower(args[len(args)-1]) == "replay"))
if err != nil {
logger.Error(
"pipe error",
Expand Down
27 changes: 12 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,36 @@ go 1.23.1
require (
github.com/antoniomika/syncmap v1.0.0
github.com/charmbracelet/ssh v0.0.0-20240725163421-eb71b85b27aa
github.com/charmbracelet/wish v1.4.2
github.com/charmbracelet/wish v1.4.3
github.com/google/uuid v1.6.0
)

require (
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/charmbracelet/bubbletea v0.27.0 // indirect
github.com/charmbracelet/bubbletea v1.1.1 // indirect
github.com/charmbracelet/keygen v0.5.1 // indirect
github.com/charmbracelet/lipgloss v0.12.1 // indirect
github.com/charmbracelet/lipgloss v0.13.0 // indirect
github.com/charmbracelet/log v0.4.0 // indirect
github.com/charmbracelet/x/ansi v0.1.4 // indirect
github.com/charmbracelet/x/ansi v0.3.2 // indirect
github.com/charmbracelet/x/conpty v0.1.0 // indirect
github.com/charmbracelet/x/errors v0.0.0-20240508181413-e8d8b6e2de86 // indirect
github.com/charmbracelet/x/input v0.1.3 // indirect
github.com/charmbracelet/x/term v0.1.1 // indirect
github.com/charmbracelet/x/errors v0.0.0-20240919170804-a4978c8e603a // indirect
github.com/charmbracelet/x/term v0.2.0 // indirect
github.com/charmbracelet/x/termios v0.1.0 // indirect
github.com/charmbracelet/x/windows v0.1.2 // indirect
github.com/creack/pty v1.1.21 // indirect
github.com/creack/pty v1.1.23 // indirect
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-localereader v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect
github.com/muesli/cancelreader v0.2.2 // indirect
github.com/muesli/termenv v0.15.3-0.20240509142007-81b8f94111d5 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
)
58 changes: 26 additions & 32 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,30 @@ github.com/antoniomika/syncmap v1.0.0 h1:iFSfbQFQOvHZILFZF+hqWosO0no+W9+uF4y2VEy
github.com/antoniomika/syncmap v1.0.0/go.mod h1:fK2829foEYnO4riNfyUn0SHQZt4ue3DStYjGU+sJj38=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/charmbracelet/bubbletea v0.27.0 h1:Mznj+vvYuYagD9Pn2mY7fuelGvP0HAXtZYGgRBCbHvU=
github.com/charmbracelet/bubbletea v0.27.0/go.mod h1:5MdP9XH6MbQkgGhnlxUqCNmBXf9I74KRQ8HIidRxV1Y=
github.com/charmbracelet/bubbletea v1.1.1 h1:KJ2/DnmpfqFtDNVTvYZ6zpPFL9iRCRr0qqKOCvppbPY=
github.com/charmbracelet/bubbletea v1.1.1/go.mod h1:9Ogk0HrdbHolIKHdjfFpyXJmiCzGwy+FesYkZr7hYU4=
github.com/charmbracelet/keygen v0.5.1 h1:zBkkYPtmKDVTw+cwUyY6ZwGDhRxXkEp0Oxs9sqMLqxI=
github.com/charmbracelet/keygen v0.5.1/go.mod h1:zznJVmK/GWB6dAtjluqn2qsttiCBhA5MZSiwb80fcHw=
github.com/charmbracelet/lipgloss v0.12.1 h1:/gmzszl+pedQpjCOH+wFkZr/N90Snz40J/NR7A0zQcs=
github.com/charmbracelet/lipgloss v0.12.1/go.mod h1:V2CiwIuhx9S1S1ZlADfOj9HmxeMAORuz5izHb0zGbB8=
github.com/charmbracelet/lipgloss v0.13.0 h1:4X3PPeoWEDCMvzDvGmTajSyYPcZM4+y8sCA/SsA3cjw=
github.com/charmbracelet/lipgloss v0.13.0/go.mod h1:nw4zy0SBX/F/eAO1cWdcvy6qnkDUxr8Lw7dvFrAIbbY=
github.com/charmbracelet/log v0.4.0 h1:G9bQAcx8rWA2T3pWvx7YtPTPwgqpk7D68BX21IRW8ZM=
github.com/charmbracelet/log v0.4.0/go.mod h1:63bXt/djrizTec0l11H20t8FDSvA4CRZJ1KH22MdptM=
github.com/charmbracelet/ssh v0.0.0-20240725163421-eb71b85b27aa h1:6rePgmsJguB6Z7Y55stsEVDlWFJoUpQvOX4mdnBjgx4=
github.com/charmbracelet/ssh v0.0.0-20240725163421-eb71b85b27aa/go.mod h1:LmMZag2g7ILMmWtDmU7dIlctUopwmb73KpPzj0ip1uk=
github.com/charmbracelet/wish v1.4.2 h1:H2BKXewugK9Al75wST9h0hpQ95h981RZ5rtimDpygPQ=
github.com/charmbracelet/wish v1.4.2/go.mod h1:3Bzq7qMU2LTvdaM61KrCnhrzGP92D/Ru7CasrpyZmzY=
github.com/charmbracelet/x/ansi v0.1.4 h1:IEU3D6+dWwPSgZ6HBH+v6oUuZ/nVawMiWj5831KfiLM=
github.com/charmbracelet/x/ansi v0.1.4/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw=
github.com/charmbracelet/wish v1.4.3 h1:7FvNLoPGqiT7EdjQP4+XuvM1Hrnx9DyknilbD+Okx1s=
github.com/charmbracelet/wish v1.4.3/go.mod h1:hVgmhwhd52fLmO6m5AkREUMZYqQ0qmIJQDMe3HsNPmU=
github.com/charmbracelet/x/ansi v0.3.2 h1:wsEwgAN+C9U06l9dCVMX0/L3x7ptvY1qmjMwyfE6USY=
github.com/charmbracelet/x/ansi v0.3.2/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw=
github.com/charmbracelet/x/conpty v0.1.0 h1:4zc8KaIcbiL4mghEON8D72agYtSeIgq8FSThSPQIb+U=
github.com/charmbracelet/x/conpty v0.1.0/go.mod h1:rMFsDJoDwVmiYM10aD4bH2XiRgwI7NYJtQgl5yskjEQ=
github.com/charmbracelet/x/errors v0.0.0-20240508181413-e8d8b6e2de86 h1:JSt3B+U9iqk37QUU2Rvb6DSBYRLtWqFqfxf8l5hOZUA=
github.com/charmbracelet/x/errors v0.0.0-20240508181413-e8d8b6e2de86/go.mod h1:2P0UgXMEa6TsToMSuFqKFQR+fZTO9CNGUNokkPatT/0=
github.com/charmbracelet/x/input v0.1.3 h1:oy4TMhyGQsYs/WWJwu1ELUMFnjiUAXwtDf048fHbCkg=
github.com/charmbracelet/x/input v0.1.3/go.mod h1:1gaCOyw1KI9e2j00j/BBZ4ErzRZqa05w0Ghn83yIhKU=
github.com/charmbracelet/x/term v0.1.1 h1:3cosVAiPOig+EV4X9U+3LDgtwwAoEzJjNdwbXDjF6yI=
github.com/charmbracelet/x/term v0.1.1/go.mod h1:wB1fHt5ECsu3mXYusyzcngVWWlu1KKUmmLhfgr/Flxw=
github.com/charmbracelet/x/errors v0.0.0-20240919170804-a4978c8e603a h1:IlWNrDYRP6lyttqsFHDhmo8NXggMwBuhvvCP+Wmb2a8=
github.com/charmbracelet/x/errors v0.0.0-20240919170804-a4978c8e603a/go.mod h1:2P0UgXMEa6TsToMSuFqKFQR+fZTO9CNGUNokkPatT/0=
github.com/charmbracelet/x/term v0.2.0 h1:cNB9Ot9q8I711MyZ7myUR5HFWL/lc3OpU8jZ4hwm0x0=
github.com/charmbracelet/x/term v0.2.0/go.mod h1:GVxgxAbjUrmpvIINHIQnJJKpMlHiZ4cktEQCN6GWyF0=
github.com/charmbracelet/x/termios v0.1.0 h1:y4rjAHeFksBAfGbkRDmVinMg7x7DELIGAFbdNvxg97k=
github.com/charmbracelet/x/termios v0.1.0/go.mod h1:H/EVv/KRnrYjz+fCYa9bsKdqF3S8ouDK0AZEbG7r+/U=
github.com/charmbracelet/x/windows v0.1.2 h1:Iumiwq2G+BRmgoayww/qfcvof7W/3uLoelhxojXlRWg=
github.com/charmbracelet/x/windows v0.1.2/go.mod h1:GLEO/l+lizvFDBPLIOk+49gdX49L9YWMB5t+DZd0jkQ=
github.com/creack/pty v1.1.21 h1:1/QdRyBaHHJP61QkWMXlOIBfsgdDeeKfK8SYVUWJKf0=
github.com/creack/pty v1.1.21/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/creack/pty v1.1.23 h1:4M6+isWdcStXEf15G/RbrMPOQj1dZ7HPZCGwE4kOeP0=
github.com/creack/pty v1.1.23/go.mod h1:08sCNb52WyoAwi2QDyzUCTgcvVFhUzewun7wtTfvcwE=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4=
Expand All @@ -46,8 +42,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2JC/oIi4=
github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 h1:ZK8zHtRHOkbHy6Mmr5D264iyp3TiX5OmNcI5cIARiQI=
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6/go.mod h1:CJlz5H+gyd6CUWT45Oy4q24RdLyn7Md9Vj2/ldJBSIo=
github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA=
Expand All @@ -61,21 +57,19 @@ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM=
golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
5 changes: 3 additions & 2 deletions multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewMulticast(logger *slog.Logger) *Multicast {
Logger: logger,
Broker: &BaseBroker{
Channels: syncmap.New[string, *Channel](),
Logger: logger.With(slog.Bool("broker", true)),
},
}
}
Expand Down Expand Up @@ -72,8 +73,8 @@ func (p *Multicast) Pipe(ctx context.Context, ID string, rw io.ReadWriter, chann
return p.connect(ctx, ID, rw, channels, ChannelDirectionInputOutput, false, replay, false)
}

func (p *Multicast) Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel) error {
return errors.Join(p.connect(ctx, ID, rw, channels, ChannelDirectionInput, true, false, false))
func (p *Multicast) Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, blockWrite bool) error {
return errors.Join(p.connect(ctx, ID, rw, channels, ChannelDirectionInput, blockWrite, false, false))
}

func (p *Multicast) Sub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, keepAlive bool) error {
Expand Down
29 changes: 6 additions & 23 deletions multicast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"log/slog"
"sync"
"testing"

"github.com/antoniomika/syncmap"
)

type Buffer struct {
Expand Down Expand Up @@ -40,12 +38,7 @@ func TestMulticastSubBlock(t *testing.T) {
name := "test-channel"
syncer := make(chan int)

cast := &Multicast{
Logger: slog.Default(),
Broker: &BaseBroker{
Channels: syncmap.New[string, *Channel](),
},
}
cast := NewMulticast(slog.Default())

var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -63,7 +56,7 @@ func TestMulticastSubBlock(t *testing.T) {

go func() {
orderActual += "pub-"
fmt.Println(cast.Pub(context.TODO(), "2", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}))
fmt.Println(cast.Pub(context.TODO(), "2", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
wg.Done()
}()

Expand All @@ -85,12 +78,7 @@ func TestMulticastPubBlock(t *testing.T) {
name := "test-channel"
syncer := make(chan int)

cast := &Multicast{
Logger: slog.Default(),
Broker: &BaseBroker{
Channels: syncmap.New[string, *Channel](),
},
}
cast := NewMulticast(slog.Default())

var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -100,7 +88,7 @@ func TestMulticastPubBlock(t *testing.T) {
go func() {
orderActual += "pub-"
syncer <- 0
fmt.Println(cast.Pub(context.TODO(), "1", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}))
fmt.Println(cast.Pub(context.TODO(), "1", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
wg.Done()
}()

Expand Down Expand Up @@ -131,12 +119,7 @@ func TestMulticastMultSubs(t *testing.T) {
name := "test-channel"
syncer := make(chan int)

cast := &Multicast{
Logger: slog.Default(),
Broker: &BaseBroker{
Channels: syncmap.New[string, *Channel](),
},
}
cast := NewMulticast(slog.Default())

var wg sync.WaitGroup
wg.Add(3)
Expand All @@ -163,7 +146,7 @@ func TestMulticastMultSubs(t *testing.T) {

go func() {
orderActual += "pub-"
fmt.Println(cast.Pub(context.TODO(), "3", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}))
fmt.Println(cast.Pub(context.TODO(), "3", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
wg.Done()
}()

Expand Down
2 changes: 1 addition & 1 deletion pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ type PubSub interface {
GetPipes() iter.Seq2[string, *Client]
Pipe(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, replay bool) (error, error)
Sub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, keepAlive bool) error
Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel) error
Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, blockWrite bool) error
}

0 comments on commit d92d74f

Please sign in to comment.