Skip to content

Commit

Permalink
feat(timeout): add job elapsed time (#63)
Browse files Browse the repository at this point in the history
This commit adds support to set a job's elapsed time. This may be
used to assert if a job got executed within some desired time
interval.

An experiment has been added that verifies if 50 config maps get
created by a job and all of this should happen within 10 seconds.

In addition, several fixes and minor enhancement have gone in to
make Job more usable to write testcase implementations.

Signed-off-by: AmitKumarDas <[email protected]>
  • Loading branch information
Amit Kumar Das authored May 24, 2020
1 parent 9818369 commit bd652f3
Show file tree
Hide file tree
Showing 17 changed files with 524 additions and 94 deletions.
8 changes: 8 additions & 0 deletions controller/job/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package job

import (
"k8s.io/utils/pointer"
"openebs.io/metac/controller/generic"
k8s "openebs.io/metac/third_party/kubernetes"

commonctrl "mayadata.io/d-operators/common/controller"
"mayadata.io/d-operators/common/unstruct"
Expand Down Expand Up @@ -77,6 +79,9 @@ func (r *Reconciler) setWatchStatusAsError() {
"phase": "Error",
"reason": r.Err.Error(),
}
r.HookResponse.Labels = map[string]*string{
"job.dope.metacontroller.io/phase": k8s.StringPtr("Error"),
}
}

func (r *Reconciler) setWatchStatusFromJobStatus() {
Expand All @@ -88,6 +93,9 @@ func (r *Reconciler) setWatchStatusFromJobStatus() {
"taskCount": int64(r.JobStatus.TaskCount),
"taskListStatus": r.JobStatus.TaskListStatus,
}
r.HookResponse.Labels = map[string]*string{
"job.dope.metacontroller.io/phase": pointer.StringPtr(string(r.JobStatus.Phase)),
}
}

func (r *Reconciler) setWatchStatus() {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
k8s.io/apimachinery v0.17.3
k8s.io/client-go v0.17.3
k8s.io/klog/v2 v2.0.0
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f
openebs.io/metac v0.3.0
)

Expand Down
2 changes: 2 additions & 0 deletions pkg/job/assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (a *Assertable) runAssertByPath() {
Message: got.Message,
Verbose: got.Verbose,
Warning: got.Warning,
Timeout: got.Timeout,
}
}

Expand All @@ -125,6 +126,7 @@ func (a *Assertable) runAssertByState() {
Message: got.Message,
Verbose: got.Verbose,
Warning: got.Warning,
Timeout: got.Timeout,
}
}

Expand Down
271 changes: 271 additions & 0 deletions pkg/job/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
Copyright 2020 The MayaData Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package job

import (
"fmt"

"github.com/pkg/errors"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
types "mayadata.io/d-operators/types/job"
"openebs.io/metac/dynamic/clientset"
)

// CreatableConfig helps in creating new instance of Creatable
type CreatableConfig struct {
Fixture *Fixture
Retry *Retryable
Create *types.Create
TaskName string
}

// Creatable helps creating desired state(s) against the cluster
type Creatable struct {
*Fixture
Retry *Retryable
TaskName string
Create *types.Create

result *types.CreateResult
err error
}

func (c *Creatable) String() string {
if c.Create == nil {
return ""
}
return fmt.Sprintf(
"Create action: Resource %s %s: GVK %s: TaskName %s",
c.Create.State.GetNamespace(),
c.Create.State.GetName(),
c.Create.State.GroupVersionKind(),
c.TaskName,
)
}

// NewCreator returns a new instance of Creatable
func NewCreator(config CreatableConfig) *Creatable {
return &Creatable{
Create: config.Create,
Fixture: config.Fixture,
Retry: config.Retry,
TaskName: config.TaskName,
result: &types.CreateResult{},
}
}

func (c *Creatable) postCreateCRD(
crd *v1beta1.CustomResourceDefinition,
) error {
message := fmt.Sprintf(
"PostCreate CRD: Kind %s: APIVersion %s: TaskName %s",
crd.Spec.Names.Singular,
crd.Spec.Group+"/"+crd.Spec.Version,
c.TaskName,
)
// discover custom resource API
return c.Retry.Waitf(
func() (bool, error) {
got := c.apiDiscovery.
GetAPIForAPIVersionAndResource(
crd.Spec.Group+"/"+crd.Spec.Version,
crd.Spec.Names.Plural,
)
if got == nil {
return false, errors.Errorf(
"Failed to discover: Kind %s: APIVersion %s",
crd.Spec.Names.Singular,
crd.Spec.Group+"/"+crd.Spec.Version,
)
}
// fetch dynamic client for the custom resource
// corresponding to this CRD
customResourceClient, err := c.dynamicClientset.
GetClientForAPIVersionAndResource(
crd.Spec.Group+"/"+crd.Spec.Version,
crd.Spec.Names.Plural,
)
if err != nil {
return false, err
}
_, err = customResourceClient.List(metav1.ListOptions{})
if err != nil {
return false, err
}
return true, nil
},
message,
)
}

