diff --git a/cmd/web.go b/cmd/web.go index 6e64983..f49a684 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -40,10 +40,15 @@ func runWeb(c *cli.Context) { fmt.Println(err) return } + if err := models.InitEtcd(); err != nil { fmt.Println(err) return } + if err := models.InitK8s(); err != nil { + fmt.Println(err) + return + } models.SyncDatabase() m := macaron.New() diff --git a/conf/runtime.yaml b/conf/runtime.yaml new file mode 100644 index 0000000..c28ef9e --- /dev/null +++ b/conf/runtime.yaml @@ -0,0 +1,32 @@ +--- +run: + runMode: dev + logPath: log/vessel +http: + listenMode: http + httpsCertFile: cert/containerops/containerops.crt + httpsKeyFile: cert/containerops/containerops.key + host: 0.0.0.0 + port: 4488 +database: + username: root + password: dolo0425 + protocol: tcp + host: 10.67.147.80 + port: 3306 + schema: vesseldb + param: + charset: utf8 + parseTime: True + loc: Local +etcd: + endpoints: + - host: 127.0.0.1 + port: 4001 + - host: localhost + port: 4001 + username: etcd + password: etcd +k8s: + host: 127.0.0.1 + port: 8080 diff --git a/handler/pipeline.go b/handler/pipeline.go index 47ae755..7abc476 100644 --- a/handler/pipeline.go +++ b/handler/pipeline.go @@ -1,15 +1,16 @@ package handler import ( + "encoding/json" "net/http" "strings" "time" - "encoding/json" log "github.com/Sirupsen/logrus" "gopkg.in/macaron.v1" "github.com/containerops/vessel/models" + "github.com/containerops/vessel/module/kubernetes" "github.com/containerops/vessel/module/pipeline" "github.com/containerops/vessel/utils" ) @@ -93,8 +94,8 @@ func createPipelineAndStage(pst models.PipelineSpecTemplate) (*models.Pipeline, plInfo.Labels = utils.TransMapToStr(pst.MetaData.Labels) plInfo.Annotations = utils.TransMapToStr(pst.MetaData.Annotations) //ignore plJson Detail - pstbs,err := json.Marshal(pst) - if err != nil{ + pstbs, err := json.Marshal(pst) + if err != nil { return nil, err } plInfo.Detail = string(pstbs) @@ -115,8 +116,8 @@ func createPipelineAndStage(pst models.PipelineSpecTemplate) (*models.Pipeline, //StatusCheckInterval to Detail //StatusCheckCount to Detail - sbs,err := json.Marshal(value) - if err != nil{ + sbs, err := json.Marshal(value) + if err != nil { return nil, err } sInfo.Detail = string(sbs) @@ -147,8 +148,10 @@ func V1GETPipelineHandler(ctx *macaron.Context) (int, []byte) { return http.StatusOK, []byte("") } -func V1DELETEPipelineHandler(ctx *macaron.Context) (int, []byte) { - return http.StatusOK, []byte("") +func V1DELETEPipelineHandler(ctx *macaron.Context, reqData models.PipelineSpecTemplate) (int, []byte) { + kubernetes.DeletePipeline(&reqData) + retstr := "Sent delete pipeline " + reqData.MetaData.Name + " event" + return http.StatusOK, []byte(retstr) } func V1RunPipelineHandler(ctx *macaron.Context) (int, []byte) { diff --git a/models/models.go b/models/models.go index 1816a97..fcc69a7 100644 --- a/models/models.go +++ b/models/models.go @@ -4,6 +4,9 @@ import ( "fmt" "time" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/unversioned" + "golang.org/x/net/context" "github.com/containerops/vessel/setting" @@ -15,6 +18,7 @@ import ( var ( EtcdClient client.Client DbClient *gorm.DB + K8sClient *unversioned.Client ) //Init Database @@ -100,6 +104,20 @@ func SyncDatabase() error { } } + if !db.HasTable(&PipelineMetaData{}) { + err = db.CreateTable(&PipelineMetaData{}).Error + if err != nil { + return err + } + } + + if !db.HasTable(&StageSpec{}) { + err = db.CreateTable(&StageSpec{}).Error + if err != nil { + return err + } + } + fmt.Println("sync DB done !") return nil } @@ -137,6 +155,21 @@ func InitEtcd() error { } +func InitK8s() error { + clientConfig := restclient.Config{} + host := setting.RunTime.K8s.Host + ":" + setting.RunTime.K8s.Port + fmt.Println(host) + clientConfig.Host = host + // clientConfig.Host = setting.RunTime.Database.Host + client, err := unversioned.New(&clientConfig) + if err != nil { + return err + fmt.Errorf("New unversioned client err: %v!\n", err.Error()) + } + K8sClient = client + return nil +} + // // //Sync ETCD // func SyncETCD() error { diff --git a/models/pipelinetemplate.go b/models/pipelinetemplate.go index 566e73d..98b02ac 100644 --- a/models/pipelinetemplate.go +++ b/models/pipelinetemplate.go @@ -4,7 +4,7 @@ type PipelineSpecTemplate struct { Kind string `json:"kind"` ApiVersion string `json:"apiVersion"` MetaData PipelineMetaData `json:"metadata"` - Spec []StageSpec `json:"spec"` + Spec []StageSpec `json:"spec" sql:"-"` } type PipelineMetaData struct { @@ -17,8 +17,8 @@ type PipelineMetaData struct { CreateonTimestamp string `json:"createonTimestamp"` DeletionTimestamp string `json:"deletionTimestamp"` TimeoutDuration int64 `json:"timeoutDuration"` - Labels map[string]string `json:"labels"` - Annotations map[string]string `json:"annotations"` + Labels map[string]string `json:"labels" sql:"-"` + Annotations map[string]string `json:"annotations" sql:"-"` } /*// pipelineMetadata struct for convert from pipelineVersion.MetaData @@ -53,12 +53,14 @@ type StageSpec struct { ProjectId int64 PipelineId int64 StageId int64 - Replicas int `json:"replicsa"` + Replicas int `json:"replicas"` Dependence string `json:"dependence"` Kind string `json:"kind"` - StatusCheckUrl string `json:"statusCheckUrl"` + StatusCheckUrl string `json:"statusCheckLink"` StatusCheckInterval int64 `json:"statusCheckInterval"` - StatusCheckCount int64 `json:"statusCheckCount"` + StatusCheckCount int `json:"statusCheckCount"` Image string `json:"image"` Port int `json:"port"` + EnvName string `json:"envName"` + EnvValue string `json:"envValue"` } diff --git a/module/etcdpersist/db/connector.go b/module/etcdpersist/db/connector.go new file mode 100644 index 0000000..9282deb --- /dev/null +++ b/module/etcdpersist/db/connector.go @@ -0,0 +1,67 @@ +package db + +import ( + "database/sql" + "fmt" + + "github.com/astaxie/beego/orm" + _ "github.com/go-sql-driver/mysql" + "github.com/golang/glog" +) + +var ( + Orm orm.Ormer +) + +func init() { + orm.RegisterModel(new(Tb_etcd_backup)) +} + +func Syncdb(db_user, db_pass, db_host, db_port, db_name string) error { + err := Create(db_user, db_pass, db_host, db_port, db_name) + if err != nil { + return err + } + err = Connect(db_user, db_pass, db_host, db_port, db_name) + if err != nil { + return err + } + name := "default" + force := true + verbose := true + err = orm.RunSyncdb(name, force, verbose) + glog.Infoln("database init is complete.\nPlease restart the application") + return err +} + +func Connect(db_user, db_pass, db_host, db_port, db_name string) error { + var dns string + orm.RegisterDriver("mysql", orm.DRMySQL) + dns = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8", db_user, db_pass, db_host, db_port, db_name) + err := orm.RegisterDataBase("default", "mysql", dns) + if err != nil { + return err + } + Orm = orm.NewOrm() + err = Orm.Using("default") + return err +} + +func Create(db_user, db_pass, db_host, db_port, db_name string) error { + var dns string + var sqlstring string + dns = fmt.Sprintf("%s:%s@tcp(%s:%s)/?charset=utf8", db_user, db_pass, db_host, db_port) + sqlstring = fmt.Sprintf("CREATE DATABASE if not exists `%s` CHARSET utf8 COLLATE utf8_general_ci", db_name) + db, err := sql.Open("mysql", dns) + if err != nil { + panic(err.Error()) + } + r, err := db.Exec(sqlstring) + if err != nil { + glog.Infoln(err, r) + } else { + glog.Infoln("Database ", db_name, " created") + } + defer db.Close() + return err +} diff --git a/module/etcdpersist/db/db_test.go b/module/etcdpersist/db/db_test.go new file mode 100644 index 0000000..049ed20 --- /dev/null +++ b/module/etcdpersist/db/db_test.go @@ -0,0 +1,14 @@ +package db + +import ( + "strings" + "testing" +) + +func Test_db(t *testing.T) { + s := "/dir" + index := strings.LastIndex(s, "/") + r := []rune(s) + s = string(r[0:index]) + t.Log(s == "") +} diff --git a/module/etcdpersist/db/tb_etcd_backup.go b/module/etcdpersist/db/tb_etcd_backup.go new file mode 100644 index 0000000..480a254 --- /dev/null +++ b/module/etcdpersist/db/tb_etcd_backup.go @@ -0,0 +1,78 @@ +package db + +import ( + "strings" + "time" +) + +type Tb_etcd_backup struct { + Id int64 + Key string `orm:"size(255)"` + Parent_key string `orm:"size(255)"` + Value string `orm:"size(255)"` + Dir bool + Ttl uint32 + Modified_index uint32 + Created_index uint32 +} + +func (this *Tb_etcd_backup) Update() (int64, error) { + return Orm.Update(this) +} + +// server detection and delete expire key +func (this *Tb_etcd_backup) DeleteExpire() (bool, int64, error) { + // To be realized + return true, 0, nil +} + +func (this *Tb_etcd_backup) InertOrUpdate() (bool, int64, error) { + index := strings.LastIndex(this.Key, "/") + this.Parent_key = string([]rune(this.Key)[0:index]) + if this.Parent_key == "" { + this.Parent_key = "/" + } + + newObj := &Tb_etcd_backup{} + newObj.Key = this.Key + newObj.Value = this.Value + newObj.Dir = this.Dir + newObj.Ttl = this.Ttl + newObj.Modified_index = this.Modified_index + newObj.Created_index = this.Created_index + newObj.Parent_key = this.Parent_key + create, id, err := Orm.ReadOrCreate(this, "Key") + if err == nil && !create { + if newObj.Modified_index > this.Modified_index { + newObj.Id = this.Id + id, err = Orm.Update(newObj) + } + } + return create, id, err +} + +func (this *Tb_etcd_backup) Delete() (int64, error) { + return Orm.Delete(this) +} + +func (this *Tb_etcd_backup) Read() error { + return Orm.Read(this) +} +func (this *Tb_etcd_backup) Insert() (int64, error) { + return Orm.Insert(this) +} +func (this *Tb_etcd_backup) Exist() bool { + return Orm.QueryTable("Tb_etcd_backup").Filter("Key", this.Key).Exist() +} +func (this *Tb_etcd_backup) IsDirectory() bool { + return this.Dir +} + +func (this *Tb_etcd_backup) IsExpired() bool { + if this.Ttl > 0 { + if uint32(time.Now().Unix())-this.Ttl > 0 { + return true + } + } + return false +} diff --git a/module/etcdpersist/etcd-config.json b/module/etcdpersist/etcd-config.json new file mode 100644 index 0000000..f5fd55f --- /dev/null +++ b/module/etcdpersist/etcd-config.json @@ -0,0 +1,17 @@ +{ + "cluster": { + "leader": "http://192.168.40.86:4001/", + "machines": [ + "http://192.168.40.86:4001/", + "http://192.168.40.86:4002/", + "http://192.168.40.86:4003/" + ] + }, + "config": { + "certFile": "", + "keyFile": "", + "caCertFiles": [], + "timeout":9999999999999, + "consistency": "STRONG" + } +} diff --git a/module/etcdpersist/main.go b/module/etcdpersist/main.go new file mode 100644 index 0000000..53f7cef --- /dev/null +++ b/module/etcdpersist/main.go @@ -0,0 +1,94 @@ +package main + +import ( + "os" + "time" + + "github.com/containerops/vessel/module/etcdpersist/db" + "github.com/containerops/vessel/module/etcdpersist/persist" + + "github.com/codegangsta/cli" + "github.com/coreos/go-etcd/etcd" + "github.com/golang/glog" +) + +func main() { + app := cli.NewApp() + app.Name = "etcd backup" + app.Usage = "etcd bakcup" + app.Commands = []cli.Command{ + { + Name: "backup", + Aliases: []string{"b"}, + Usage: "backup etcd", + Action: backup, + Flags: []cli.Flag{ + cli.IntFlag{Name: "delay"}, + cli.StringFlag{Name: "etcd-config"}, + cli.StringFlag{Name: "dbname"}, + cli.StringFlag{Name: "dbpwd"}, + cli.StringFlag{Name: "dbip"}, + cli.StringFlag{Name: "dbport"}, + cli.StringFlag{Name: "dbaccount"}, + }, + }, + { + Name: "install", + Aliases: []string{"i"}, + Usage: "install mysql", + Action: install, + Flags: []cli.Flag{ + cli.StringFlag{Name: "dbname"}, + cli.StringFlag{Name: "dbpwd"}, + cli.StringFlag{Name: "dbip"}, + cli.StringFlag{Name: "dbport"}, + cli.StringFlag{Name: "dbaccount"}, + }, + }, + } + + app.Run(os.Args) +} + +func createEtcd(configPath string) *etcd.Client { + etcdClient, err := etcd.NewClientFromFile(configPath) + if err != nil { + glog.Fatalln("Can not locate configuration file: `"+configPath+"`. Error: ", err) + } + + success := etcdClient.SyncCluster() + if !success { + glog.Fatalln("cannot sync machines") + } + + return etcdClient +} +func extractetcd(etcdClient persist.EtcdClient) { + dataSet := persist.DownloadData([]string{"/"}, true, true, etcdClient) + for _, v := range dataSet { + v.InertOrUpdate() + } +} +func backup(c *cli.Context) { + if err := db.Connect(c.String("dbaccount"), c.String("dbpwd"), + c.String("dbip"), c.String("dbport"), c.String("dbname")); err != nil { + glog.Fatalln(err) + } + etcdClient := createEtcd(persist.Conf.EtcdConfigPath) + extractetcd(etcdClient) + glog.Infoln("backup running", time.Now().UTC()) + for { + select { + case <-time.After(time.Second * time.Duration(persist.Conf.Delay)): + extractetcd(etcdClient) + glog.Infoln("backup running", time.Now().UTC()) + } + } +} +func install(c *cli.Context) { + glog.Infoln("install running") + if err := db.Syncdb(c.String("dbaccount"), c.String("dbpwd"), + c.String("dbip"), c.String("dbport"), c.String("dbname")); err != nil { + glog.Fatalln(err) + } +} diff --git a/module/etcdpersist/persist/config.go b/module/etcdpersist/persist/config.go new file mode 100644 index 0000000..5535cb4 --- /dev/null +++ b/module/etcdpersist/persist/config.go @@ -0,0 +1,26 @@ +package persist + +import "flag" + +type Config struct { + EtcdConfigPath string + Delay int +} + +var ( + Conf *Config +) + +func init() { + Conf = parseOptions() +} + +func parseOptions() *Config { + delay := flag.Int("delay", 10, "Delay time to extract etcd data") + etcdConfigPath := flag.String("etcd-config", "etcd-config.json", "Location of the etcd config file") + flag.Parse() + return &Config{ + Delay: *delay, + EtcdConfigPath: *etcdConfigPath, + } +} diff --git a/module/etcdpersist/persist/persist.go b/module/etcdpersist/persist/persist.go new file mode 100644 index 0000000..d7fbf08 --- /dev/null +++ b/module/etcdpersist/persist/persist.go @@ -0,0 +1,70 @@ +package persist + +import ( + "github.com/containerops/vessel/module/etcdpersist/db" + + "github.com/coreos/go-etcd/etcd" + "github.com/golang/glog" +) + +type EtcdClient interface { + Get(key string, sort, recursive bool) (*etcd.Response, error) + Set(key string, value string, ttl uint64) (*etcd.Response, error) + SetDir(key string, ttl uint64) (*etcd.Response, error) +} + +// load data from etcd cache +func DownloadData(keys []string, sorted bool, recursive bool, etcdClient EtcdClient) []*db.Tb_etcd_backup { + persistData := make([]*db.Tb_etcd_backup, 0) + for _, key := range keys { + response, err := etcdClient.Get(key, sorted, recursive) + if err != nil { + glog.Infoln("Trying to get the following key: "+key+". Error: ", err) + } + persistData = append(persistData, extractNodes(response.Node, recursive)...) + } + return persistData +} + +func extractNodes(node *etcd.Node, recursive bool) []*db.Tb_etcd_backup { + backupKeys := make([]*db.Tb_etcd_backup, 0) + if recursive { + backupKeys = backupNodes(node) + } else { + backupKeys = append(backupKeys, backupNode(node)) + } + return backupKeys +} + +// copy data to database struct +func backupNode(node *etcd.Node) *db.Tb_etcd_backup { + key := &db.Tb_etcd_backup{ + Key: node.Key, + Modified_index: uint32(node.ModifiedIndex), + Created_index: uint32(node.CreatedIndex), + } + key.Dir = node.Dir + if node.Expiration != nil { + key.Ttl = uint32(node.Expiration.Unix()) + } + if node.Dir != true && node.Key != "" { + key.Value = node.Value + } + return key +} + +func backupNodes(node *etcd.Node) []*db.Tb_etcd_backup { + backupKeys := []*db.Tb_etcd_backup{} + + if len(node.Nodes) > 0 { + for _, nodeChild := range node.Nodes { + backupKeys = append(backupKeys, backupNodes(nodeChild)...) + } + } else { + backupKey := backupNode(node) + if backupKey.Key != "" { + backupKeys = append(backupKeys, backupKey) + } + } + return backupKeys +} diff --git a/module/etcdpersist/run b/module/etcdpersist/run new file mode 100644 index 0000000..ee1ad32 --- /dev/null +++ b/module/etcdpersist/run @@ -0,0 +1,2 @@ +#!/bin/bash +etcdpersist backup -delay=5 -etcd-config=etcd-config.json -dbname=vessel -dbport=3306 -dbip=192.168.40.142 -dbpwd=123456 -dbaccount=root diff --git a/module/etcdpersist/utils/debug.go b/module/etcdpersist/utils/debug.go new file mode 100644 index 0000000..4c205ac --- /dev/null +++ b/module/etcdpersist/utils/debug.go @@ -0,0 +1,89 @@ +package utils + +import ( + "fmt" + "reflect" + "strconv" +) + +type variable struct { + // Output dump string + Out string + + // Indent counter + indent int64 +} + +func (v *variable) dump(val reflect.Value, name string) { + v.indent++ + + if val.IsValid() && val.CanInterface() { + typ := val.Type() + + switch typ.Kind() { + case reflect.Array, reflect.Slice: + v.printType(name, val.Interface()) + l := val.Len() + for i := 0; i < l; i++ { + v.dump(val.Index(i), strconv.Itoa(i)) + } + case reflect.Map: + v.printType(name, val.Interface()) + //l := val.Len() + keys := val.MapKeys() + for _, k := range keys { + v.dump(val.MapIndex(k), k.Interface().(string)) + } + case reflect.Ptr: + v.printType(name, val.Interface()) + v.dump(val.Elem(), name) + case reflect.Struct: + v.printType(name, val.Interface()) + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + v.dump(val.FieldByIndex([]int{i}), field.Name) + } + default: + v.printValue(name, val.Interface()) + } + } else { + v.printValue(name, "") + } + + v.indent-- +} + +func (v *variable) printType(name string, vv interface{}) { + v.printIndent() + v.Out = fmt.Sprintf("%s%s(%T)\n", v.Out, name, vv) +} + +func (v *variable) printValue(name string, vv interface{}) { + v.printIndent() + v.Out = fmt.Sprintf("%s%s(%T) %#v\n", v.Out, name, vv, vv) +} + +func (v *variable) printIndent() { + var i int64 + for i = 0; i < v.indent; i++ { + v.Out = fmt.Sprintf("%s ", v.Out) + } +} + +// Print to standard out the value that is passed as the argument with indentation. +// Pointers are dereferenced. +func Dump(v interface{}) { + val := reflect.ValueOf(v) + dump := &variable{indent: -1} + dump.dump(val, "") + fmt.Printf("%s", dump.Out) +} + +// Return the value that is passed as the argument with indentation. +// Pointers are dereferenced. +func Sdump(v interface{}) string { + val := reflect.ValueOf(v) + dump := &variable{indent: -1} + dump.dump(val, "") + return dump.Out +} diff --git a/module/kubernetes/bussiness.go b/module/kubernetes/bussiness.go new file mode 100644 index 0000000..0cef64a --- /dev/null +++ b/module/kubernetes/bussiness.go @@ -0,0 +1,100 @@ +package kubernetes + +import ( + "fmt" + "time" + + "github.com/containerops/vessel/models" +) + +func GetPipelineBussinessRes(pipelineVersion *models.PipelineSpecTemplate, ch chan bool) { + fmt.Println("Enter GetPipelineBussinessRes") + namespace := pipelineVersion.MetaData.Namespace + timeout := pipelineVersion.MetaData.TimeoutDuration + // replicas := pipelineVersion. + for _, stage := range pipelineVersion.Spec { + if stage.StatusCheckCount == 0 || stage.StatusCheckInterval == 0 { + // ch <- true + continue + } + /*select { + } + case*/ + replicas := stage.Replicas + ipArray := make([]string, replicas) + err := getPodIp(namespace, stage.Name, &ipArray) + if err != nil { + ch <- false + fmt.Printf("xxxxx%v\n", err) + // fmt.Println("aaaaaaaaaaaaa") + return + } + + port := stage.Port + statusCheckLink := stage.StatusCheckUrl + statusCheckInterval := stage.StatusCheckInterval + statusCheckCount := stage.StatusCheckCount + podsCh := make([]chan bool, replicas) + t := time.NewTimer(time.Second * time.Duration(timeout)) + for i := 0; i < replicas; i++ { + checkUrl := fmt.Sprintf("https://%v:%v%v", ipArray[i], port, statusCheckLink) + go getPodBussinessRes(checkUrl, statusCheckInterval, statusCheckCount, podsCh[i]) + } + + podCh := make(chan bool) + go waitbs(replicas, podsCh, podCh) + + select { + case podRes := <-podCh: + if podRes == false { + fmt.Println("bbbbbbbbbbbbbb") + ch <- false + } + case <-t.C: + fmt.Println("cccccccccccccccccccc") + ch <- false + } + } + fmt.Println("dddddddddddddddddd") + ch <- true +} + +func waitbs(length int, array []chan bool, ch chan bool) { + count := 0 + for i := 0; i < length; i++ { + res := <-array[i] + if res == false { + ch <- res + break + } else { + count++ + } + } + if count == length-1 { + ch <- true + } +} + +func getPodBussinessRes(checkUrl string, statusCheckInterval int64, statusCheckCount int, ch chan bool) { + for i := 0; i < statusCheckCount; i++ { + if i == 0 && 0 == requestBsRes(checkUrl) { + ch <- true + return + } + + tick := time.NewTimer(time.Duration(statusCheckInterval) * time.Second) + <-tick.C + bsRes := requestBsRes(checkUrl) + if bsRes == 200 { + ch <- true + return + } + } + ch <- false +} + +// getBsRes : request to checkUrl, get 200:success, 0, ignore, others, failed +func requestBsRes(checkUrl string) int { + // read res from checkUrl + return 200 +} diff --git a/module/kubernetes/kubernetes.go b/module/kubernetes/kubernetes.go index 2c101d2..d15bfd8 100644 --- a/module/kubernetes/kubernetes.go +++ b/module/kubernetes/kubernetes.go @@ -1,15 +1,6 @@ package kubernetes -import ( - "fmt" - - // "github.com/containerops/vessel/models" - // "k8s.io/kubernetes/pkg/api" - "github.com/containerops/vessel/setting" - "k8s.io/kubernetes/pkg/client/restclient" - "k8s.io/kubernetes/pkg/client/unversioned" - // "k8s.io/kubernetes/pkg/util/intstr" -) +// "github.com/containerops/vessel/setting" const ( Added = "ADDED" @@ -22,24 +13,6 @@ const ( OK = "OK" ) -type IpPort struct { - Ip string `json:"ip"` - Port int `json:"port"` -} - -var CLIENT *unversioned.Client - -func New(hostIp string) { - clientConfig := restclient.Config{} - clientConfig.Host = setting.RunTime.Database.Host - client, err := unversioned.New(&clientConfig) - if err != nil { - fmt.Errorf("New unversioned client err: %v!\n", err.Error()) - } - // client.ConfigMaps(namespace). - CLIENT = client -} - /*func GetHostIp() string { return setting.RunTime.Database.Host } @@ -106,24 +79,24 @@ func CreateK8SResource(pipelineversion *models.PipelineVersion) error { // } // Going to support create namespace after we have namespace watch lib - // _, err := CLIENT.Namespaces().Get(piplineMetadata.Namespace) + // _, err := models.K8sClient.Namespaces().Get(piplineMetadata.Namespace) // if err != nil { // namespaceObj := &api.Namespace{ // ObjectMeta: api.ObjectMeta{Name: piplineMetadata.Namespace}, // } - // if _, err := CLIENT.Namespaces().Create(namespaceObj); err != nil { + // if _, err := models.K8sClient.Namespaces().Create(namespaceObj); err != nil { // fmt.Errorf("Create namespace err : %v\n", err) // return err // } // fmt.Println("dddddd") // } - if _, err := CLIENT.ReplicationControllers(piplineMetadata.Namespace).Create(rc); err != nil { + if _, err := models.K8sClient.ReplicationControllers(piplineMetadata.Namespace).Create(rc); err != nil { fmt.Errorf("Create rc err : %v\n", err) return err } - if _, err := CLIENT.Services(piplineMetadata.Namespace).Create(service); err != nil { + if _, err := models.K8sClient.Services(piplineMetadata.Namespace).Create(service); err != nil { fmt.Errorf("Create service err : %v\n", err) return err } diff --git a/module/kubernetes/namespace.go b/module/kubernetes/namespace.go index f946bb8..d3eb626 100644 --- a/module/kubernetes/namespace.go +++ b/module/kubernetes/namespace.go @@ -1,25 +1,17 @@ package kubernetes import ( - // "encoding/json" - // "errors" "fmt" "time" "github.com/containerops/vessel/models" - // "k8s.io/kubernetes/pkg/api" - // "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/watch" ) -func CreateNamespace(pipelineVersion *models.PipelineVersion) error { +func CreateNamespace(pipelineVersion *models.PipelineSpecTemplate) error { piplineMetadata := pipelineVersion.MetaData - // pipelineStageSpecs := pipelineVersion.StageSpecs - // Going to support create namespace after we have namespace watch lib - /* _, err := CLIENT.Namespaces().Get(piplineMetadata.Namespace) - if err != nil {*/ namespaceObj := &api.Namespace{ ObjectMeta: api.ObjectMeta{ Name: piplineMetadata.Namespace, @@ -28,7 +20,7 @@ func CreateNamespace(pipelineVersion *models.PipelineVersion) error { } namespaceObj.SetLabels(map[string]string{"app": piplineMetadata.Name}) - if _, err := CLIENT.Namespaces().Create(namespaceObj); err != nil { + if _, err := models.K8sClient.Namespaces().Create(namespaceObj); err != nil { fmt.Errorf("Create namespace err : %v\n", err) return err } @@ -41,44 +33,30 @@ func WatchNamespaceStatus(labelKey string, labelValue string, timeout int64, che fmt.Errorf("Params checkOp err, checkOp: %v", checkOp) } - //opts := api.ListOptions{FieldSelector: fields.Set{"kind": "pod"}.AsSelector()} opts := api.ListOptions{LabelSelector: labels.Set{labelKey: labelValue}.AsSelector()} - w, err := CLIENT.Namespaces().Watch(opts) + w, err := models.K8sClient.Namespaces().Watch(opts) if err != nil { ch <- Error return - // fmt.Errorf("Get watch interface err") - // return "", err } t := time.NewTimer(time.Second * time.Duration(timeout)) - for { - select { - case event, ok := <-w.ResultChan(): - //fmt.Println(event.Type) - if !ok { - ch <- Error - return - // fmt.Errorf("Watch err\n") - // return "", errors.New("error occours from watch chanle") - } - //fmt.Println(event.Type) + select { + case event, ok := <-w.ResultChan(): + if !ok { + ch <- Error + } else if string(event.Type) == checkOp { // Pod have phase, so we have to wait for the phase change to the right status when added - if string(event.Type) == checkOp { - fmt.Println(event.Object.(*api.Namespace).Status.Phase) + fmt.Println(event.Object.(*api.Namespace).Status.Phase) - if (checkOp == string(watch.Deleted)) || ((checkOp != string(watch.Deleted)) && - (event.Object.(*api.Namespace).Status.Phase == "Active")) { - ch <- OK - return - // return "OK", nil - } + if (checkOp == string(watch.Deleted)) || ((checkOp != string(watch.Deleted)) && + (event.Object.(*api.Namespace).Status.Phase == "Active")) { + ch <- OK } - - case <-t.C: - ch <- Timeout - return - // return "TIMEOUT", nil } + + case <-t.C: + fmt.Println("Watch namespace timeout") + ch <- Timeout } } diff --git a/module/kubernetes/pipeline.go b/module/kubernetes/pipeline.go index 6918066..1060669 100644 --- a/module/kubernetes/pipeline.go +++ b/module/kubernetes/pipeline.go @@ -1,116 +1,117 @@ package kubernetes import ( - // "encoding/json" - // "fmt" + "fmt" "github.com/containerops/vessel/models" - // "k8s.io/kubernetes/pkg/api" - // "k8s.io/kubernetes/pkg/util/intstr" ) -func StartPipeline(pipelineVersion *models.PipelineVersion) error { +func StartPipeline(pipelineVersion *models.PipelineSpecTemplate, stageName string) error { piplineMetadata := pipelineVersion.MetaData - if _, err := CLIENT.Namespaces().Get(piplineMetadata.Namespace); err != nil { - // fmt.Println("111111111111111") + if _, err := models.K8sClient.Namespaces().Get(piplineMetadata.Namespace); err != nil { if err := CreateNamespace(pipelineVersion); err != nil { return err } } - // fmt.Println("222222222222222222222222") - /*if status, err := WatchNamespaceStatus("app", piplineMetadata.Name, 30, Added); err != nil || status != "OK" { - // if status != "OK" { - return err - // } - } - */ - if err := CreateRC(pipelineVersion); err != nil { + if err := CreateRC(pipelineVersion, stageName); err != nil { return err } - if err := CreateService(pipelineVersion); err != nil { + if err := CreateService(pipelineVersion, stageName); err != nil { return err } - // CLIENT.Pods(namespace).Get(name).Status.PodIP - // CLIENT. - //createrc && createservice - return nil -} -func DeletePipeline(pipelineVersion *models.PipelineVersion) error { return nil } -func GetPipelinePodsIPort(pipelineVersion *models.PipelineVersion, podIps *[]IpPort) error { - for _, stage := range pipelineVersion.StageSpecs { - podIp, err := getPodIp(pipelineVersion.GetMetadata().Namespace, stage.Name) - if err != nil { - return err - } +func DeletePipeline(pipelineVersion *models.PipelineSpecTemplate) error { + meta := pipelineVersion.MetaData + specs := pipelineVersion.Spec - (*podIps) = append(*podIps, IpPort{Ip: podIp, Port: stage.Port}) + for _, spec := range specs { + models.K8sClient.ReplicationControllers(meta.Namespace).Delete(spec.Name) + models.K8sClient.Services(meta.Namespace).Delete(spec.Name) } + + models.K8sClient.Namespaces().Delete(meta.Namespace) + return nil } -func WatchPipelineStatus(pipelineVersion *models.PipelineVersion, checkOp string, ch chan string) { +func WatchPipelineStatus(pipelineVersion *models.PipelineSpecTemplate, stageName string, checkOp string, ch chan string) { + fmt.Println("Enter WatchPipelineStatus") labelKey := "app" - pipelineMetadata := pipelineVersion.GetMetadata() - nsLabelValue := pipelineMetadata.Name + pipelineMetadata := pipelineVersion.MetaData + // nsLabelValue := pipelineMetadata.Name timeout := pipelineMetadata.TimeoutDuration namespace := pipelineMetadata.Namespace - stageSpecs := pipelineVersion.GetSpec() - length := len(stageSpecs) - nsCh := make(chan string) - rcChs := make([]chan string, length) - // rcArray := make([]string, length) - serviceChs := make([]chan string, length) - // serviceArray := make([]string, length) - - go WatchNamespaceStatus(labelKey, nsLabelValue, timeout, checkOp, nsCh) - for i, stageSpec := range stageSpecs { - go WatchRCStatus(namespace, labelKey, stageSpec.Name, timeout, checkOp, rcChs[i]) - go WatchServiceStatus(namespace, labelKey, stageSpec.Name, timeout, checkOp, serviceChs[i]) - } + // stageSpecs := pipelineVersion.Spec + // length := len(stageSpecs) + // 0423 nsCh := make(chan string) + //rcCh := make([]chan string, length) + //serviceCh := make([]chan string, length) + //0423 + // go WatchNamespaceStatus(labelKey, nsLabelValue, timeout, checkOp, nsCh) + // rcCh := make(chan string, length) + // serviceCh := make(chan string, length) + + // for _, stageSpec := range stageSpecs { + rcCh := make(chan string) + serviceCh := make(chan string) - // nsRes := make(chan string) - rcRes := make(chan string) - serviceRes := make(chan string) - // go waitNamespace(nsCh, nsRes) - go wait(length, rcChs, rcRes) - go wait(length, serviceChs, serviceRes) + go WatchRCStatus(namespace, labelKey, stageName, timeout, checkOp, rcCh) + go WatchServiceStatus(namespace, labelKey, stageName, timeout, checkOp, serviceCh) + // } - ns := OK + //rcRes := make(chan string) + // serviceRes := make(chan string) + // go wait(length, rcChs, rcRes) + // go wait(length, serviceChs, serviceRes) + + // ns := OK rc := OK service := OK - for i := 0; i < 3; i++ { + rcCount := 0 + serviceCount := 0 + for i := 0; i < 2; i++ { select { - case ns = <-nsCh: - if ns == Error || ns == Timeout { - ch <- ns - return - } - // temp = nameRes - case rc = <-rcRes: + /* + case ns = <-nsCh: + if ns == Error || ns == Timeout { + fmt.Println("Get watch ns event err or timeout") + ch <- ns + return + } + */ + case rc = <-rcCh: if rc == Error || rc == Timeout { + fmt.Println("Get watch rc event err or timeout") ch <- rc return + } else { + rcCount++ + fmt.Printf("Get watch rc event OK count %v\n", rcCount) } - case service = <-serviceRes: + case service = <-serviceCh: if service == Error || service == Timeout { + fmt.Println("Get watch service event err or timeout") ch <- service return + } else { + serviceCount++ + fmt.Printf("Get watch service event ok count %v\n", serviceCount) } } } + fmt.Println("WatchPipelineStatus return OK") ch <- OK - return + // return } -func wait(length int, array []chan string, ch chan string) { +/*func wait(length int, array []chan string, ch chan string) { count := 0 for i := 0; i < length; i++ { res := <-array[i] @@ -125,3 +126,4 @@ func wait(length int, array []chan string, ch chan string) { ch <- OK } } +*/ diff --git a/module/kubernetes/pod.go b/module/kubernetes/pod.go index 9e60ffb..f71b080 100644 --- a/module/kubernetes/pod.go +++ b/module/kubernetes/pod.go @@ -1,24 +1,20 @@ package kubernetes import ( - // "encoding/json" "errors" "fmt" "time" - // "github.com/containerops/vessel/models" + "github.com/containerops/vessel/models" "k8s.io/kubernetes/pkg/api" - // "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/watch" - // "k8s.io/kubernetes/pkg/util/intstr" ) // CheckPod check weather the pod spcified by namespace and podname is exist func CheckPod(namespace string, podName string) bool { - pods, err := CLIENT.Pods(namespace).List(api.ListOptions{}) - // CLIENT.Pods(namespace).List(opts).Items[0]. + pods, err := models.K8sClient.Pods(namespace).List(api.ListOptions{}) if err != nil { fmt.Errorf("List pods err: %v\n", err.Error()) } @@ -31,22 +27,31 @@ func CheckPod(namespace string, podName string) bool { return false } -func getPodIp(namespace string, podName string) (string, error) { - pod, err := CLIENT.Pods(namespace).Get(podName) +func getPodIp(namespace string, rcName string, ipArray *[]string) error { + // pod, err := models.K8sClient.Pods(namespace).Get(podName) + + opts := api.ListOptions{LabelSelector: labels.Set{"app": rcName}.AsSelector()} + pods, err := models.K8sClient.Pods(namespace).List(opts) if err != nil { - fmt.Errorf("Get pod %v err: %v\n", podName, err) + fmt.Printf("getPodIp err %v\n", err) + return err + } + for i, pod := range pods.Items { + (*ipArray)[i] = pod.Status.PodIP + } + + return nil + /*if err != nil { + fmt.Printf("Get pod %v err: %v\n", podName, err) return "", err } - return pod.Status.PodIP, nil + return pod.Status.PodIP, nil*/ } -// func getPodsIp(namespace string, pods) - // GetPodPhase get phase of the resource by namespace and podname, return empty string when no pod find func GetPodStatus(namespace string, podName string) string { - - pods, err := CLIENT.Pods(namespace).List(api.ListOptions{}) + pods, err := models.K8sClient.Pods(namespace).List(api.ListOptions{}) if err != nil { fmt.Errorf("List pods err: %v\n", err.Error()) } @@ -55,39 +60,31 @@ func GetPodStatus(namespace string, podName string) string { if pod.Name == podName { return string(pod.Status.Phase) } - // pod.Spec. } return "" } -// func DeletePod(pipelineVersion ) - // WatchPodStatus return status of the operation(specified by checkOp) of the pod, OK, TIMEOUT. func WatchPodStatus(podNamespace string, labelKey string, labelValue string, timeout int, checkOp string) (string, error) { if checkOp != string(watch.Deleted) && checkOp != string(watch.Added) { fmt.Errorf("Params checkOp err, checkOp: %v", checkOp) } - //opts := api.ListOptions{FieldSelector: fields.Set{"kind": "pod"}.AsSelector()} opts := api.ListOptions{LabelSelector: labels.Set{labelKey: labelValue}.AsSelector()} - - w, err := CLIENT.Pods(podNamespace).Watch(opts) + w, err := models.K8sClient.Pods(podNamespace).Watch(opts) if err != nil { fmt.Errorf("Get watch interface err") return "", err } t := time.NewTimer(time.Second * time.Duration(timeout)) - for { select { case event, ok := <-w.ResultChan(): - //fmt.Println(event.Type) if !ok { fmt.Errorf("Watch err\n") return "", errors.New("error occours from watch chanle") } - //fmt.Println(event.Type) // Pod have phase, so we have to wait for the phase change to the right status when added if string(event.Type) == checkOp { if (checkOp == string(watch.Deleted)) || ((checkOp != string(watch.Deleted)) && @@ -95,7 +92,6 @@ func WatchPodStatus(podNamespace string, labelKey string, labelValue string, tim return "OK", nil } } - case <-t.C: return "TIMEOUT", nil } diff --git a/module/kubernetes/replicationcontroller.go b/module/kubernetes/replicationcontroller.go index 68b92b2..721fba4 100644 --- a/module/kubernetes/replicationcontroller.go +++ b/module/kubernetes/replicationcontroller.go @@ -1,111 +1,115 @@ package kubernetes import ( - // "encoding/json" - // "errors" "fmt" "time" - // "k8s.io/kubernetes/pkg/api" "github.com/containerops/vessel/models" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/watch" - // "k8s.io/kubernetes/pkg/util/intstr" ) -func CreateRC(piplelineVersion *models.PipelineVersion) error { - // piplineMetadata := piplelineVersion.MetaData - stagespecs := piplelineVersion.StageSpecs +func CreateRC(piplelineVersion *models.PipelineSpecTemplate, stageName string) error { + stagespecs := piplelineVersion.Spec + metadata := piplelineVersion.MetaData for _, stagespec := range stagespecs { - rc := &api.ReplicationController{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{}, - }, - Spec: api.ReplicationControllerSpec{ - Template: &api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{}, + if stagespec.Name == stageName { + rc := &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{}, + }, + Spec: api.ReplicationControllerSpec{ + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{}, + }, }, + Selector: map[string]string{}, }, - Selector: map[string]string{}, - }, - } + } - rc.Spec.Template.Spec.Containers = make([]api.Container, 1) - rc.SetName(stagespec.Name) - // rc.SetNamespace(piplineMetadata.Namespace) - // rc.SetNamespace(api.NamespaceDefault) - rc.SetNamespace("zenlin-namespace") - rc.Labels["app"] = stagespec.Name - rc.Spec.Replicas = stagespec.Replicas - rc.Spec.Template.SetName(stagespec.Name) - rc.Spec.Template.Labels["app"] = stagespec.Name - rc.Spec.Template.Spec.Containers[0] = api.Container{Ports: []api.ContainerPort{api.ContainerPort{ - Name: stagespec.Name, - ContainerPort: stagespec.Port}}, - Name: stagespec.Name, - Image: stagespec.Image} - rc.Spec.Selector["app"] = stagespec.Name + rc.Spec.Template.Spec.Containers = make([]api.Container, 1) + rc.SetName(stagespec.Name) + rc.SetNamespace(metadata.Namespace) + rc.Labels["app"] = stagespec.Name + rc.Spec.Replicas = stagespec.Replicas + rc.Spec.Template.SetName(stagespec.Name) + rc.Spec.Template.Labels["app"] = stagespec.Name + if stagespec.EnvName != "" && stagespec.EnvValue != "" { + rc.Spec.Template.Spec.Containers[0] = api.Container{Ports: []api.ContainerPort{api.ContainerPort{ + Name: stagespec.Name, + ContainerPort: stagespec.Port}}, + Name: stagespec.Name, + Image: stagespec.Image, + ImagePullPolicy: "IfNotPresent", + Env: []api.EnvVar{api.EnvVar{ + Name: stagespec.EnvName, + Value: stagespec.EnvValue}}, + } + } else { + rc.Spec.Template.Spec.Containers[0] = api.Container{Ports: []api.ContainerPort{api.ContainerPort{ + Name: stagespec.Name, + ContainerPort: stagespec.Port}}, + Name: stagespec.Name, + Image: stagespec.Image, + ImagePullPolicy: "IfNotPresent", + } + } + /* + if stagespec.EnvName != "" && stagespec.EnvValue != "" { + rc.Spec.Template.Spec.Containers[0].Env + }*/ + rc.Spec.Selector["app"] = stagespec.Name - if _, err := CLIENT.ReplicationControllers("zenlin-namespace").Create(rc); err != nil { - fmt.Println("Create rc err : %v\n", err) - return err + if _, err := models.K8sClient.ReplicationControllers(metadata.Namespace).Create(rc); err != nil { + fmt.Println("Create rc err : %v\n", err) + return err + } } } return nil } -func DeleteRC(pipelineVersion *models.PipelineVersion) error { +func DeleteRC(pipelineVersion *models.PipelineSpecTemplate) error { return nil } // WatchServiceStatus return status of the operation(specified by checkOp) of the pod, OK, TIMEOUT. func WatchRCStatus(Namespace string, labelKey string, labelValue string, timeout int64, checkOp string, ch chan string) { + fmt.Println("Enter WatchRCStatus") if checkOp != string(watch.Deleted) && checkOp != string(watch.Added) { - fmt.Errorf("Params checkOp err, checkOp: %v", checkOp) + fmt.Printf("Params checkOp err, checkOp: %v", checkOp) } - //opts := api.ListOptions{FieldSelector: fields.Set{"kind": "pod"}.AsSelector()} opts := api.ListOptions{LabelSelector: labels.Set{labelKey: labelValue}.AsSelector()} - - w, err := CLIENT.ReplicationControllers(Namespace).Watch(opts) + w, err := models.K8sClient.ReplicationControllers(Namespace).Watch(opts) if err != nil { ch <- Error - // return "", err - // fmt.Errorf("Get watch interface err") + fmt.Printf("Get watch RC interface err %v\n", err) + return } t := time.NewTimer(time.Second * time.Duration(timeout)) - - for { - select { - case event, ok := <-w.ResultChan(): - //fmt.Println(event.Type) - if !ok { - ch <- Error - return - // fmt.Errorf("Watch err\n") - // return "", errors.New("error occours from watch chanle") - } - //fmt.Println(event.Type) - if string(event.Type) == checkOp { - ch <- OK - return - // return "OK", nil - } - - case <-t.C: - ch <- Timeout - return - // return "TIMEOUT", nil + select { + case event, ok := <-w.ResultChan(): + if !ok { + fmt.Println("Get RC event !ok") + ch <- Error + } else if string(event.Type) == checkOp { + fmt.Println("Get RC event ok") + ch <- OK } + + case <-t.C: + fmt.Println("WatchRCStatus timeout") + ch <- Timeout } } // checkRC rc have no status, once the rc are found, it is with running status func CheckRC(namespace string, rcName string) bool { - rcs, err := CLIENT.ReplicationControllers(namespace).List(api.ListOptions{}) + rcs, err := models.K8sClient.ReplicationControllers(namespace).List(api.ListOptions{}) if err != nil { fmt.Errorf("List rcs err: %v\n", err.Error()) } diff --git a/module/kubernetes/service.go b/module/kubernetes/service.go index bb84d0f..995ab9f 100644 --- a/module/kubernetes/service.go +++ b/module/kubernetes/service.go @@ -1,49 +1,47 @@ package kubernetes import ( - // "encoding/json" - // "errors" "fmt" "time" "github.com/containerops/vessel/models" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/util/intstr" - // "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/watch" ) -func CreateService(pipelineVersion *models.PipelineVersion) error { - // piplineMetadata := pipelineVersion.MetaData - stagespecs := pipelineVersion.StageSpecs +func CreateService(pipelineVersion *models.PipelineSpecTemplate, stageName string) error { + stagespecs := pipelineVersion.Spec + metadata := pipelineVersion.MetaData for _, stagespec := range stagespecs { - service := &api.Service{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{}, - }, - Spec: api.ServiceSpec{ - Selector: map[string]string{}, - }, - } + if stagespec.Name == stageName { + service := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{}, + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + }, + } - service.Spec.Ports = make([]api.ServicePort, 1) - service.ObjectMeta.SetName(stagespec.Name) - // service.ObjectMeta.SetNamespace(piplineMetadata.Namespace) - service.ObjectMeta.SetNamespace("zenlin-namespace") - service.ObjectMeta.Labels["app"] = stagespec.Name - service.Spec.Ports[0] = api.ServicePort{Port: stagespec.Port, TargetPort: intstr.FromString(stagespec.Name)} - service.Spec.Selector["app"] = stagespec.Name + service.Spec.Ports = make([]api.ServicePort, 1) + service.ObjectMeta.SetName(stagespec.Name) + service.ObjectMeta.SetNamespace(metadata.Namespace) + service.ObjectMeta.Labels["app"] = stagespec.Name + service.Spec.Ports[0] = api.ServicePort{Port: stagespec.Port, TargetPort: intstr.FromString(stagespec.Name)} + service.Spec.Selector["app"] = stagespec.Name - if _, err := CLIENT.Services("zenlin-namespace").Create(service); err != nil { - fmt.Println("Create service err : %v\n", err) - return err + if _, err := models.K8sClient.Services(metadata.Namespace).Create(service); err != nil { + fmt.Println("Create service err : %v\n", err) + return err + } } } return nil } -func DeleteService(pipelineVersion *models.PipelineVersion) error { +func DeleteService(pipelineVersion *models.PipelineSpecTemplate) error { return nil } @@ -53,47 +51,32 @@ func WatchServiceStatus(Namespace string, labelKey string, labelValue string, ti fmt.Errorf("Params checkOp err, checkOp: %v", checkOp) } - //opts := api.ListOptions{FieldSelector: fields.Set{"kind": "pod"}.AsSelector()} opts := api.ListOptions{LabelSelector: labels.Set{labelKey: labelValue}.AsSelector()} - w, err := CLIENT.Services(Namespace).Watch(opts) + w, err := models.K8sClient.Services(Namespace).Watch(opts) if err != nil { ch <- Error - // fmt.Errorf("Get watch interface err") - // return } t := time.NewTimer(time.Second * time.Duration(timeout)) - for { - select { - case event, ok := <-w.ResultChan(): - //fmt.Println(event.Type) - if !ok { - ch <- Error - return - // fmt.Errorf("Watch err\n") - // return "", errors.New("error occours from watch chanle") - } - //fmt.Println(event.Type) - if string(event.Type) == checkOp { - ch <- OK - return - // return "OK", nil - } - - case <-t.C: - ch <- Timeout - return - // return "TIMEOUT", nil + select { + case event, ok := <-w.ResultChan(): + if !ok { + ch <- Error + } else if string(event.Type) == checkOp { + ch <- OK } + + case <-t.C: + ch <- Timeout } } // CheckService service have no status, once the service are found, it is with running status func CheckService(namespace string, serviceName string) bool { - services, err := CLIENT.Services(namespace).List(api.ListOptions{}) + services, err := models.K8sClient.Services(namespace).List(api.ListOptions{}) if err != nil { fmt.Errorf("List services err: %v\n", err.Error()) } diff --git a/module/pipeline/pipelineVersion.go b/module/pipeline/pipelineVersion.go index d9225e1..78e2006 100644 --- a/module/pipeline/pipelineVersion.go +++ b/module/pipeline/pipelineVersion.go @@ -2,16 +2,15 @@ package pipeline import ( "encoding/json" + "fmt" + "github.com/containerops/vessel/models" + "github.com/containerops/vessel/module/etcd" + "golang.org/x/net/context" "log" - "math/rand" "strconv" "strings" "time" - "github.com/containerops/vessel/models" - "github.com/containerops/vessel/module/etcd" - "golang.org/x/net/context" - kubeclient "github.com/containerops/vessel/module/kubernetes" "github.com/coreos/etcd/client" ) @@ -161,16 +160,21 @@ func isFinish(finishChan chan models.StageVersionState, stageVersionStateChan ch for { if finishStageNum == sumStage { + fmt.Println("######################################finishStageNum == sumStage") notifyBootDone <- true + // stageVersionStateChan <- strings.Join(failedList, ",") stageVersionStateStr, _ := json.Marshal(stageVersionStateList) stageVersionStateChan <- string(stageVersionStateStr) return } - + fmt.Println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$finishStageNum") + fmt.Println(finishStageNum) stageVersionState := <-finishChan stageVersionState.ChangeStageVersionState() finishStageNum++ + fmt.Println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$finishStageNum+++++") + fmt.Println(finishStageNum) stageVersionStateList = append(stageVersionStateList, stageVersionState) } } @@ -257,7 +261,9 @@ func startStage(stageVersion *models.StageVersion, bootChan chan *models.StageVe // start run stage stageStartFinish := make(chan models.StageVersionState, 1) - timeout := make(chan bool, 1) + go startStageInK8S(stageStartFinish, stageVersionState) + + /*timeout := make(chan bool, 1) // stageStartFinish <- stageVersionState go startStageInK8S(stageStartFinish, stageVersionState) go func() { @@ -274,6 +280,9 @@ func startStage(stageVersion *models.StageVersion, bootChan chan *models.StageVe case startResult := <-stageStartFinish: stageVersionState = startResult } + */ + startResult := <-stageStartFinish + stageVersionState = startResult changeStageVersionState(stageVersion, bootChan, stageVersionState.RunResult, stageVersionState.Detail) finishChan <- stageVersionState @@ -308,10 +317,14 @@ func changeStageVersionState(stageVersion *models.StageVersion, bootChan chan *m if err != nil { log.Println("[changeStageVersionState]:error when shoutdown stage version:", err) } + fmt.Println("*********************************************************************") + fmt.Println(state) if state == StateSuccess || state == StateFailed { for _, toStageVersionName := range strings.Split(toStageVersions, ",") { + fmt.Println(toStageVersions) if toStageVersionName != "" { + fmt.Println(toStageVersionName) var toStageVersion models.StageVersion toStageVersion = *stageVersion toStageVersion.Name = toStageVersionName @@ -319,69 +332,68 @@ func changeStageVersionState(stageVersion *models.StageVersion, bootChan chan *m } } } - + fmt.Println("*********************************************************************") } func startStageInK8S(runResultChan chan models.StageVersionState, runResult models.StageVersionState) { - - /* sec := rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(5) + 3 - timeStr := strconv.FormatInt(sec, 10) + "s" - timeDur, _ := time.ParseDuration(timeStr) - time.Sleep(timeDur) - if rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(100) < 50 { - runResult.RunResult = StateSuccess - runResult.Detail = "run success" - } else { - runResult.RunResult = StateFailed - runResult.Detail = "not luck" - }*/ - pipelineVersion := models.GetPipelineVersion(runResult.PipelineVersionId) - watchCh := make(chan string) - // hostip := kubeclient.GetHostIp() - - //ipProts to store ips and ports of pods in pipeline after it is stated - ipPorts := &[]kubeclient.IpPort{} - if err := kubeclient.GetPipelinePodsIPort(pipelineVersion, ipPorts); err != nil { - log.Printf("Get podsip in pipeline %v, err\n", pipelineVersion.GetMetadata().Name, err) + pipelineSpecTemplate := new(models.PipelineSpecTemplate) + + stageName := runResult.StageName + fmt.Printf("Enter startStageInK8S to start stage %v\n", stageName) + err := json.Unmarshal([]byte(pipelineVersion.Detail), pipelineSpecTemplate) + // fmt.Printf("goting to deal with stage name = %v\n", stageName) + fmt.Printf("goting to deal with pipelinePecTemplate detail = %v\n", pipelineSpecTemplate) + // err := json.Unmarshal([]byte(pipelineVersion.Detail), pipelineSpecTemplate) + if err != nil { + log.Printf("Unmarshal PipelineSpecTemplate err : %v\n") } + fmt.Println(pipelineSpecTemplate) + k8sCh := make(chan string) + bsCh := make(chan bool) - // First, to watch the pipeline status and then to start it - go kubeclient.WatchPipelineStatus(pipelineVersion, kubeclient.Added, watchCh) - if err := kubeclient.StartPipeline(pipelineVersion); err != nil { - log.Printf("Start k8s resource pipeline name :%v err : %v\n", pipelineVersion.MetaData.Name, err) - } + go kubeclient.WatchPipelineStatus(pipelineSpecTemplate, stageName, kubeclient.Added, k8sCh) - startStatus := <-watchCh - if startStatus == kubeclient.OK { - for _, ipPort := range ipPorts { - // Should be go routine here to ensure the causetime is not out of timeout - if checkBussinessResult(ipPort.Ip, ipPort.Port) { - // Going to write other things there, creationTimeStamp, selfLink - runResult.RunResult <- StartSucessful - } else { - runResult.RunResult <- StartFailed - } + // runResult.RunResult = <-k8sCh + if err := kubeclient.StartPipeline(pipelineSpecTemplate, stageName); err != nil { + log.Printf("Start k8s resource pipeline name :%v err : %v\n", pipelineSpecTemplate.MetaData.Name, err) + } + go kubeclient.GetPipelineBussinessRes(pipelineSpecTemplate, bsCh) + // fmt.Println("11111111111111") + k8sRes := "" + bsRes := true + for i := 0; i < 2; i++ { + select { + case k8sRes = <-k8sCh: + fmt.Printf("k8sCh start stage name = %v return %v\n", stageName, k8sRes) + case bsRes = <-bsCh: + // fmt.Printf("bsCh return %v\n", bsRes) } - } else if startStatus == kubeclient.Error { - runResult.RunResult <- StartFailed } - runResult.RunResult <- StartTimeout - - runResultChan <- runResult -} - -// checkBussinessResult : get bussiness result from pipelineVersion.statusCheckUrl, success:200, ignore:0,others:failed -func checkBussinessResult(ip string, port int64) bool { - // Later, to put read checkStatusUrl here and get return value to res - // Read for statusCheckCount times, each time should in statusCheckInterval - var res int - if res == 200 { - return true - } else if res == 0 { - return true + if k8sRes == StartFailed { + fmt.Printf("k8s module stage name = %v ret %v\n", stageName, StateFailed) + runResult.RunResult = StartFailed + runResult.Detail = StartFailed + runResultChan <- runResult + } else if k8sRes == StartSucessful { + // fmt.Printf("k8s res %v\n", StartSucessful) + if bsRes == true { + fmt.Printf("k8s module stage name = %v ret %v\n", stageName, StartSucessful) + runResult.RunResult = StateSuccess + runResult.Detail = StateSuccess + runResultChan <- runResult + } else { + fmt.Printf("k8s module stage name = %v ret %v\n", stageName, StartFailed) + runResult.RunResult = StartFailed + runResult.Detail = StartFailed + runResultChan <- runResult + } + } else { + fmt.Printf("k8s module stage name = %v ret %v\n", stageName, StartTimeout) + runResult.RunResult = StartTimeout + runResult.Detail = StartTimeout + runResultChan <- runResult } - return false } diff --git a/router/router.go b/router/router.go index aa3b095..905fe63 100644 --- a/router/router.go +++ b/router/router.go @@ -34,7 +34,7 @@ func SetRouters(m *macaron.Macaron) { m.Post("/", binding.Bind(models.PipelineSpecTemplate{}), handler.V1POSTPipelineHandler) m.Put("/:pipeline", handler.V1PUTPipelineHandler) m.Get("/:pipeline", handler.V1GETPipelineHandler) - m.Delete("/:pipeline", handler.V1DELETEPipelineHandler) + m.Delete("/", binding.Bind(models.PipelineSpecTemplate{}), handler.V1DELETEPipelineHandler) m.Put("/:pipeline/run", handler.V1RunPipelineHandler) diff --git a/setting/setting.go b/setting/setting.go index a8c9984..f9ac48d 100644 --- a/setting/setting.go +++ b/setting/setting.go @@ -1,6 +1,7 @@ package setting import ( + "fmt" "io/ioutil" "github.com/ghodss/yaml" @@ -41,6 +42,10 @@ type RunTimeConf struct { Username string Password string } + K8s struct { + Host string + Port string + } } var ( @@ -70,6 +75,6 @@ func InitConf(globalFilePath string, runtimeFilePath string) error { if err != nil { return err } - + fmt.Println(RunTime) return nil }