style: clean code
raylhuang committed Nov 20, 2020
1 parent 4ca8005 commit b028161
Showing 18 changed files with 109 additions and 83 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
-FROM byrnedo/alpine-curl
+FROM ubuntu:latest
COPY kvass /kvass

ENTRYPOINT ["/kvass"]
3 changes: 2 additions & 1 deletion Makefile
@@ -27,4 +27,5 @@ build: dep ## Build the binary file
@go build -i -o build/kvass cmd/kvass/*.go

clean: ## Remove previous build
-	@rm -f ./build
+	@rm -fr ./build
+	@rm -fr cover.out coverage.txt
12 changes: 5 additions & 7 deletions README.md
@@ -1,12 +1,10 @@
# Kvass

-[![Go Report Card](https://goreportcard.com/badge/github.com/tkestack/kvass)](https://goreportcard.com/report/github.com/tkestack/kvass) [![Build](https://github.com/tkestack/kvass/workflows/Build/badge.svg?branch=master)]() [![codecov](https://codecov.io/gh/tkestack/kvass/branch/master/graph/badge.svg)](https://codecov.io/gh/tkestack/kvass)
+[![Go Report Card](https://goreportcard.com/badge/github.com/tkestack/kvass)](https://goreportcard.com/report/github.com/tkestack/kvass) [![Build](https://github.com/tkestack/kvass/workflows/Build/badge.svg?branch=master)]()
------

-Kvass provides a method of Prometheus sharding, which uses config file injection to proxy Prometheus scraping to the shard sidecar, and the shard sidecar decides whether to scrape a target.
-
-A coordinator manages all shards and assigns targets to each of them.
-
+Kvass provides a method of Prometheus sharding: the Sidecar generates a new config that uses only "static_configs" for Prometheus scraping, according to the targets assigned by the Coordinator.
+A Coordinator manages all shards and assigns targets to each of them.
Thanos (or another storage solution) is used to provide a global data view.

![image-20200916114336447](./README.assets/image-20200916114336447.png)
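To make the "static_configs"-only idea above concrete, here is a minimal sketch of the shape of config a shard's Prometheus might end up with. The structs follow Prometheus's well-known YAML layout, but they are illustrative assumptions, not kvass's actual types, and the job name and target address are made up:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// staticConfig and scrapeConfig mirror the Prometheus YAML layout; kvass's
// own generator types are not shown in this diff, so these are assumptions.
type staticConfig struct {
	Targets []string `yaml:"targets"`
}

type scrapeConfig struct {
	JobName       string         `yaml:"job_name"`
	StaticConfigs []staticConfig `yaml:"static_configs"`
}

func main() {
	// The sidecar would emit one fixed target list per job it was assigned,
	// so the shard's Prometheus never runs service discovery itself.
	cfg := map[string][]scrapeConfig{
		"scrape_configs": {{
			JobName:       "node",
			StaticConfigs: []staticConfig{{Targets: []string{"10.0.0.1:9100"}}},
		}},
	}
	out, err := yaml.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```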
@@ -42,7 +40,7 @@ the max series each Prometheus Shard can scrape is a flag of the Coordinator Pod;
in the example case we set it to 30000.

> ```
-> --max-series=30000
+> --shard.max-series=30000
> ```
now we have 6 targets with 60000+ series in total, and each Shard can scrape 30000 series, so 3 Shards are needed to cover all targets (see the sketch below).
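The shard count is just a ceiling division; a tiny sketch of the arithmetic (not kvass code):

```go
package main

import "fmt"

// shardsNeeded is ceil(totalSeries / maxSeries): the smallest number of
// shards whose combined capacity covers the total series count.
func shardsNeeded(totalSeries, maxSeries int64) int64 {
	return (totalSeries + maxSeries - 1) / maxSeries
}

func main() {
	fmt.Println(shardsNeeded(60001, 30000)) // 60000+ series at 30000 each -> 3
}
```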
@@ -65,7 +63,7 @@ but we can get a global data view using thanos-query
Coordinator uses a label selector to select shard StatefulSets; every StatefulSet is a replica, and Kvass puts Pods with the same index from different StatefulSets into one Shards Group.
-> --sts-selector=app.kubernetes.io/name=prometheus
+> --shard.selector=app.kubernetes.io/name=prometheus
## Build binary
7 changes: 4 additions & 3 deletions cmd/kvass/coordinator.go
@@ -122,11 +122,11 @@ distribution targets to shards`,

	cd = coordinator.NewCoordinator(
		ins,
-		exp,
-		targetDiscovery,
		cdCfg.shardMaxSeries,
		cdCfg.shadMaxShard,
		cdCfg.syncInterval,
+		exp.Get,
+		targetDiscovery.ActiveTargets,
		lg.WithField("component", "coordinator"))

api = coordinator.NewAPI(
@@ -136,7 +136,8 @@ distribution targets to shards`,
func(targets map[string][]*discovery.SDTargets) (statuses map[uint64]*target.ScrapeStatus, e error) {
return getTargetStatus(ins, exp, targets)
},
-		targetDiscovery,
+		targetDiscovery.ActiveTargets,
+		targetDiscovery.DropTargets,
lg.WithField("component", "web"),
)
)
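Note the pattern in these hunks: instead of handing the whole `exp` and `targetDiscovery` objects to the Coordinator and API, the commit passes the bound method values `exp.Get` and `targetDiscovery.ActiveTargets`. A minimal sketch of the idea, with hypothetical names rather than kvass's types:

```go
package main

import "fmt"

// consumer depends only on the behavior it needs, expressed as a func field,
// rather than on a concrete type or an interface.
type consumer struct {
	lookup func(key string) string
}

type store struct{ data map[string]string }

func (s *store) Get(key string) string { return s.data[key] }

func main() {
	s := &store{data: map[string]string{"job": "node"}}
	// s.Get is a method value: a func bound to s that satisfies the field
	// directly, so consumer never needs to know about *store.
	c := consumer{lookup: s.Get}
	fmt.Println(c.lookup("job")) // prints "node"
}
```

This removes the interface indirection and makes each dependency explicit at the call site.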
42 changes: 20 additions & 22 deletions pkg/coordinator/api.go
@@ -33,38 +33,34 @@ import (
"github.com/sirupsen/logrus"
)

-// TargetDiscovery manager provide active targets and dropped targets
-type TargetDiscovery interface {
-	// ActiveTargets return global active targets from SD
-	ActiveTargets() map[string][]*discovery.SDTargets
-	// DropTargets return global dropped targets from SD
-	DropTargets() map[string][]*discovery.SDTargets
-}

// API is the api server of coordinator
type API struct {
	// gin.Engine is the gin engine for handling http requests
	*gin.Engine
-	ConfigReload    chan *config.Config
-	lg              logrus.FieldLogger
-	readConfig      func() ([]byte, error)
-	getScrapeStatus func(map[string][]*discovery.SDTargets) (map[uint64]*target.ScrapeStatus, error)
-	targetDiscovery TargetDiscovery
+	ConfigReload chan *config.Config
+	lg           logrus.FieldLogger
+
+	readConfig       func() ([]byte, error)
+	getScrapeStatus  func(map[string][]*discovery.SDTargets) (map[uint64]*target.ScrapeStatus, error)
+	getActiveTargets func() map[string][]*discovery.SDTargets
+	getDropTargets   func() map[string][]*discovery.SDTargets
}

// NewAPI return a new web server
func NewAPI(
readConfig func() ([]byte, error),
getScrapeStatus func(map[string][]*discovery.SDTargets) (map[uint64]*target.ScrapeStatus, error),
-	targetDiscovery TargetDiscovery,
+	getActiveTargets func() map[string][]*discovery.SDTargets,
+	getDropTargets func() map[string][]*discovery.SDTargets,
lg logrus.FieldLogger) *API {
w := &API{
-		ConfigReload:    make(chan *config.Config, 2),
-		Engine:          gin.Default(),
-		lg:              lg,
-		readConfig:      readConfig,
-		getScrapeStatus: getScrapeStatus,
-		targetDiscovery: targetDiscovery,
+		ConfigReload:     make(chan *config.Config, 2),
+		Engine:           gin.Default(),
+		lg:               lg,
+		readConfig:       readConfig,
+		getScrapeStatus:  getScrapeStatus,
+		getActiveTargets: getActiveTargets,
+		getDropTargets:   getDropTargets,
}

w.GET("/api/v1/targets", api.Wrap(lg, w.targets))
@@ -77,6 +73,8 @@ func NewAPI(
return w
}

+// targets is compatible with the prometheus API /api/v1/targets;
+// it combines target information from service discovery, the sidecar, and exploring
func (a *API) targets(ctx *gin.Context) *api.Result {
state := ctx.Query("state")
sortKeys := func(targets map[string][]*discovery.SDTargets) ([]string, int) {
@@ -106,7 +104,7 @@ func (a *API) targets(ctx *gin.Context) *api.Result {
res := &v1.TargetDiscovery{}

if showActive {
-		activeTargets := a.targetDiscovery.ActiveTargets()
+		activeTargets := a.getActiveTargets()
activeKeys, numTargets := sortKeys(activeTargets)
res.ActiveTargets = make([]*v1.Target, 0, numTargets)
status, err := a.getScrapeStatus(activeTargets)
@@ -140,7 +138,7 @@ func (a *API) targets(ctx *gin.Context) *api.Result {
res.ActiveTargets = []*v1.Target{}
}
if showDropped {
-		tDropped := flatten(a.targetDiscovery.DropTargets())
+		tDropped := flatten(a.getDropTargets())
res.DroppedTargets = make([]*v1.DroppedTarget, 0, len(tDropped))
for _, t := range tDropped {
res.DroppedTargets = append(res.DroppedTargets, &v1.DroppedTarget{
73 changes: 38 additions & 35 deletions pkg/coordinator/coordinator.go
@@ -23,60 +23,49 @@ import (
"github.com/prometheus/prometheus/scrape"
"golang.org/x/sync/errgroup"
"time"
"tkestack.io/kvass/pkg/discovery"
"tkestack.io/kvass/pkg/shard"
"tkestack.io/kvass/pkg/target"
"tkestack.io/kvass/pkg/utils/wait"

"github.com/sirupsen/logrus"
)

-type Explore interface {
-	Get(job string, hash uint64) *target.ScrapeStatus
-}

// Coordinator periodically rebalances all replicates
type Coordinator struct {
-	log             logrus.FieldLogger
-	shardManager    shard.Manager
-	explore         Explore
-	targetDiscovery TargetDiscovery
-	maxSeries       int64
-	maxShard        int32
-	period          time.Duration
+	log          logrus.FieldLogger
+	shardManager shard.Manager
+	maxSeries    int64
+	maxShard     int32
+	period       time.Duration
+
+	getExploreResult func(job string, hash uint64) *target.ScrapeStatus
+	getActive        func() map[string][]*discovery.SDTargets
}

// NewCoordinator creates a new coordinator service
func NewCoordinator(
	shardManager shard.Manager,
-	explore Explore,
-	targetDiscovery TargetDiscovery,
	maxSeries int64,
	maxShard int32,
	period time.Duration,
+	getExploreResult func(job string, hash uint64) *target.ScrapeStatus,
+	getActive func() map[string][]*discovery.SDTargets,
	log logrus.FieldLogger) *Coordinator {
	return &Coordinator{
-		shardManager:    shardManager,
-		explore:         explore,
-		targetDiscovery: targetDiscovery,
-		maxShard:        maxShard,
-		maxSeries:       maxSeries,
-		log:             log,
-		period:          period,
+		shardManager:     shardManager,
+		getExploreResult: getExploreResult,
+		getActive:        getActive,
+		maxShard:         maxShard,
+		maxSeries:        maxSeries,
+		log:              log,
+		period:           period,
	}
}

// Run coordinates periodically until ctx is done
func (c *Coordinator) Run(ctx context.Context) error {
-	tk := time.NewTicker(c.period)
-	for {
-		select {
-		case <-ctx.Done():
-			return nil
-		case <-tk.C:
-			if err := c.runOnce(); err != nil {
-				c.log.Errorf(err.Error())
-			}
-		}
-	}
+	return wait.RunUntil(ctx, c.log, c.period, c.RunOnce)
}
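The hand-rolled ticker loop moves into a shared helper here. Its implementation is not shown in this diff; the following is a sketch inferred from the call site, not the actual tkestack.io/kvass/pkg/utils/wait code:

```go
package wait

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
)

// RunUntil calls f every period until ctx is cancelled, logging errors
// from f instead of aborting the loop (behavior inferred, not confirmed).
func RunUntil(ctx context.Context, log logrus.FieldLogger, period time.Duration, f func() error) error {
	tk := time.NewTicker(period)
	defer tk.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-tk.C:
			if err := f(); err != nil {
				log.Error(err.Error())
			}
		}
	}
}
```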

type shardInfo struct {
@@ -86,16 +75,30 @@ type shardInfo struct {
newTargets map[string][]*target.Target
}

-func (c *Coordinator) runOnce() error {
+// RunOnce gets shard information from the shard manager,
+// does shard reBalance, and changes the expected shard number
+func (c *Coordinator) RunOnce() error {
	shards, err := c.shardManager.Shards()
	if err != nil {
		return errors.Wrapf(err, "Shards")
	}

+	shardsInfo := c.getShardsInfo(shards)
+	scraping := c.globalScraping(shardsInfo)
+	exp, err := c.reBalance(shardsInfo, scraping)
+	if err != nil {
+		return errors.Wrapf(err, "reBalance")
+	}
+
+	return c.shardManager.ChangeScale(exp)
+}
+
+func (c *Coordinator) reBalance(
+	shardsInfo []*shardInfo,
+	scraping map[uint64]*shardInfo,
+) (int32, error) {
needSpace := int64(0)
-	for job, ts := range c.targetDiscovery.ActiveTargets() {
+	for job, ts := range c.getActive() {
for _, target := range ts {
t := target.ShardTarget
sd := scraping[t.Hash]
@@ -106,7 +109,7 @@
}

// if no shard scraping this target, we try assign it
exp := c.explore.Get(job, t.Hash)
exp := c.getExploreResult(job, t.Hash)
if exp == nil {
c.log.Error("can not found target %d from explore", t.Hash)
continue
@@ -147,7 +150,7 @@ func (c *Coordinator) runOnce() error {
exp = c.maxShard
}

-	return c.shardManager.ChangeScale(exp)
+	return exp, nil
}

func (c *Coordinator) globalScraping(info []*shardInfo) map[uint64]*shardInfo {
5 changes: 3 additions & 2 deletions pkg/prom/client.go
@@ -23,6 +23,7 @@ import (
v1 "github.com/prometheus/prometheus/web/api/v1"
)

+// Client is a client for prometheus API requests
type Client struct {
url string
}
@@ -34,14 +35,14 @@ func NewClient(url string) *Client {
}
}

-// runtimeInfo return the current status of this shard, only return tManager targets if scrapingOnly is true,
+// RuntimeInfo returns the current status of this shard; only tManager targets are returned if scrapingOnly is true,
// otherwise, all targets this cli discovered will be returned
func (c *Client) RuntimeInfo() (*RuntimeInfo, error) {
ret := &RuntimeInfo{}
return ret, api.Get(c.url+"/api/v1/status/runtimeinfo", ret)
}

-// targets is compatible with prometheusURL /api/v1/targets
+// Targets is compatible with prometheusURL /api/v1/targets
// the origin prometheusURL's Config is injected, so the targets it reports must be adjusted by the cli sidecar
func (c *Client) Targets(state string) (*v1.TargetDiscovery, error) {
url := c.url + "/api/v1/targets"
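A hypothetical use of this client against a shard's Prometheus (the address is illustrative; `NewClient` and `RuntimeInfo` are taken from the hunks above):

```go
package main

import (
	"fmt"

	"tkestack.io/kvass/pkg/prom"
)

func main() {
	cli := prom.NewClient("http://127.0.0.1:9090") // the shard's Prometheus URL
	ri, err := cli.RuntimeInfo()
	if err != nil {
		panic(err)
	}
	fmt.Printf("runtime info: %+v\n", ri)
}
```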
2 changes: 1 addition & 1 deletion pkg/scrape/manager_test.go
@@ -21,7 +21,7 @@ func TestManager(t *testing.T) {
cfg.ScrapeTimeout = model.Duration(time.Second)
r.NoError(os.Setenv("SCRAPE_PROXY", "http://127.0.0.1:9090"))

-	ss := &Manager{}
+	ss := New()
r.NoError(ss.ApplyConfig(&config.Config{ScrapeConfigs: []*config.ScrapeConfig{cfg}}))
s := ss.GetJob(cfg.JobName)
r.NotNil(s)
4 changes: 2 additions & 2 deletions pkg/shard/kubernetes/shardmanager.go
@@ -45,7 +45,7 @@ type ShardManager struct {
getPods func(lb map[string]string) (*v1.PodList, error)
}

-// NewShard create a new ShardManager shards manager
+// New creates a new StatefulSet shard manager
func New(cli kubernetes.Interface,
stsNamespace string,
stsSelector string,
@@ -95,7 +95,7 @@ func (s *ShardManager) Shards() ([]*shard.Shard, error) {

ret := make([]*shard.Shard, 0)
for i := 0; i < maxPods; i++ {
-		sg := shard.NewShard(fmt.Sprintf("shard-%d", i), s.lg.WithField("shard", i))
+		sg := shard.NewGroup(fmt.Sprintf("shard-%d", i), s.lg.WithField("shard", i))
for _, pods := range podss {
if i < len(pods) {
p := pods[i]
9 changes: 8 additions & 1 deletion pkg/shard/kubernetes/shardmanager_test.go
@@ -62,7 +62,14 @@ func TestStatefulSet_Shards(t *testing.T) {
sts.getPods = func(lb map[string]string) (list *v1.PodList, e error) {
pl := &v1.PodList{}
for i := 0; i < 2; i++ {
-			pl.Items = append(pl.Items, v1.Pod{})
+			p := v1.Pod{}
+			p.Status.Conditions = []v1.PodCondition{
+				{
+					Type:   v1.PodReady,
+					Status: v1.ConditionTrue,
+				},
+			}
+			pl.Items = append(pl.Items, p)
}
return pl, nil
}
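The test now builds Pods that carry a Ready condition, which suggests the shard manager only counts ready Pods. A sketch of what such a readiness check might look like (a hypothetical helper, not shown in this diff):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// podReady reports whether a Pod has the Ready condition set to True.
func podReady(p *v1.Pod) bool {
	for _, c := range p.Status.Conditions {
		if c.Type == v1.PodReady {
			return c.Status == v1.ConditionTrue
		}
	}
	return false
}

func main() {
	p := v1.Pod{}
	p.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
	fmt.Println(podReady(&p)) // true
}
```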
6 changes: 3 additions & 3 deletions pkg/shard/shard.go
@@ -28,8 +28,8 @@ type Shard struct {
replicates []*Replicas
}

-// NewShard return a new shard with no replicate
-func NewShard(id string, lg logrus.FieldLogger) *Shard {
+// NewGroup return a new shard with no replicate
+func NewGroup(id string, lg logrus.FieldLogger) *Shard {
return &Shard{
ID: id,
log: lg,
@@ -47,7 +47,7 @@ func (s *Shard) Replicas() []*Replicas {
return s.replicates
}

-// targetsScraping return the targets hash that this Shard scraping
+// TargetsScraping returns the target hashes that this Shard is scraping
// the key of the map is target hash
// the result is union set of all replicates
func (s *Shard) TargetsScraping() (map[uint64]bool, error) {
File renamed without changes.
1 change: 1 addition & 0 deletions pkg/sidecar/api.go
@@ -72,6 +72,7 @@ func (w *API) ServeHTTP(wt http.ResponseWriter, r *http.Request) {
httputil.NewSingleHostReverseProxy(u).ServeHTTP(wt, r)
}

+// Run starts the API at "address"
func (w *API) Run(address string) error {
return http.ListenAndServe(address, w)
}
3 changes: 3 additions & 0 deletions pkg/target/status.go
@@ -20,6 +20,9 @@ type ScrapeStatus struct {
lastSeries []int64
}

+// SetScrapeErr marks the result of this scrape:
+// health will be down if err is not nil,
+// and up if err is nil
func (t *ScrapeStatus) SetScrapeErr(start time.Time, err error) {
t.LastScrape = start
t.LastScrapeDuration = time.Now().Sub(start).Seconds()
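A hypothetical usage matching the documented semantics; the `SetScrapeErr` signature comes from the hunk above, but whether a zero-value `ScrapeStatus` is usable is an assumption not confirmed by this diff:

```go
package main

import (
	"errors"
	"time"

	"tkestack.io/kvass/pkg/target"
)

func main() {
	st := &target.ScrapeStatus{} // assumption: zero value is usable
	begin := time.Now()
	st.SetScrapeErr(begin, errors.New("connection refused")) // failed scrape: health goes down
	st.SetScrapeErr(time.Now(), nil)                         // later success: health goes up
}
```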