func (c *Creatable) createCRD() (*types.CreateResult, error) {
var crd *v1beta1.CustomResourceDefinition
err := UnstructToTyped(c.Create.State, &crd)
if err != nil {
return nil, err
}
// use crd client to create crd
crd, err = c.crdClient.
CustomResourceDefinitions().
Create(crd)
if err != nil {
return nil, errors.Wrapf(err, "%s", c)
}
// add to teardown functions
c.AddToTeardown(func() error {
_, err := c.crdClient.
CustomResourceDefinitions().
Get(
crd.GetName(),
metav1.GetOptions{},
)
if err != nil && apierrors.IsNotFound(err) {
// nothing to do
return nil
}
return c.crdClient.
CustomResourceDefinitions().
Delete(
crd.Name,
nil,
)
})
// run an additional step to wait till this CRD
// is discovered at apiserver
err = c.postCreateCRD(crd)
if err != nil {
return nil, err
}
return &types.CreateResult{
Phase: types.CreateStatusPassed,
Message: fmt.Sprintf(
"Create CRD: Kind %s: APIVersion %s",
crd.Spec.Names.Singular,
crd.Spec.Group+"/"+crd.Spec.Version,
),
}, nil
}

func (c *Creatable) createResource(
obj *unstructured.Unstructured,
client *clientset.ResourceClient,
) error {
_, err := client.
Namespace(obj.GetNamespace()).
Create(
obj,
metav1.CreateOptions{},
)
if err != nil {
return err
}
c.AddToTeardown(func() error {
_, err := client.
Namespace(obj.GetNamespace()).
Get(
obj.GetName(),
metav1.GetOptions{},
)
if err != nil && apierrors.IsNotFound(err) {
// nothing to do since resource is already deleted
return nil
}
return client.
Namespace(obj.GetNamespace()).
Delete(
obj.GetName(),
&metav1.DeleteOptions{},
)
})
return nil
}

func buildNamesFromGivenState(
obj *unstructured.Unstructured,
replicas int,
) ([]string, error) {
var name string
name = obj.GetGenerateName()
if name == "" {
name = obj.GetName()
}
if name == "" {
return nil, errors.Errorf(
"Failed to generate names: Either name or generateName required",
)
}
if replicas == 1 {
return []string{name}, nil
}
var out []string
for i := 0; i < replicas; i++ {
out = append(out, fmt.Sprintf("%s-%d", name, i))
}
return out, nil
}

func (c *Creatable) createResourceReplicas() (*types.CreateResult, error) {
replicas := 1
if c.Create.Replicas != nil {
replicas = *c.Create.Replicas
}
if replicas <= 0 {
return nil, errors.Errorf(
"Failed to create: Invalid replicas %d: %s",
replicas,
c,
)
}
client, err := c.dynamicClientset.
GetClientForAPIVersionAndKind(
c.Create.State.GetAPIVersion(),
c.Create.State.GetKind(),
)
if err != nil {
return nil, err
}
names, err := buildNamesFromGivenState(c.Create.State, replicas)
if err != nil {
return nil, errors.Wrapf(err, "%s", c)
}
for _, name := range names {
obj := &unstructured.Unstructured{
Object: c.Create.State.Object,
}
obj.SetName(name)
err = c.createResource(obj, client)
if err != nil {
return nil, errors.Wrapf(err, "%s", c)
}
}
return &types.CreateResult{
Phase: types.CreateStatusPassed,
Message: c.String(),
}, nil
}

// Run creates the desired state against the cluster
func (c *Creatable) Run() (*types.CreateResult, error) {
if c.Create.State.GetKind() == "CustomResourceDefinition" {
// create CRD
return c.createCRD()
}
return c.createResourceReplicas()
}
29 changes: 23 additions & 6 deletions pkg/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package job

import (
"time"

"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
types "mayadata.io/d-operators/types/job"
metacdiscovery "openebs.io/metac/dynamic/discovery"
metac "openebs.io/metac/start"
Expand Down Expand Up @@ -139,19 +142,20 @@ func (r *Runner) buildLockRunner() *LockRunner {
"name": r.Job.GetName() + "-lock",
"namespace": r.Job.GetNamespace(),
"labels": map[string]interface{}{
"jobs.metacontroller.app/lock": "true",
"job.dope.metacontroller.io/lock": "true",
},
},
},
},
},
}
return &LockRunner{
Fixture: r.fixture,
Task: lock,
LockForever: isLockForever,
Retry: NewRetry(RetryConfig{}),
ProtectedTaskCount: len(r.Job.Spec.Tasks),
Fixture: r.fixture,
Task: lock,
LockForever: isLockForever,
Retry: NewRetry(RetryConfig{}),
// no of tasks + elapsed time task
ProtectedTaskCount: len(r.Job.Spec.Tasks) + 1,
}
}

Expand Down Expand Up @@ -191,12 +195,22 @@ func (r *Runner) getAPIDiscovery() *metacdiscovery.APIResourceDiscovery {
// return apiDiscovery
}

func (r *Runner) addJobElapsedTimeInSeconds(elapsedtime float64) {
r.JobStatus.TaskListStatus["job-elapsed-time"] = types.TaskStatus{
Step: len(r.Job.Spec.Tasks) + 1,
Internal: pointer.BoolPtr(true),
Phase: types.TaskStatusPassed,
ElapsedTimeInSeconds: pointer.Float64Ptr(elapsedtime),
}
}

// runAll runs all the tasks
func (r *Runner) runAll() (status *types.JobStatus, err error) {
defer func() {
r.fixture.TearDown()
}()
var failedTasks int
var start = time.Now()
for idx, task := range r.Job.Spec.Tasks {
tr := &TaskRunner{
Fixture: r.fixture,
Expand All @@ -218,6 +232,9 @@ func (r *Runner) runAll() (status *types.JobStatus, err error) {
failedTasks++
}
}
// time taken for this job
elapsedSeconds := time.Since(start).Seconds()
r.addJobElapsedTimeInSeconds(elapsedSeconds)
// build the result
if failedTasks > 0 {
r.JobStatus.Phase = types.JobStatusFailed
Expand Down
Loading

0 comments on commit bd652f3

Please sign in to comment.