Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ray-operator/apis/config/v1alpha1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayClus
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
}

func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func() utils.RayHttpProxyClientInterface {
func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface {
return utils.GetRayHttpProxyClientFunc(mgr, config.UseKubernetesProxy)
}
7 changes: 2 additions & 5 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type RayServiceReconciler struct {
ServeConfigs *lru.Cache
RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error)
httpProxyClientFunc func() utils.RayHttpProxyClientInterface
httpProxyClientFunc func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface
}

// NewRayServiceReconciler returns a new reconcile.Reconciler
Expand Down Expand Up @@ -983,13 +983,10 @@ func (r *RayServiceReconciler) updateHeadPodServeLabel(ctx context.Context, rayS
return fmt.Errorf("found 0 head. cluster name %s, namespace %v", rayClusterInstance.Name, rayClusterInstance.Namespace)
}

client := r.httpProxyClientFunc()
client.InitClient()

rayContainer := headPod.Spec.Containers[utils.RayContainerIndex]
servingPort := utils.FindContainerPort(&rayContainer, utils.ServingPortName, utils.DefaultServingPort)
client.SetHostIp(headPod.Status.PodIP, headPod.Namespace, headPod.Name, servingPort)

client := r.httpProxyClientFunc(headPod.Status.PodIP, headPod.Namespace, headPod.Name, servingPort)
if headPod.Labels == nil {
headPod.Labels = make(map[string]string)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ func TestLabelHeadPodForServeStatus(t *testing.T) {
Client: fakeClient,
Recorder: &record.FakeRecorder{},
Scheme: newScheme,
httpProxyClientFunc: func() utils.RayHttpProxyClientInterface {
httpProxyClientFunc: func(_, _, _ string, _ int) utils.RayHttpProxyClientInterface {
return fakeRayHttpProxyClient
},
}
Expand Down
4 changes: 2 additions & 2 deletions ray-operator/controllers/ray/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) fun
}
}

func (testProvider TestClientProvider) GetHttpProxyClient(_ manager.Manager) func() utils.RayHttpProxyClientInterface {
return func() utils.RayHttpProxyClientInterface {
func (testProvider TestClientProvider) GetHttpProxyClient(_ manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface {
return func(_, _, _ string, _ int) utils.RayHttpProxyClientInterface {
return fakeRayHttpProxyClient
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ type FakeRayHttpProxyClient struct {
IsHealthy bool
}

func (fc *FakeRayHttpProxyClient) InitClient() {}

func (fc *FakeRayHttpProxyClient) SetHostIp(_, _, _ string, _ int) {}

func (fc *FakeRayHttpProxyClient) CheckProxyActorHealth(_ context.Context) error {
if !fc.IsHealthy {
return fmt.Errorf("fake proxy actor is not healthy")
Expand Down
28 changes: 2 additions & 26 deletions ray-operator/controllers/ray/utils/httpproxy_httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,15 @@ import (
"io"
"net/http"
"time"

ctrl "sigs.k8s.io/controller-runtime"
)

type RayHttpProxyClientInterface interface {
InitClient()
CheckProxyActorHealth(ctx context.Context) error
SetHostIp(hostIp, podNamespace, podName string, port int)
}

func GetRayHttpProxyClientFunc(mgr ctrl.Manager, useKubernetesProxy bool) func() RayHttpProxyClientInterface {
return func() RayHttpProxyClientInterface {
return &RayHttpProxyClient{
mgr: mgr,
useKubernetesProxy: useKubernetesProxy,
}
}
}

type RayHttpProxyClient struct {
client *http.Client
mgr ctrl.Manager
httpProxyURL string
useKubernetesProxy bool
client *http.Client
httpProxyURL string
}

func (r *RayHttpProxyClient) InitClient() {
Expand All @@ -38,15 +23,6 @@ func (r *RayHttpProxyClient) InitClient() {
}
}

func (r *RayHttpProxyClient) SetHostIp(hostIp, podNamespace, podName string, port int) {
if r.useKubernetesProxy {
r.client = r.mgr.GetHTTPClient()
r.httpProxyURL = fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s:%d/proxy/", r.mgr.GetConfig().Host, podNamespace, podName, port)
}

r.httpProxyURL = fmt.Sprintf("http://%s:%d/", hostIp, port)
}

// CheckProxyActorHealth checks the health status of the Ray Serve proxy actor.
func (r *RayHttpProxyClient) CheckProxyActorHealth(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.httpProxyURL+RayServeProxyHealthPath, nil)
Expand Down
17 changes: 16 additions & 1 deletion ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool)

type ClientProvider interface {
GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error)
GetHttpProxyClient(mgr manager.Manager) func() RayHttpProxyClientInterface
GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface
}

func ManagedByExternalController(controllerName *string) *string {
Expand Down Expand Up @@ -785,3 +785,18 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
}, nil
}
}

func GetRayHttpProxyClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface {
return func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface {
if useKubernetesProxy {
return &RayHttpProxyClient{
client: mgr.GetHTTPClient(),
httpProxyURL: fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s:%d/proxy/", mgr.GetConfig().Host, podNamespace, podName, port),
}
}
return &RayHttpProxyClient{
client: &http.Client{Timeout: 2 * time.Second},
httpProxyURL: fmt.Sprintf("http://%s:%d/", hostIp, port),
}
}
}
Loading