Skip to content

Commit

Permalink
Merge branch 'feat-stats-#3'
Browse files Browse the repository at this point in the history
  • Loading branch information
yyyar committed Jul 20, 2016
2 parents ad3890a + 3d6436a commit 00b0fbd
Show file tree
Hide file tree
Showing 18 changed files with 648 additions and 64 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

* [Clear and beautiful TOML config file](config/gobetween.toml)

* [Management REST API] (https://github.com/yyyar/gobetween/wiki/REST-API) (work in progress)
* [Management REST API] (https://github.com/yyyar/gobetween/wiki/REST-API)
* System Information
* Configuration Dump
* Servers List / Create / Delete
* Servers and Backends rx/tx, Status, Active Connections, etc.

* Integrates seamlessly with Docker and with any custom system (thanks to Exec discovery and healtchecks)

Expand Down
3 changes: 2 additions & 1 deletion config/gobetween.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ backend_connection_timeout = "0" # Backend connection timeout

[servers.sample]
bind = "localhost:3000"
balance="weight"

[servers.sample.discovery]
kind = "static"
static_list = [
"localhost:8000 weight=5",
"localhost:8000 weight=2",
"localhost:8001"
]

Expand Down
3 changes: 2 additions & 1 deletion src/api/servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package api
import (
"../config"
"../manager"
"../stats"
"github.com/gin-gonic/gin"
"net/http"
)
Expand Down Expand Up @@ -67,7 +68,7 @@ func attachServers(app *gin.RouterGroup) {
*/
app.GET("/servers/:name/stats", func(c *gin.Context) {
name := c.Param("name")
c.IndentedJSON(http.StatusOK, manager.Stats(name))
c.IndentedJSON(http.StatusOK, stats.GetStats(name))
})

}
1 change: 1 addition & 0 deletions src/balance/weight.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type WeightBalancer struct{}

/**
* Elect backend based on weight strategy
* TODO: Ensure backends are sorted in the same way (not it's not bacause of map in scheduler)
*/
func (b *WeightBalancer) Elect(context *core.Context, backends []core.Backend) (*core.Backend, error) {

Expand Down
19 changes: 11 additions & 8 deletions src/core/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@ import (
*/
type Backend struct {
Target
Priority int
Weight int
Live bool
Stats BackendStats
Priority int `json:"priority"`
Weight int `json:"weight"`
Stats BackendStats `json:"stats"`
}

/**
* Backend status
*/
type BackendStats struct {
ActiveConnections int
RxBytes big.Int
TxBytes big.Int
Live bool `json:"live"`
TotalConnections int64 `json:"total_connections"`
ActiveConnections int `json:"active_connections"`
RxBytes big.Int `json:"rx"`
TxBytes big.Int `json:"tx"`
RxSecond big.Int `json:"rx_second"`
TxSecond big.Int `json:"tx_second"`
}

/**
Expand Down Expand Up @@ -62,5 +65,5 @@ func (this *Backend) Address() string {
*/
func (this Backend) String() string {
return fmt.Sprintf("{%s p=%d,w=%d,l=%t,a=%d}",
this.Address(), this.Priority, this.Weight, this.Live, this.Stats.ActiveConnections)
this.Address(), this.Priority, this.Weight, this.Stats.Live, this.Stats.ActiveConnections)
}
21 changes: 21 additions & 0 deletions src/core/misc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* misc.go
*
* @author Yaroslav Pogrebnyak <[email protected]>
*/

package core

/**
* Next r/w operation data counters
*/
type ReadWriteCount struct {

/* Read bytes count */
CountRead int

/* Write bytes count */
CountWrite int

Target Target
}
4 changes: 2 additions & 2 deletions src/core/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ package core
* Target host and port
*/
type Target struct {
Host string
Port string
Host string `json:"host"`
Port string `json:"port"`
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/discovery/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func dockerFetch(cfg config.DiscoveryConfig) (*[]core.Backend, error) {
},
Priority: 1,
Weight: 1,
Live: true,
Stats: core.BackendStats{
Live: true,
},
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/discovery/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func jsonFetch(cfg config.DiscoveryConfig) (*[]core.Backend, error) {
backend := core.Backend{
Weight: 1,
Priority: 1,
Live: true,
Stats: core.BackendStats{
Live: true,
},
}

if backend.Host, err = parsed.QueryToString(key + cfg.JsonHostPattern); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion src/discovery/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func srvFetch(cfg config.DiscoveryConfig) (*[]core.Backend, error) {
},
Priority: int(record.Priority),
Weight: int(record.Weight),
Live: true,
Stats: core.BackendStats{
Live: true,
},
})
}

Expand Down
72 changes: 42 additions & 30 deletions src/server/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
package server

import (
"../core"
"../logging"
"io"
"math/big"
"net"
"time"
)
Expand All @@ -18,32 +18,25 @@ const (

/* Buffer size to handle data from socket */
BUFFER_SIZE = 16 * 1024
)

/**
* Next r/w operation data counters
*/
type ReadWriteCount struct {

/* Read bytes count */
CountRead int

/* Write bytes count */
CountWrite int
}
/* Interval of pushing aggregated read/write stats */
PROXY_STATS_PUSH_INTERVAL = 1 * time.Second
)

/**
* Perform copy/proxy data from 'from' to 'to' socket, counting r/w stats and
* dropping connection if timeout exceeded
*/
func proxy(to net.Conn, from net.Conn, timeout time.Duration) <-chan *big.Int {
func proxy(to net.Conn, from net.Conn, timeout time.Duration) <-chan core.ReadWriteCount {

log := logging.For("proxy")

stats := make(chan ReadWriteCount)
done := make(chan *big.Int)
stats := make(chan core.ReadWriteCount)
outStats := make(chan core.ReadWriteCount)

total := big.NewInt(0)
rwcBuffer := core.ReadWriteCount{}
ticker := time.NewTicker(PROXY_STATS_PUSH_INTERVAL)
flushed := false

// Stats collecting goroutine
go func() {
Expand All @@ -54,27 +47,43 @@ func proxy(to net.Conn, from net.Conn, timeout time.Duration) <-chan *big.Int {

for {
select {
case <-ticker.C:
outStats <- rwcBuffer
flushed = true
case rwc, ok := <-stats:

if !ok {
done <- total
ticker.Stop()
if !flushed {
outStats <- rwcBuffer
}
close(outStats)
return
}

if timeout > 0 && rwc.CountRead > 0 {
to.SetReadDeadline(time.Now().Add(timeout))
}

total.Add(total, big.NewInt(int64(rwc.CountRead)))
// Remove non blocking
if flushed {
rwcBuffer = rwc
} else {
rwcBuffer.CountWrite += rwc.CountWrite
rwcBuffer.CountRead = rwc.CountRead
}

flushed = false
}
}
}()

// Run proxy copier
go func() {
err := Copy(to, from, stats)
if err != nil {
log.Info(err)
// hack to determine normal close. TODO: fix when it will be exposed in golang
if err != nil && err.(*net.OpError).Err.Error() != "use of closed network connection" {
log.Warn(err)
}

to.Close()
Expand All @@ -84,13 +93,13 @@ func proxy(to net.Conn, from net.Conn, timeout time.Duration) <-chan *big.Int {
close(stats)
}()

return done
return outStats
}

/**
* It's build by analogy of io.Copy
*/
func Copy(to io.Writer, from io.Reader, ch chan<- ReadWriteCount) error {
func Copy(to io.Writer, from io.Reader, ch chan<- core.ReadWriteCount) error {

buf := make([]byte, BUFFER_SIZE)
var err error = nil
Expand All @@ -100,14 +109,17 @@ func Copy(to io.Writer, from io.Reader, ch chan<- ReadWriteCount) error {

if readN > 0 {

// send read bytes count
ch <- ReadWriteCount{CountRead: readN}

writeN, writeErr := to.Write(buf[0:readN])
if writeN > 0 {
// send write bytes count
ch <- ReadWriteCount{CountWrite: writeN}
}

// non-blocking stats send
// may produce innacurate counters because receiving
// part may miss them. NOTE. Remove non-blocking if will be needed
//select {
//case ch <- core.ReadWriteCount{CountRead: readN, CountWrite: writeN}:
//default:
// }

ch <- core.ReadWriteCount{CountRead: readN, CountWrite: writeN}

if writeErr != nil {
err = writeErr
Expand Down
Loading

0 comments on commit 00b0fbd

Please sign in to comment.