diff --git a/go.mod b/go.mod index 8f7cd7b..4b9de4f 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect github.com/Microsoft/go-winio v0.4.13 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect + github.com/Pallinder/go-randomdata v1.2.0 github.com/cskr/pubsub v1.0.2 github.com/davecgh/go-spew v1.1.1 github.com/docker/distribution v2.7.1+incompatible // indirect diff --git a/go.sum b/go.sum index 6722d70..abc5a45 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/Microsoft/go-winio v0.4.13 h1:Hmi80lzZuI/CaYmlJp/b+FjZdRZhKu9c2mDVqKl github.com/Microsoft/go-winio v0.4.13/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= +github.com/Pallinder/go-randomdata v1.2.0 h1:DZ41wBchNRb/0GfsePLiSwb0PHZmT67XY00lCDlaYPg= +github.com/Pallinder/go-randomdata v1.2.0/go.mod h1:yHmJgulpD2Nfrm0cR9tI/+oAgRqCQQixsA8HyRZfV9Y= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= diff --git a/manager/manager.go b/manager/manager.go index c157359..017ff56 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -10,6 +10,7 @@ import ( "code.linksmart.eu/dt/deployment-tool/manager/source" "code.linksmart.eu/dt/deployment-tool/manager/storage" "code.linksmart.eu/dt/deployment-tool/manager/swarmio" + "github.com/Pallinder/go-randomdata" "github.com/cskr/pubsub" uuid "github.com/satori/go.uuid" ) @@ -57,8 +58,7 @@ func startManager(pipe model.Pipe, zmqConf model.ZeromqServerInfo, storageClient } func (m *manager) addOrder(order *storage.Order) error { - // add system generated meta values - order.ID = m.newTaskID() + order.Created = model.UnixTime() // cleanup @@ -95,6 +95,16 @@ func (m *manager) addOrder(order *storage.Order) error { order.Deploy.Match.List = receivers } +generateID: + retry := 0 + order.ID = m.newTaskID(retry) + if tempOrder, err := m.storage.GetOrder(order.ID); err != nil { + return fmt.Errorf("error checking if order ID is unique: %s", err) + } else if tempOrder != nil { // found order with this id + retry++ + goto generateID + } + // place into work directory err := m.fetchSource(order.ID, order.Source) if err != nil { @@ -102,7 +112,7 @@ func (m *manager) addOrder(order *storage.Order) error { } order.Source = nil - err = m.storage.AddOrder(order) + _, err = m.storage.AddOrder(order) if err != nil { return fmt.Errorf("error storing order: %s", err) } @@ -123,7 +133,10 @@ func (m *manager) targetTopics(ids, tags []string) []string { return receiverTopics } -func (m *manager) newTaskID() string { +func (m *manager) newTaskID(retry int) string { + if retry < 10 { + return randomdata.Adjective() + "-" + randomdata.Noun() + } return uuid.NewV4().String() } diff --git a/manager/storage/storage.go b/manager/storage/storage.go index f554ac1..16348d5 100644 --- a/manager/storage/storage.go +++ b/manager/storage/storage.go @@ -17,7 +17,7 @@ import ( type Storage interface { GetOrders(descr string, sortAsc bool, from, size int) ([]Order, int64, error) - AddOrder(*Order) error + AddOrder(*Order) (dublicate bool, err error) GetOrder(id string) (*Order, error) DeleteOrder(id string) (found bool, err error) // @@ -90,6 +90,7 @@ const ( propTypeText = "text" propTypeBool = "boolean" propTypeGeoPoint = "geo_point" + opTypeCreate = "create" ) // StartElasticStorage starts an elastic storage client. It @@ -273,7 +274,7 @@ func (s *storage) AddTargetTrans(target *Target) (conflict bool, trans *transact s.targetLocker.Lock() trans = &transaction{ Commit: elastic.NewBulkIndexRequest().Index(indexTarget).Type(typeFixed). - Id(target.ID).OpType("create").Doc(target), + Id(target.ID).OpType(opTypeCreate).Doc(target), Release: func() { s.targetLocker.Unlock() }, @@ -473,14 +474,18 @@ func (s *storage) DeleteTarget(id string) (*Target, error) { return target, nil } -func (s *storage) AddOrder(order *Order) error { +func (s *storage) AddOrder(order *Order) (dublicate bool, err error) { res, err := s.client.Index().Index(indexOrder).Type(typeFixed). - Id(order.ID).BodyJson(order).Do(s.ctx) + Id(order.ID).BodyJson(order).OpType(opTypeCreate).Do(s.ctx) if err != nil { - return err + e := err.(*elastic.Error) + if e.Status == http.StatusConflict { + return true, nil + } + return false, err } log.Printf("Indexed %s/%s v%d", res.Index, res.Id, res.Version) - return nil + return false, nil } func (s *storage) GetOrders(descr string, sortAsc bool, from, size int) (orders []Order, total int64, err error) { @@ -675,7 +680,7 @@ func (s *storage) GetTokens(name string) ([]TokenMeta, error) { func (s *storage) AddToken(token TokenHashed) (duplicate bool, err error) { res, err := s.client.Index().Index(indexToken).Type(typeFixed). - Id(string(token.Hash)).OpType("create").BodyJson(token).Do(s.ctx) + Id(string(token.Hash)).OpType(opTypeCreate).BodyJson(token).Do(s.ctx) if err != nil { e := err.(*elastic.Error) if e.Status == http.StatusConflict {