diff --git a/.buildkite/test-e2e.yml b/.buildkite/test-e2e.yml index 128e564a351..0a199275f95 100644 --- a/.buildkite/test-e2e.yml +++ b/.buildkite/test-e2e.yml @@ -160,3 +160,23 @@ - echo "KUBERAY_TEST_OUTPUT_DIR=$$KUBERAY_TEST_OUTPUT_DIR" - KUBERAY_TEST_TIMEOUT_SHORT=1m KUBERAY_TEST_TIMEOUT_MEDIUM=5m KUBERAY_TEST_TIMEOUT_LONG=10m go test -timeout 30m -v ./test/e2erayjobsubmitter 2>&1 | awk -f ../.buildkite/format.awk | tee $$KUBERAY_TEST_OUTPUT_DIR/gotest.log || (kubectl logs --tail -1 -l app.kubernetes.io/name=kuberay | tee $$KUBERAY_TEST_OUTPUT_DIR/kuberay-operator.log && cd $$KUBERAY_TEST_OUTPUT_DIR && find . -name "*.log" | tar -cf /artifact-mount/e2e-log.tar -T - && exit 1) - echo "--- END:RayJob Light Weight Submitter E2E (nightly operator) tests finished" + +- label: 'Test RayCronJob E2E (nightly operator)' + instance_size: large + image: golang:1.25-bookworm + commands: + - source .buildkite/setup-env.sh + - kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml + - kubectl config set clusters.kind-kind.server https://docker:6443 + # Build nightly KubeRay operator image + - pushd ray-operator + - source ../.buildkite/build-start-operator.sh + - kubectl wait --timeout=90s --for=condition=Available=true deployment kuberay-operator + # Run e2e tests and print KubeRay operator logs if tests fail + - echo "--- START:Running e2e (nightly operator) RayCronJob tests" + - if [ -n "$${KUBERAY_TEST_RAY_IMAGE}" ]; then echo "Using Ray Image $${KUBERAY_TEST_RAY_IMAGE}"; fi + - set -o pipefail + - mkdir -p "$(pwd)/tmp" && export KUBERAY_TEST_OUTPUT_DIR=$(pwd)/tmp + - echo "KUBERAY_TEST_OUTPUT_DIR=$$KUBERAY_TEST_OUTPUT_DIR" + - KUBERAY_TEST_TIMEOUT_SHORT=1m KUBERAY_TEST_TIMEOUT_MEDIUM=5m KUBERAY_TEST_TIMEOUT_LONG=10m go test -timeout 40m -v ./test/e2eraycronjob 2>&1 | awk -f ../.buildkite/format.awk | tee $$KUBERAY_TEST_OUTPUT_DIR/gotest.log || (kubectl logs --tail -1 -l app.kubernetes.io/name=kuberay | tee $$KUBERAY_TEST_OUTPUT_DIR/kuberay-operator.log && cd $$KUBERAY_TEST_OUTPUT_DIR && find . -name "*.log" | tar -cf /artifact-mount/e2e-raycronjob-log.tar -T - && exit 1) + - echo "--- END:RayCronJob E2E (nightly operator) tests finished" diff --git a/.buildkite/values-kuberay-operator-override.yaml b/.buildkite/values-kuberay-operator-override.yaml index 41fdc9f1a31..7b2e1c84557 100644 --- a/.buildkite/values-kuberay-operator-override.yaml +++ b/.buildkite/values-kuberay-operator-override.yaml @@ -18,3 +18,5 @@ featureGates: enabled: true - name: RayMultiHostIndexing enabled: true + - name: RayCronJob + enabled: true diff --git a/helm-chart/kuberay-operator/crds/ray.io_raycronjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_raycronjobs.yaml index 23cdb163f79..c2a3f252a89 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_raycronjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_raycronjobs.yaml @@ -8,6 +8,8 @@ metadata: spec: group: ray.io names: + categories: + - all kind: RayCronJob listKind: RayCronJobList plural: raycronjobs diff --git a/ray-operator/apis/ray/v1/raycronjob_types.go b/ray-operator/apis/ray/v1/raycronjob_types.go index 9ae7fb127e7..57b4af0a90d 100644 --- a/ray-operator/apis/ray/v1/raycronjob_types.go +++ b/ray-operator/apis/ray/v1/raycronjob_types.go @@ -31,6 +31,10 @@ type RayCronJobStatus struct { //+kubebuilder:printcolumn:name="age",type="date",JSONPath=".metadata.creationTimestamp",priority=0 //+kubebuilder:printcolumn:name="suspend",type=boolean,JSONPath=".spec.suspend",priority=0 +// +genclient +// +kubebuilder:resource:categories=all +// +kubebuilder:storageversion +// //nolint:govet // RayCronJob is the Schema for the raycronjobs API type RayCronJob struct { metav1.TypeMeta `json:",inline"` diff --git a/ray-operator/config/crd/bases/ray.io_raycronjobs.yaml b/ray-operator/config/crd/bases/ray.io_raycronjobs.yaml index 23cdb163f79..c2a3f252a89 100644 --- a/ray-operator/config/crd/bases/ray.io_raycronjobs.yaml +++ b/ray-operator/config/crd/bases/ray.io_raycronjobs.yaml @@ -8,6 +8,8 @@ metadata: spec: group: ray.io names: + categories: + - all kind: RayCronJob listKind: RayCronJobList plural: raycronjobs diff --git a/ray-operator/config/overlays/test-overrides/deployment-override.yaml b/ray-operator/config/overlays/test-overrides/deployment-override.yaml index 7f2a4e2403c..0480af92eed 100644 --- a/ray-operator/config/overlays/test-overrides/deployment-override.yaml +++ b/ray-operator/config/overlays/test-overrides/deployment-override.yaml @@ -9,4 +9,4 @@ spec: containers: - name: kuberay-operator args: - - --feature-gates=RayClusterStatusConditions=true,RayJobDeletionPolicy=true,RayMultiHostIndexing=true + - --feature-gates=RayClusterStatusConditions=true,RayJobDeletionPolicy=true,RayMultiHostIndexing=true,RayCronJob=true diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/raycronjob.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/raycronjob.go new file mode 100644 index 00000000000..6d435a03316 --- /dev/null +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/raycronjob.go @@ -0,0 +1,227 @@ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1 + +import ( + apismetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + metav1 "k8s.io/client-go/applyconfigurations/meta/v1" +) + +// RayCronJobApplyConfiguration represents a declarative configuration of the RayCronJob type for use +// with apply. +type RayCronJobApplyConfiguration struct { + metav1.TypeMetaApplyConfiguration `json:",inline"` + *metav1.ObjectMetaApplyConfiguration `json:"metadata,omitempty"` + Spec *RayCronJobSpecApplyConfiguration `json:"spec,omitempty"` + Status *RayCronJobStatusApplyConfiguration `json:"status,omitempty"` +} + +// RayCronJob constructs a declarative configuration of the RayCronJob type for use with +// apply. +func RayCronJob(name, namespace string) *RayCronJobApplyConfiguration { + b := &RayCronJobApplyConfiguration{} + b.WithName(name) + b.WithNamespace(namespace) + b.WithKind("RayCronJob") + b.WithAPIVersion("ray.io/v1") + return b +} + +func (b RayCronJobApplyConfiguration) IsApplyConfiguration() {} + +// WithKind sets the Kind field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Kind field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithKind(value string) *RayCronJobApplyConfiguration { + b.TypeMetaApplyConfiguration.Kind = &value + return b +} + +// WithAPIVersion sets the APIVersion field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the APIVersion field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithAPIVersion(value string) *RayCronJobApplyConfiguration { + b.TypeMetaApplyConfiguration.APIVersion = &value + return b +} + +// WithName sets the Name field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Name field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithName(value string) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ObjectMetaApplyConfiguration.Name = &value + return b +} + +// WithGenerateName sets the GenerateName field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the GenerateName field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithGenerateName(value string) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ObjectMetaApplyConfiguration.GenerateName = &value + return b +} + +// WithNamespace sets the Namespace field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Namespace field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithNamespace(value string) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ObjectMetaApplyConfiguration.Namespace = &value + return b +} + +// WithUID sets the UID field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the UID field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithUID(value types.UID) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ObjectMetaApplyConfiguration.UID = &value + return b +} + +// WithResourceVersion sets the ResourceVersion field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ResourceVersion field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithResourceVersion(value string) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ObjectMetaApplyConfiguration.ResourceVersion = &value + return b +} + +// WithGeneration sets the Generation field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Generation field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithGeneration(value int64) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ObjectMetaApplyConfiguration.Generation = &value + return b +} + +// WithCreationTimestamp sets the CreationTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the CreationTimestamp field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithCreationTimestamp(value apismetav1.Time) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ObjectMetaApplyConfiguration.CreationTimestamp = &value + return b +} + +// WithDeletionTimestamp sets the DeletionTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionTimestamp field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithDeletionTimestamp(value apismetav1.Time) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ObjectMetaApplyConfiguration.DeletionTimestamp = &value + return b +} + +// WithDeletionGracePeriodSeconds sets the DeletionGracePeriodSeconds field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionGracePeriodSeconds field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithDeletionGracePeriodSeconds(value int64) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ObjectMetaApplyConfiguration.DeletionGracePeriodSeconds = &value + return b +} + +// WithLabels puts the entries into the Labels field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Labels field, +// overwriting an existing map entries in Labels field with the same key. +func (b *RayCronJobApplyConfiguration) WithLabels(entries map[string]string) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.ObjectMetaApplyConfiguration.Labels == nil && len(entries) > 0 { + b.ObjectMetaApplyConfiguration.Labels = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.ObjectMetaApplyConfiguration.Labels[k] = v + } + return b +} + +// WithAnnotations puts the entries into the Annotations field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Annotations field, +// overwriting an existing map entries in Annotations field with the same key. +func (b *RayCronJobApplyConfiguration) WithAnnotations(entries map[string]string) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.ObjectMetaApplyConfiguration.Annotations == nil && len(entries) > 0 { + b.ObjectMetaApplyConfiguration.Annotations = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.ObjectMetaApplyConfiguration.Annotations[k] = v + } + return b +} + +// WithOwnerReferences adds the given value to the OwnerReferences field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the OwnerReferences field. +func (b *RayCronJobApplyConfiguration) WithOwnerReferences(values ...*metav1.OwnerReferenceApplyConfiguration) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + if values[i] == nil { + panic("nil value passed to WithOwnerReferences") + } + b.ObjectMetaApplyConfiguration.OwnerReferences = append(b.ObjectMetaApplyConfiguration.OwnerReferences, *values[i]) + } + return b +} + +// WithFinalizers adds the given value to the Finalizers field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Finalizers field. +func (b *RayCronJobApplyConfiguration) WithFinalizers(values ...string) *RayCronJobApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + b.ObjectMetaApplyConfiguration.Finalizers = append(b.ObjectMetaApplyConfiguration.Finalizers, values[i]) + } + return b +} + +func (b *RayCronJobApplyConfiguration) ensureObjectMetaApplyConfigurationExists() { + if b.ObjectMetaApplyConfiguration == nil { + b.ObjectMetaApplyConfiguration = &metav1.ObjectMetaApplyConfiguration{} + } +} + +// WithSpec sets the Spec field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Spec field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithSpec(value *RayCronJobSpecApplyConfiguration) *RayCronJobApplyConfiguration { + b.Spec = value + return b +} + +// WithStatus sets the Status field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Status field is set to the value of the last call. +func (b *RayCronJobApplyConfiguration) WithStatus(value *RayCronJobStatusApplyConfiguration) *RayCronJobApplyConfiguration { + b.Status = value + return b +} + +// GetKind retrieves the value of the Kind field in the declarative configuration. +func (b *RayCronJobApplyConfiguration) GetKind() *string { + return b.TypeMetaApplyConfiguration.Kind +} + +// GetAPIVersion retrieves the value of the APIVersion field in the declarative configuration. +func (b *RayCronJobApplyConfiguration) GetAPIVersion() *string { + return b.TypeMetaApplyConfiguration.APIVersion +} + +// GetName retrieves the value of the Name field in the declarative configuration. +func (b *RayCronJobApplyConfiguration) GetName() *string { + b.ensureObjectMetaApplyConfigurationExists() + return b.ObjectMetaApplyConfiguration.Name +} + +// GetNamespace retrieves the value of the Namespace field in the declarative configuration. +func (b *RayCronJobApplyConfiguration) GetNamespace() *string { + b.ensureObjectMetaApplyConfigurationExists() + return b.ObjectMetaApplyConfiguration.Namespace +} diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/raycronjobspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/raycronjobspec.go new file mode 100644 index 00000000000..2f64c2c90de --- /dev/null +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/raycronjobspec.go @@ -0,0 +1,48 @@ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1 + +// RayCronJobSpecApplyConfiguration represents a declarative configuration of the RayCronJobSpec type for use +// with apply. +// +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. +type RayCronJobSpecApplyConfiguration struct { + // JobTemplate defines the job spec that will be created by cron scheduling + JobTemplate *RayJobSpecApplyConfiguration `json:"jobTemplate,omitempty"` + // Schedule is the cron schedule string + Schedule *string `json:"schedule,omitempty"` + // Suspend tells the controller to suspend the scheduling, it does not apply to + // scheduled RayJob. + Suspend *bool `json:"suspend,omitempty"` +} + +// RayCronJobSpecApplyConfiguration constructs a declarative configuration of the RayCronJobSpec type for use with +// apply. +func RayCronJobSpec() *RayCronJobSpecApplyConfiguration { + return &RayCronJobSpecApplyConfiguration{} +} + +// WithJobTemplate sets the JobTemplate field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the JobTemplate field is set to the value of the last call. +func (b *RayCronJobSpecApplyConfiguration) WithJobTemplate(value *RayJobSpecApplyConfiguration) *RayCronJobSpecApplyConfiguration { + b.JobTemplate = value + return b +} + +// WithSchedule sets the Schedule field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Schedule field is set to the value of the last call. +func (b *RayCronJobSpecApplyConfiguration) WithSchedule(value string) *RayCronJobSpecApplyConfiguration { + b.Schedule = &value + return b +} + +// WithSuspend sets the Suspend field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Suspend field is set to the value of the last call. +func (b *RayCronJobSpecApplyConfiguration) WithSuspend(value bool) *RayCronJobSpecApplyConfiguration { + b.Suspend = &value + return b +} diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/raycronjobstatus.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/raycronjobstatus.go new file mode 100644 index 00000000000..c0f735928a9 --- /dev/null +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/raycronjobstatus.go @@ -0,0 +1,29 @@ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// RayCronJobStatusApplyConfiguration represents a declarative configuration of the RayCronJobStatus type for use +// with apply. +// +// RayCronJobStatus defines the observed state of RayCronJob +type RayCronJobStatusApplyConfiguration struct { + LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"` +} + +// RayCronJobStatusApplyConfiguration constructs a declarative configuration of the RayCronJobStatus type for use with +// apply. +func RayCronJobStatus() *RayCronJobStatusApplyConfiguration { + return &RayCronJobStatusApplyConfiguration{} +} + +// WithLastScheduleTime sets the LastScheduleTime field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the LastScheduleTime field is set to the value of the last call. +func (b *RayCronJobStatusApplyConfiguration) WithLastScheduleTime(value metav1.Time) *RayCronJobStatusApplyConfiguration { + b.LastScheduleTime = &value + return b +} diff --git a/ray-operator/pkg/client/applyconfiguration/utils.go b/ray-operator/pkg/client/applyconfiguration/utils.go index 16a92835cf5..b36186baaa8 100644 --- a/ray-operator/pkg/client/applyconfiguration/utils.go +++ b/ray-operator/pkg/client/applyconfiguration/utils.go @@ -46,6 +46,12 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &rayv1.RayClusterStatusApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayClusterUpgradeStrategy"): return &rayv1.RayClusterUpgradeStrategyApplyConfiguration{} + case v1.SchemeGroupVersion.WithKind("RayCronJob"): + return &rayv1.RayCronJobApplyConfiguration{} + case v1.SchemeGroupVersion.WithKind("RayCronJobSpec"): + return &rayv1.RayCronJobSpecApplyConfiguration{} + case v1.SchemeGroupVersion.WithKind("RayCronJobStatus"): + return &rayv1.RayCronJobStatusApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayJob"): return &rayv1.RayJobApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayJobSpec"): diff --git a/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/fake/fake_ray_client.go b/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/fake/fake_ray_client.go index 0c23a176006..fa392b99764 100644 --- a/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/fake/fake_ray_client.go +++ b/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/fake/fake_ray_client.go @@ -16,6 +16,10 @@ func (c *FakeRayV1) RayClusters(namespace string) v1.RayClusterInterface { return newFakeRayClusters(c, namespace) } +func (c *FakeRayV1) RayCronJobs(namespace string) v1.RayCronJobInterface { + return newFakeRayCronJobs(c, namespace) +} + func (c *FakeRayV1) RayJobs(namespace string) v1.RayJobInterface { return newFakeRayJobs(c, namespace) } diff --git a/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/fake/fake_raycronjob.go b/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/fake/fake_raycronjob.go new file mode 100644 index 00000000000..d3f6886b948 --- /dev/null +++ b/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/fake/fake_raycronjob.go @@ -0,0 +1,33 @@ +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + rayv1 "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" + typedrayv1 "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/typed/ray/v1" + gentype "k8s.io/client-go/gentype" +) + +// fakeRayCronJobs implements RayCronJobInterface +type fakeRayCronJobs struct { + *gentype.FakeClientWithListAndApply[*v1.RayCronJob, *v1.RayCronJobList, *rayv1.RayCronJobApplyConfiguration] + Fake *FakeRayV1 +} + +func newFakeRayCronJobs(fake *FakeRayV1, namespace string) typedrayv1.RayCronJobInterface { + return &fakeRayCronJobs{ + gentype.NewFakeClientWithListAndApply[*v1.RayCronJob, *v1.RayCronJobList, *rayv1.RayCronJobApplyConfiguration]( + fake.Fake, + namespace, + v1.SchemeGroupVersion.WithResource("raycronjobs"), + v1.SchemeGroupVersion.WithKind("RayCronJob"), + func() *v1.RayCronJob { return &v1.RayCronJob{} }, + func() *v1.RayCronJobList { return &v1.RayCronJobList{} }, + func(dst, src *v1.RayCronJobList) { dst.ListMeta = src.ListMeta }, + func(list *v1.RayCronJobList) []*v1.RayCronJob { return gentype.ToPointerSlice(list.Items) }, + func(list *v1.RayCronJobList, items []*v1.RayCronJob) { list.Items = gentype.FromPointerSlice(items) }, + ), + fake, + } +} diff --git a/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/generated_expansion.go b/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/generated_expansion.go index d0ff0fa819e..79e577f4da3 100644 --- a/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/generated_expansion.go +++ b/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/generated_expansion.go @@ -4,6 +4,8 @@ package v1 type RayClusterExpansion interface{} +type RayCronJobExpansion interface{} + type RayJobExpansion interface{} type RayServiceExpansion interface{} diff --git a/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/ray_client.go b/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/ray_client.go index d433dedc9b9..1619a57ba01 100644 --- a/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/ray_client.go +++ b/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/ray_client.go @@ -13,6 +13,7 @@ import ( type RayV1Interface interface { RESTClient() rest.Interface RayClustersGetter + RayCronJobsGetter RayJobsGetter RayServicesGetter } @@ -26,6 +27,10 @@ func (c *RayV1Client) RayClusters(namespace string) RayClusterInterface { return newRayClusters(c, namespace) } +func (c *RayV1Client) RayCronJobs(namespace string) RayCronJobInterface { + return newRayCronJobs(c, namespace) +} + func (c *RayV1Client) RayJobs(namespace string) RayJobInterface { return newRayJobs(c, namespace) } diff --git a/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/raycronjob.go b/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/raycronjob.go new file mode 100644 index 00000000000..765240582b0 --- /dev/null +++ b/ray-operator/pkg/client/clientset/versioned/typed/ray/v1/raycronjob.go @@ -0,0 +1,58 @@ +// Code generated by client-gen. DO NOT EDIT. + +package v1 + +import ( + context "context" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + applyconfigurationrayv1 "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" + scheme "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + gentype "k8s.io/client-go/gentype" +) + +// RayCronJobsGetter has a method to return a RayCronJobInterface. +// A group's client should implement this interface. +type RayCronJobsGetter interface { + RayCronJobs(namespace string) RayCronJobInterface +} + +// RayCronJobInterface has methods to work with RayCronJob resources. +type RayCronJobInterface interface { + Create(ctx context.Context, rayCronJob *rayv1.RayCronJob, opts metav1.CreateOptions) (*rayv1.RayCronJob, error) + Update(ctx context.Context, rayCronJob *rayv1.RayCronJob, opts metav1.UpdateOptions) (*rayv1.RayCronJob, error) + // Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + UpdateStatus(ctx context.Context, rayCronJob *rayv1.RayCronJob, opts metav1.UpdateOptions) (*rayv1.RayCronJob, error) + Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error + Get(ctx context.Context, name string, opts metav1.GetOptions) (*rayv1.RayCronJob, error) + List(ctx context.Context, opts metav1.ListOptions) (*rayv1.RayCronJobList, error) + Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *rayv1.RayCronJob, err error) + Apply(ctx context.Context, rayCronJob *applyconfigurationrayv1.RayCronJobApplyConfiguration, opts metav1.ApplyOptions) (result *rayv1.RayCronJob, err error) + // Add a +genclient:noStatus comment above the type to avoid generating ApplyStatus(). + ApplyStatus(ctx context.Context, rayCronJob *applyconfigurationrayv1.RayCronJobApplyConfiguration, opts metav1.ApplyOptions) (result *rayv1.RayCronJob, err error) + RayCronJobExpansion +} + +// rayCronJobs implements RayCronJobInterface +type rayCronJobs struct { + *gentype.ClientWithListAndApply[*rayv1.RayCronJob, *rayv1.RayCronJobList, *applyconfigurationrayv1.RayCronJobApplyConfiguration] +} + +// newRayCronJobs returns a RayCronJobs +func newRayCronJobs(c *RayV1Client, namespace string) *rayCronJobs { + return &rayCronJobs{ + gentype.NewClientWithListAndApply[*rayv1.RayCronJob, *rayv1.RayCronJobList, *applyconfigurationrayv1.RayCronJobApplyConfiguration]( + "raycronjobs", + c.RESTClient(), + scheme.ParameterCodec, + namespace, + func() *rayv1.RayCronJob { return &rayv1.RayCronJob{} }, + func() *rayv1.RayCronJobList { return &rayv1.RayCronJobList{} }, + ), + } +} diff --git a/ray-operator/pkg/client/informers/externalversions/generic.go b/ray-operator/pkg/client/informers/externalversions/generic.go index 9d05856976e..e309377325e 100644 --- a/ray-operator/pkg/client/informers/externalversions/generic.go +++ b/ray-operator/pkg/client/informers/externalversions/generic.go @@ -39,6 +39,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource // Group=ray.io, Version=v1 case v1.SchemeGroupVersion.WithResource("rayclusters"): return &genericInformer{resource: resource.GroupResource(), informer: f.Ray().V1().RayClusters().Informer()}, nil + case v1.SchemeGroupVersion.WithResource("raycronjobs"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Ray().V1().RayCronJobs().Informer()}, nil case v1.SchemeGroupVersion.WithResource("rayjobs"): return &genericInformer{resource: resource.GroupResource(), informer: f.Ray().V1().RayJobs().Informer()}, nil case v1.SchemeGroupVersion.WithResource("rayservices"): diff --git a/ray-operator/pkg/client/informers/externalversions/ray/v1/interface.go b/ray-operator/pkg/client/informers/externalversions/ray/v1/interface.go index 40e4d8b11c6..cca518657eb 100644 --- a/ray-operator/pkg/client/informers/externalversions/ray/v1/interface.go +++ b/ray-operator/pkg/client/informers/externalversions/ray/v1/interface.go @@ -10,6 +10,8 @@ import ( type Interface interface { // RayClusters returns a RayClusterInformer. RayClusters() RayClusterInformer + // RayCronJobs returns a RayCronJobInformer. + RayCronJobs() RayCronJobInformer // RayJobs returns a RayJobInformer. RayJobs() RayJobInformer // RayServices returns a RayServiceInformer. @@ -32,6 +34,11 @@ func (v *version) RayClusters() RayClusterInformer { return &rayClusterInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// RayCronJobs returns a RayCronJobInformer. +func (v *version) RayCronJobs() RayCronJobInformer { + return &rayCronJobInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // RayJobs returns a RayJobInformer. func (v *version) RayJobs() RayJobInformer { return &rayJobInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/ray-operator/pkg/client/informers/externalversions/ray/v1/raycronjob.go b/ray-operator/pkg/client/informers/externalversions/ray/v1/raycronjob.go new file mode 100644 index 00000000000..329952e5724 --- /dev/null +++ b/ray-operator/pkg/client/informers/externalversions/ray/v1/raycronjob.go @@ -0,0 +1,86 @@ +// Code generated by informer-gen. DO NOT EDIT. + +package v1 + +import ( + context "context" + time "time" + + apisrayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + versioned "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned" + internalinterfaces "github.com/ray-project/kuberay/ray-operator/pkg/client/informers/externalversions/internalinterfaces" + rayv1 "github.com/ray-project/kuberay/ray-operator/pkg/client/listers/ray/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// RayCronJobInformer provides access to a shared informer and lister for +// RayCronJobs. +type RayCronJobInformer interface { + Informer() cache.SharedIndexInformer + Lister() rayv1.RayCronJobLister +} + +type rayCronJobInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewRayCronJobInformer constructs a new informer for RayCronJob type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewRayCronJobInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredRayCronJobInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredRayCronJobInformer constructs a new informer for RayCronJob type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredRayCronJobInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + cache.ToListWatcherWithWatchListSemantics(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.RayV1().RayCronJobs(namespace).List(context.Background(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.RayV1().RayCronJobs(namespace).Watch(context.Background(), options) + }, + ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.RayV1().RayCronJobs(namespace).List(ctx, options) + }, + WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.RayV1().RayCronJobs(namespace).Watch(ctx, options) + }, + }, client), + &apisrayv1.RayCronJob{}, + resyncPeriod, + indexers, + ) +} + +func (f *rayCronJobInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredRayCronJobInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *rayCronJobInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&apisrayv1.RayCronJob{}, f.defaultInformer) +} + +func (f *rayCronJobInformer) Lister() rayv1.RayCronJobLister { + return rayv1.NewRayCronJobLister(f.Informer().GetIndexer()) +} diff --git a/ray-operator/pkg/client/listers/ray/v1/expansion_generated.go b/ray-operator/pkg/client/listers/ray/v1/expansion_generated.go index e45cb53d299..b209e8d0d71 100644 --- a/ray-operator/pkg/client/listers/ray/v1/expansion_generated.go +++ b/ray-operator/pkg/client/listers/ray/v1/expansion_generated.go @@ -10,6 +10,14 @@ type RayClusterListerExpansion interface{} // RayClusterNamespaceLister. type RayClusterNamespaceListerExpansion interface{} +// RayCronJobListerExpansion allows custom methods to be added to +// RayCronJobLister. +type RayCronJobListerExpansion interface{} + +// RayCronJobNamespaceListerExpansion allows custom methods to be added to +// RayCronJobNamespaceLister. +type RayCronJobNamespaceListerExpansion interface{} + // RayJobListerExpansion allows custom methods to be added to // RayJobLister. type RayJobListerExpansion interface{} diff --git a/ray-operator/pkg/client/listers/ray/v1/raycronjob.go b/ray-operator/pkg/client/listers/ray/v1/raycronjob.go new file mode 100644 index 00000000000..7f298aea645 --- /dev/null +++ b/ray-operator/pkg/client/listers/ray/v1/raycronjob.go @@ -0,0 +1,54 @@ +// Code generated by lister-gen. DO NOT EDIT. + +package v1 + +import ( + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + labels "k8s.io/apimachinery/pkg/labels" + listers "k8s.io/client-go/listers" + cache "k8s.io/client-go/tools/cache" +) + +// RayCronJobLister helps list RayCronJobs. +// All objects returned here must be treated as read-only. +type RayCronJobLister interface { + // List lists all RayCronJobs in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*rayv1.RayCronJob, err error) + // RayCronJobs returns an object that can list and get RayCronJobs. + RayCronJobs(namespace string) RayCronJobNamespaceLister + RayCronJobListerExpansion +} + +// rayCronJobLister implements the RayCronJobLister interface. +type rayCronJobLister struct { + listers.ResourceIndexer[*rayv1.RayCronJob] +} + +// NewRayCronJobLister returns a new RayCronJobLister. +func NewRayCronJobLister(indexer cache.Indexer) RayCronJobLister { + return &rayCronJobLister{listers.New[*rayv1.RayCronJob](indexer, rayv1.Resource("raycronjob"))} +} + +// RayCronJobs returns an object that can list and get RayCronJobs. +func (s *rayCronJobLister) RayCronJobs(namespace string) RayCronJobNamespaceLister { + return rayCronJobNamespaceLister{listers.NewNamespaced[*rayv1.RayCronJob](s.ResourceIndexer, namespace)} +} + +// RayCronJobNamespaceLister helps list and get RayCronJobs. +// All objects returned here must be treated as read-only. +type RayCronJobNamespaceLister interface { + // List lists all RayCronJobs in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*rayv1.RayCronJob, err error) + // Get retrieves the RayCronJob from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*rayv1.RayCronJob, error) + RayCronJobNamespaceListerExpansion +} + +// rayCronJobNamespaceLister implements the RayCronJobNamespaceLister +// interface. +type rayCronJobNamespaceLister struct { + listers.ResourceIndexer[*rayv1.RayCronJob] +} diff --git a/ray-operator/test/e2eraycronjob/raycronjob_suspend_test.go b/ray-operator/test/e2eraycronjob/raycronjob_suspend_test.go new file mode 100644 index 00000000000..1c7f231e21e --- /dev/null +++ b/ray-operator/test/e2eraycronjob/raycronjob_suspend_test.go @@ -0,0 +1,95 @@ +package e2eraycronjob + +import ( + "context" + "testing" + "time" + + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" + rayclientset "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned" + . "github.com/ray-project/kuberay/ray-operator/test/support" +) + +func rayCronJobACTemplate(name, namespace, schedule string) *rayv1ac.RayCronJobApplyConfiguration { + return rayv1ac.RayCronJob(name, namespace). + WithSpec( + rayv1ac.RayCronJobSpec(). + WithSchedule(schedule). + WithJobTemplate( + rayv1ac.RayJobSpec(). + WithEntrypoint("sleep 1"). + WithRayClusterSpec(NewRayClusterSpec()), + ), + ) +} + +func TestRayCronJobSuspend(t *testing.T) { + test := With(t) + g := NewWithT(t) + + // Create a namespace + namespace := test.NewTestNamespace() + test.T().Run("RayCronJob respects suspend flag across schedule and resumes job creation.", func(_ *testing.T) { + rayCronJobAC := rayCronJobACTemplate("suspended-raycronjob", namespace.Name, "*/1 * * * *") + rayCronJobAC.Spec.WithSuspend(true) + rayCronJob, err := test.Client().Ray().RayV1().RayCronJobs(namespace.Name).Apply(test.Ctx(), rayCronJobAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayCronJob %s/%s successfully", rayCronJob.Namespace, rayCronJob.Name) + + rayCronJob, err = GetRayCronJob(test, rayCronJob.Namespace, rayCronJob.Name) + g.Expect(err).NotTo(HaveOccurred()) + ownerUID := rayCronJob.UID + + // No RayJob should be created + LogWithTimestamp(test.T(), "Waiting to ensure no RayJobs are created while suspended") + g.Consistently(func() (int, error) { + return countRayJobsOwnedByUID(test.Ctx(), test.Client().Ray(), namespace.Name, ownerUID) + }, 130*time.Second, 5*time.Second).Should(BeZero()) + + // Resume + LogWithTimestamp(test.T(), "Resuming RayCronJob %s/%s", rayCronJob.Namespace, rayCronJob.Name) + patch := []byte(`{"spec":{"suspend":false}}`) + _, err = test.Client().Ray().RayV1().RayCronJobs(namespace.Name).Patch(test.Ctx(), rayCronJob.Name, types.MergePatchType, patch, metav1.PatchOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + // Spec.Suspend should be false + g.Eventually(RayCronJob(test, namespace.Name, rayCronJob.Name), TestTimeoutShort). + Should(WithTransform(func(rayCronJob *rayv1.RayCronJob) bool { + return !rayCronJob.Spec.Suspend + }, BeTrue())) + + // Jobs must start appearing now + g.Eventually(func() (int, error) { + return countRayJobsOwnedByUID(test.Ctx(), test.Client().Ray(), namespace.Name, ownerUID) + }, TestTimeoutMedium).Should(BeNumerically(">", 0)) + + // Delete the RayCronJob + err = test.Client().Ray().RayV1().RayCronJobs(namespace.Name).Delete(test.Ctx(), rayCronJob.Name, metav1.DeleteOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Deleted RayCronJob %s/%s successfully", rayCronJob.Namespace, rayCronJob.Name) + }) +} + +// countRayJobsOwnedByUID counts RayJobs in namespace whose ownerReferences contains ownerUID. +func countRayJobsOwnedByUID(ctx context.Context, rayClient rayclientset.Interface, namespace string, ownerUID types.UID) (int, error) { + list, err := rayClient.RayV1().RayJobs(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return 0, err + } + n := 0 + for i := range list.Items { + owners := list.Items[i].OwnerReferences + for _, or := range owners { + if or.UID == ownerUID { + n++ + break + } + } + } + return n, nil +} diff --git a/ray-operator/test/support/ray.go b/ray-operator/test/support/ray.go index 717ffe2fc70..2917b3f585b 100644 --- a/ray-operator/test/support/ray.go +++ b/ray-operator/test/support/ray.go @@ -17,6 +17,16 @@ import ( "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" ) +func RayCronJob(t Test, namespace, name string) func() (*rayv1.RayCronJob, error) { + return func() (*rayv1.RayCronJob, error) { + return GetRayCronJob(t, namespace, name) + } +} + +func GetRayCronJob(t Test, namespace, name string) (*rayv1.RayCronJob, error) { + return t.Client().Ray().RayV1().RayCronJobs(namespace).Get(t.Ctx(), name, metav1.GetOptions{}) +} + func RayJob(t Test, namespace, name string) func() (*rayv1.RayJob, error) { return func() (*rayv1.RayJob, error) { return GetRayJob(t, namespace, name)