Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 40 additions & 23 deletions api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,58 +11,75 @@ import (

type Controller struct {
server *mqtt.Server
client *lib.Client
store *lib.ClientStore
password string
}

func CreateController(server *mqtt.Server, client *lib.Client, password string) *Controller {
controller := &Controller{server: server, client: client, password: password}
return controller
func NewController(server *mqtt.Server, store *lib.ClientStore, password string) *Controller {
return &Controller{server: server, store: store, password: password}
}

func (c *Controller) RootHandler() http.HandlerFunc {
return func(writer http.ResponseWriter, request *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
info, _ := json.Marshal(c.server.Info)
writer.Header().Set("Content-Type", "application/json")
writer.Write(info)
w.Header().Set("Content-Type", "application/json")
w.Write(info)
}
}

func (c *Controller) PublishHandler() http.HandlerFunc {
return func(writer http.ResponseWriter, request *http.Request) {
func (c *Controller) withAuthentication(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if c.password != "" {
_, password, ok := request.BasicAuth()
_, password, ok := r.BasicAuth()

if !ok {
writer.WriteHeader(http.StatusBadRequest)
io.WriteString(writer, "Missing basic auth")
w.WriteHeader(http.StatusBadRequest)
io.WriteString(w, "Missing basic auth")
return
}

if password != c.password {
writer.WriteHeader(http.StatusForbidden)
io.WriteString(writer, "Forbidden")
w.WriteHeader(http.StatusForbidden)
io.WriteString(w, "Forbidden")
return
}
}
next(w, r)
}
}

topic := request.URL.Query().Get("topic")
func (c *Controller) PublishHandler() http.HandlerFunc {
return c.withAuthentication(func(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
writer.WriteHeader(http.StatusBadRequest)
io.WriteString(writer, "Missing topic")
w.WriteHeader(http.StatusBadRequest)
io.WriteString(w, "Missing topic")
return
}

defer request.Body.Close()
message, err := io.ReadAll(request.Body)
defer r.Body.Close()
message, err := io.ReadAll(r.Body)
if err != nil {
writer.WriteHeader(http.StatusInternalServerError)
io.WriteString(writer, err.Error())
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, err.Error())
return
}

c.server.Log.Info("Publish message", "topic", topic)
c.server.Publish(topic, message, false, 0)
writer.Write(message)
}
w.Write(message)
})
}

func (c *Controller) DumpHandler() http.HandlerFunc {
return c.withAuthentication(func(w http.ResponseWriter, r *http.Request) {
data, err := c.store.Export()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, "failed to export")
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(data)
})
}
24 changes: 14 additions & 10 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ func (b *Broker) Start(reg prometheus.Registerer) error {
metrics := lib.NewMetrics(reg)

// Create HTTP Client
client := &lib.Client{
Server: b.server,
ContentType: b.config.ContentType,
TopicHeader: b.config.TopicHeader,
Metrics: metrics,
}
httpClient := lib.NewHTTPClient(
b.config.ContentType,
b.config.TopicHeader,
b.config.AuthorizeURL,
metrics,
)

// Create the client store
clientStore := lib.NewClientStore(metrics)

// Setup lifecycle hook
lifecycleHook := &hooks.LifecycleHook{}
Expand All @@ -51,15 +54,15 @@ func (b *Broker) Start(reg prometheus.Registerer) error {
return fmt.Errorf("failed to add lifecycle hook: %w", err)
}

// Setup auth hook
authHook := &hooks.AuthHook{Client: client, URL: b.config.AuthorizeURL}
// Setup connect-authenticate, acl, disconnect hook
authHook := &hooks.SessionHook{HTTPClient: httpClient, Store: clientStore}
err = b.server.AddHook(authHook, nil)
if err != nil {
return fmt.Errorf("failed to add auth hook: %w", err)
}

// Setup publish hook
publishHook := &hooks.PublishHook{Client: client, Routes: b.config.Routes}
publishHook := &hooks.PublishHook{HTTPClient: httpClient, Routes: b.config.Routes, Store: clientStore}
err = b.server.AddHook(publishHook, nil)
if err != nil {
return fmt.Errorf("failed to add publish hook: %w", err)
Expand All @@ -84,11 +87,12 @@ func (b *Broker) Start(reg prometheus.Registerer) error {
go func() {
b.server.Log.Info("Starting API HTTP server", "addr", b.config.HTTPAddr)

controller := api.CreateController(b.server, client, b.config.APIPassword)
controller := api.NewController(b.server, clientStore, b.config.APIPassword)

mux := http.NewServeMux()
mux.HandleFunc("/", controller.RootHandler())
mux.HandleFunc("/publish", controller.PublishHandler())
mux.HandleFunc("/clients", controller.DumpHandler())

err := http.ListenAndServe(b.config.HTTPAddr, mux)
if err != nil {
Expand Down
49 changes: 0 additions & 49 deletions hooks/auth.go

This file was deleted.

8 changes: 5 additions & 3 deletions hooks/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

type PublishHook struct {
mqtt.HookBase
Client *lib.Client
DefaultURL string
HTTPClient *lib.HTTPClient
Routes []lib.Route
Store *lib.ClientStore
}

func (h *PublishHook) ID() string {
Expand All @@ -32,6 +32,7 @@ func (h *PublishHook) Init(config any) error {

func (h *PublishHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
h.Log.Info("Received from client", "client", cl.ID, "topic", pk.TopicName, "payload", string(pk.Payload))
h.Store.Publish(cl.ID, pk.TopicName)

matched := false
for _, route := range h.Routes {
Expand All @@ -45,7 +46,7 @@ func (h *PublishHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Pac
if route.URL == "" {
break
}
err := h.Client.Publish(route.URL, pk.TopicName, pk.Payload)
err := h.HTTPClient.Publish(route.URL, pk.TopicName, pk.Payload)
if err != nil {
h.Log.Error("Failed to post on publish", "err", err, "URL", route.URL)
}
Expand All @@ -55,6 +56,7 @@ func (h *PublishHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Pac

if !matched {
h.Log.Info("No route match", "topic", pk.TopicName)
h.HTTPClient.NoMatch(pk.TopicName)
}

return pk, nil
Expand Down
63 changes: 63 additions & 0 deletions hooks/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package hooks

import (
"bytes"
"mqtt2http/lib"

mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
)

type SessionHook struct {
mqtt.HookBase
HTTPClient *lib.HTTPClient
Store *lib.ClientStore
}

func (h *SessionHook) ID() string {
return "mqtt2http-session"
}

func (h *SessionHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnConnectAuthenticate,
mqtt.OnACLCheck,
mqtt.OnDisconnect,
}, []byte{b})
}

func (h *SessionHook) Init(config any) error {
h.Log.Debug("Initialized")
return nil
}

func (h *SessionHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
username := string(cl.Properties.Username)
password := string(pk.Connect.Password)

h.Log.Debug("Client tries to connect", "username", username)
res, err := h.HTTPClient.Authorize(username, password)
if err != nil {
h.Log.Error("Auth request failed", "err", err)
return false
}
if !res {
h.Log.Info("Auth denied", "client", cl.ID, "username", username)
return false
}

h.Store.Enter(cl.ID, username)

return true
}

func (h *SessionHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
h.Log.Debug("ACLCheck", "client", cl.ID, "topic", topic, "write", write)
h.Store.Subscribe(cl.ID, topic)
return true
}

func (h *SessionHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
h.Log.Debug("Disconnect", "client", cl.ID, "expire", expire)
h.Store.Leave(cl.ID)
}
Loading