Skip to content

Commit

Permalink
feat(job): add eligible tunable (#65)
Browse files Browse the repository at this point in the history
This commit adds the following features:
- eligible check to enable or disable a job
- thinkTimeInSeconds to delay start of a Job
- failFast to stop retrying in case of specific errors
- ignoreError to mark a task as a warning & the job as completed

Signed-off-by: AmitKumarDas <[email protected]>
  • Loading branch information
Amit Kumar Das authored May 31, 2020
1 parent bd652f3 commit 0f9fe49
Show file tree
Hide file tree
Showing 29 changed files with 1,153 additions and 356 deletions.
19 changes: 19 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ limitations under the License.
package main

import (
"flag"

"k8s.io/klog/v2"
"openebs.io/metac/controller/generic"
"openebs.io/metac/start"

Expand All @@ -41,6 +44,22 @@ import (
// One can consider each registered function as an independent
// kubernetes controller & this project as the operator.
func main() {
flag.Set("alsologtostderr", "true")
flag.Parse()

klogFlags := flag.NewFlagSet("klog", flag.ExitOnError)
klog.InitFlags(klogFlags)

// Sync the glog and klog flags.
flag.CommandLine.VisitAll(func(f1 *flag.Flag) {
f2 := klogFlags.Lookup(f1.Name)
if f2 != nil {
value := f1.Value.String()
f2.Value.Set(value)
}
})
defer klog.Flush()

// controller name & corresponding controller reconcile function
var controllers = map[string]generic.InlineInvokeFn{
"sync/job": job.Sync,
Expand Down
15 changes: 10 additions & 5 deletions controller/job/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,20 @@ func (r *Reconciler) setWatchStatusFromJobStatus() {
r.HookResponse.Labels = map[string]*string{
"job.dope.metacontroller.io/phase": pointer.StringPtr(string(r.JobStatus.Phase)),
}
if r.ObservedJob != nil &&
r.ObservedJob.Spec.Refresh.ResyncAfterSeconds != nil {
r.HookResponse.ResyncAfterSeconds = *r.ObservedJob.Spec.Refresh.ResyncAfterSeconds
}
}

func (r *Reconciler) setWatchStatus() {
if r.Err != nil {
// resync since this might be a temporary error
//
// TODO:
// Might be better to expose this from job.spec
r.HookResponse.ResyncAfterSeconds = 5.0
if r.ObservedJob != nil &&
r.ObservedJob.Spec.Refresh.OnErrorResyncAfterSeconds != nil {
// resync based on configuration
r.HookResponse.ResyncAfterSeconds =
*r.ObservedJob.Spec.Refresh.OnErrorResyncAfterSeconds
}
r.setWatchStatusAsError()
return
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module mayadata.io/d-operators
go 1.13

require (
9fans.net/go v0.0.2
github.com/go-resty/resty/v2 v2.2.0
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/google/go-cmp v0.4.0
Expand All @@ -12,6 +13,7 @@ require (
k8s.io/apiextensions-apiserver v0.17.3
k8s.io/apimachinery v0.17.3
k8s.io/client-go v0.17.3
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.0.0
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f
openebs.io/metac v0.3.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
9fans.net/go v0.0.2 h1:RYM6lWITV8oADrwLfdzxmt8ucfW6UtP9v1jg4qAbqts=
9fans.net/go v0.0.2/go.mod h1:lfPdxjq9v8pVQXUMBCx5EO5oLXWQFlKRQgs1kEkjoIM=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -477,6 +479,7 @@ k8s.io/klog/v2 v2.0.0 h1:Foj74zO6RbjjP4hBEKjnYtjjAhGg4jNynUdYF6fJrok=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a h1:UcxjrRMyNx/i/y8G7kPvLyy7rfbeuf1PYyBf973pgyU=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
k8s.io/kubernetes v1.18.3 h1:6qtm8v3z+OwYm2SnsTxYUtGCsIbGBZ/Dh9yER+aNIoI=
k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f h1:GiPwtSzdP43eI1hpPCbROQCCIgCuiMMNF8YUVLF3vJo=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
Expand Down
77 changes: 35 additions & 42 deletions pkg/job/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import (

// Applyable helps applying desired state(s) against the cluster
type Applyable struct {
*Fixture
Retry *Retryable
BaseRunner
Apply *types.Apply

result *types.ApplyResult
Expand All @@ -40,18 +39,16 @@ type Applyable struct {

// ApplyableConfig helps in creating new instance of Applyable
type ApplyableConfig struct {
Fixture *Fixture
Retry *Retryable
Apply *types.Apply
BaseRunner
Apply *types.Apply
}

// NewApplier returns a new instance of Applyable
func NewApplier(config ApplyableConfig) *Applyable {
return &Applyable{
Apply: config.Apply,
Fixture: config.Fixture,
Retry: config.Retry,
result: &types.ApplyResult{},
BaseRunner: config.BaseRunner,
Apply: config.Apply,
result: &types.ApplyResult{},
}
}

Expand All @@ -66,27 +63,26 @@ func (a *Applyable) postCreateCRD(
// discover custom resource API
err := a.Retry.Waitf(
func() (bool, error) {
got := a.apiDiscovery.
GetAPIForAPIVersionAndResource(
crd.Spec.Group+"/"+crd.Spec.Version,
crd.Spec.Names.Plural,
)
got := a.GetAPIForAPIVersionAndResource(
crd.Spec.Group+"/"+crd.Spec.Version,
crd.Spec.Names.Plural,
)
if got == nil {
return false, errors.Errorf(
"Failed to discover: Kind %s: APIVersion %s",
crd.Spec.Names.Singular,
crd.Spec.Group+"/"+crd.Spec.Version,
)
return a.IsFailFastOnDiscoveryError(),
errors.Errorf(
"Failed to discover: Kind %s: APIVersion %s",
crd.Spec.Names.Singular,
crd.Spec.Group+"/"+crd.Spec.Version,
)
}
// fetch dynamic client for the custom resource
// corresponding to this CRD
customResourceClient, err := a.dynamicClientset.
GetClientForAPIVersionAndResource(
crd.Spec.Group+"/"+crd.Spec.Version,
crd.Spec.Names.Plural,
)
customResourceClient, err := a.GetClientForAPIVersionAndResource(
crd.Spec.Group+"/"+crd.Spec.Version,
crd.Spec.Names.Plural,
)
if err != nil {
return false, err
return a.IsFailFastOnDiscoveryError(), err
}
_, err = customResourceClient.List(metav1.ListOptions{})
if err != nil {
Expand Down Expand Up @@ -256,11 +252,10 @@ func (a *Applyable) createResource() (*types.ApplyResult, error) {
a.Apply.State.GetName(),
a.Apply.State.GroupVersionKind(),
)
client, err := a.dynamicClientset.
GetClientForAPIVersionAndKind(
a.Apply.State.GetAPIVersion(),
a.Apply.State.GetKind(),
)
client, err := a.GetClientForAPIVersionAndKind(
a.Apply.State.GetAPIVersion(),
a.Apply.State.GetKind(),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -307,13 +302,12 @@ func (a *Applyable) updateResource() (*types.ApplyResult, error) {
err := a.Retry.Waitf(
func() (bool, error) {
// get appropriate dynamic client
client, err := a.dynamicClientset.
GetClientForAPIVersionAndKind(
a.Apply.State.GetAPIVersion(),
a.Apply.State.GetKind(),
)
client, err := a.GetClientForAPIVersionAndKind(
a.Apply.State.GetAPIVersion(),
a.Apply.State.GetKind(),
)
if err != nil {
return false, err
return a.IsFailFastOnDiscoveryError(), err
}
// get the resource from cluster to update
target, err := client.
Expand Down Expand Up @@ -372,13 +366,12 @@ func (a *Applyable) applyResource() (*types.ApplyResult, error) {
err := a.Retry.Waitf(
func() (bool, error) {
var err error
client, err := a.dynamicClientset.
GetClientForAPIVersionAndKind(
a.Apply.State.GetAPIVersion(),
a.Apply.State.GetKind(),
)
client, err := a.GetClientForAPIVersionAndKind(
a.Apply.State.GetAPIVersion(),
a.Apply.State.GetKind(),
)
if err != nil {
return false, err
return a.IsFailFastOnDiscoveryError(), err
}
_, err = client.
Namespace(a.Apply.State.GetNamespace()).
Expand Down
32 changes: 11 additions & 21 deletions pkg/job/assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import (
// Assertable is used to perform matches of desired state(s)
// against observed state(s)
type Assertable struct {
*Fixture
Retry *Retryable
TaskName string
Assert *types.Assert
BaseRunner
Assert *types.Assert

assertCheckType types.AssertCheckType
retryOnDiff bool
Expand All @@ -40,20 +38,16 @@ type Assertable struct {

// AssertableConfig is used to create an instance of Assertable
type AssertableConfig struct {
Fixture *Fixture
Retry *Retryable
TaskName string
Assert *types.Assert
BaseRunner
Assert *types.Assert
}

// NewAsserter returns a new instance of Assertion
func NewAsserter(config AssertableConfig) *Assertable {
return &Assertable{
Assert: config.Assert,
Retry: config.Retry,
Fixture: config.Fixture,
TaskName: config.TaskName,
status: &types.AssertStatus{},
BaseRunner: config.BaseRunner,
Assert: config.Assert,
status: &types.AssertStatus{},
}
}

Expand Down Expand Up @@ -85,11 +79,9 @@ func (a *Assertable) init() {
func (a *Assertable) runAssertByPath() {
chk := NewPathChecker(
PathCheckingConfig{
TaskName: a.TaskName,
Fixture: a.Fixture,
State: a.Assert.State,
PathCheck: *a.Assert.PathCheck,
Retry: a.Retry,
BaseRunner: a.BaseRunner,
State: a.Assert.State,
PathCheck: *a.Assert.PathCheck,
},
)
got, err := chk.Run()
Expand All @@ -109,11 +101,9 @@ func (a *Assertable) runAssertByPath() {
func (a *Assertable) runAssertByState() {
chk := NewStateChecker(
StateCheckingConfig{
TaskName: a.TaskName,
Fixture: a.Fixture,
BaseRunner: a.BaseRunner,
State: a.Assert.State,
StateCheck: *a.Assert.StateCheck,
Retry: a.Retry,
},
)
got, err := chk.Run()
Expand Down
43 changes: 43 additions & 0 deletions pkg/job/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2020 The MayaData Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package job

import (
	"errors"

	types "mayadata.io/d-operators/types/job"
)

// BaseRunner is the common runner used by all action runners.
//
// It is embedded by value in the action-specific runners and their
// config structs, so its fields and methods are promoted to them.
type BaseRunner struct {
// Fixture is embedded, so its exported fields and methods are
// promoted onto BaseRunner.
// NOTE(review): presumably this supplies the API discovery and
// dynamic client helpers used by the runners; confirm against Fixture.
*Fixture
// TaskIndex is the index of the task handled by this runner.
// NOTE(review): presumably its position in the job's task list; confirm.
TaskIndex int
// TaskName is the name of the task handled by this runner.
TaskName string
// Retry is the retry helper that runners use to repeat an operation.
Retry *Retryable
// FailFastRule selects which kind of error should stop retries;
// see IsFailFastOnDiscoveryError.
FailFastRule types.FailFastRule
}

// IsFailFastOnDiscoveryError reports whether this runner is configured
// with the fail-fast-on-discovery-error rule, i.e. whether logic that
// ends in a discovery error should not be retried.
func (r *BaseRunner) IsFailFastOnDiscoveryError() bool {
	switch r.FailFastRule {
	case types.FailFastOnDiscoveryError:
		return true
	default:
		return false
	}
}

// IsFailFastOnError returns true if the logic that led to the given
// error should not be retried.
//
// Only discovery errors are currently eligible for fail-fast: when err
// is, or wraps, a *DiscoveryError the result is whatever
// IsFailFastOnDiscoveryError reports. Any other error, including nil,
// yields false.
func (r *BaseRunner) IsFailFastOnError(err error) bool {
	// errors.As walks the Unwrap chain, so a DiscoveryError that was
	// wrapped with extra context is still recognised. A direct type
	// assertion would only match an unwrapped error.
	var discoveryErr *DiscoveryError
	if errors.As(err, &discoveryErr) {
		return r.IsFailFastOnDiscoveryError()
	}
	return false
}
Loading

0 comments on commit 0f9fe49

Please sign in to comment.