diff --git a/historyserver/Dockerfile b/historyserver/Dockerfile index ed999b6148a..e2a63bc4316 100644 --- a/historyserver/Dockerfile +++ b/historyserver/Dockerfile @@ -1,33 +1,25 @@ -ARG TARGETOS -ARG TARGETARCH - -FROM --platform=$BUILDPLATFORM golang:1.25.1 as builder -ENV GOPROXY=https://goproxy.cn,direct -ARG BUILD_RAYSERVER_DASHBOARD - -RUN if [ "$BUILD_RAYSERVER_DASHBOARD" = "yes" ] ; then \ - curl -o install.sh https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh && chmod +x install.sh && ./install.sh && /bin/bash -c "source $HOME/.nvm/nvm.sh && nvm install 14 && nvm use 14" ;\ -else \ - echo "$BUILD_RAYSERVER_DASHBOARD not yes, no need install nvm"; \ -fi +FROM --platform=$BUILDPLATFORM golang:1.25.1 AS builder +ENV GOPROXY=https://proxy.golang.org,direct WORKDIR /historyserver -COPY . . - -RUN if [ "$BUILD_RAYSERVER_DASHBOARD" = "yes" ] ; then \ - /bin/bash -c "source $HOME/.nvm/nvm.sh && cd dashboard/v2.51.0/client && npm ci && npm run build" ;\ -else \ - mkdir -p dashboard/v2.51.0/client/build ;\ - echo "do not npm run build"; \ -fi - -RUN make build GOOS=${TARGETOS} GOARCH=${TARGETARCH} +# Copy the go modules and manifests +COPY go.mod go.mod +COPY go.sum go.sum +# Cached dependencies to avoid re-downloading when only sources change +RUN go mod download + +# Copy the go source +COPY cmd/collector/main.go cmd/collector/main.go +COPY pkg/collector/ pkg/collector/ +COPY pkg/storage/ pkg/storage/ +COPY pkg/utils/ pkg/utils/ + +# Build the collector binary +COPY Makefile Makefile +RUN make build FROM ubuntu:22.04 - RUN apt-get update && apt-get upgrade -y && rm -rf /var/cache/apt/ && apt-get install -y ca-certificates - -COPY --from=builder /historyserver/output/bin/historyserver /usr/local/bin/historyserver COPY --from=builder /historyserver/output/bin/collector /usr/local/bin/collector -COPY --from=builder /historyserver/dashboard/v2.51.0/client/build /dashboard/v2.51.0/client/build -COPY --from=builder /historyserver/dashboard/homepage /dashboard/homepage + +ENTRYPOINT ["/usr/local/bin/collector"] diff --git a/historyserver/Makefile b/historyserver/Makefile index c36bda2b5d2..eccf88f2e3d 100644 --- a/historyserver/Makefile +++ b/historyserver/Makefile @@ -1,37 +1,22 @@ -# Image URL to use all building/pushing image targets +# Docker image reference for building and pushing +IMG ?= collector:v0.1.0 + GOLANGCILINT_VERSION ?= v1.59.0 GOBIN := $(shell go env GOPATH)/bin GOBIN_GOLANGCILINT := $(GOBIN)/golangci-lint -DOCKERBUILDER_INSTANCE=historyserver +DOCKERBUILDER_INSTANCE=collector OUT_DIR=output BIN_DIR=$(OUT_DIR)/bin -BINARY_NAME=historyserver -BINARY_NAME_COLLECTOR=collector +BIN_NAME=collector +GO_LDFLAGS := -extldflags "-static" +GO_BUILD_FLAGS := -ldflags '$(GO_LDFLAGS)' # Setting SHELL to bash allows bash commands to be executed by recipes. # Options are set to exit when a recipe line exits non-zero or a piped command fails. SHELL = /usr/bin/env bash -o pipefail -BUILD_TIMESTAMP = $(shell date -u +"%Y-%m-%dT%H:%M:%SZ") -COMMIT_SHORT ?= $(shell git rev-parse --short HEAD) -BRANCH ?= $(shell git branch --show-current) -VERSION ?= $(shell git describe --tags --long|awk -F '-' '{print $$1"."$$2"-"$$3""}') - -PACKAGE = gitlab.alibaba-inc.com/eml/historyserver - -GO_LDFLAGS := -extldflags "-static" -# GO_LDFLAGS += -w -s # Drop debugging symbols. 
-GO_LDFLAGS += -X $(PACKAGE)/pkg.Version=$(VERSION) \ - -X $(PACKAGE)/pkg.CommitID=$(COMMIT_SHORT) \ - -X $(PACKAGE)/pkg.BuildDate=$(BUILD_TIMESTAMP) \ - -X $(PACKAGE)/pkg.Branch=$(BRANCH) -GO_BUILD_FLAGS := -ldflags '$(GO_LDFLAGS)' - -GOOS ?= darwin -GOARCH ?= amd64 .PHONY: all - all: build .PHONY: clean @@ -39,27 +24,13 @@ clean: rm -rf $(OUT_DIR) .PHONY: build -build: buildcollector buildhistoryserver +build: buildcollector ## Build the collector binary. .PHONY: buildcollector -#build: mod alllint test buildcollector: mod @echo "" @echo "go build ..." - CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build -v $(GO_BUILD_FLAGS) -o $(BIN_DIR)/$(BINARY_NAME_COLLECTOR) ./cmd/collector/main.go - -.PHONY: buildhistoryserver -#build: mod alllint test -buildhistoryserver: mod - @echo "" - @echo "go build ..." - CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build -v $(GO_BUILD_FLAGS) -o $(BIN_DIR)/$(BINARY_NAME) ./cmd/historyserver/main.go - -.PHONY: simplebuild -simplebuild: - @echo "" - @echo "go build ..." - CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build -v $(GO_BUILD_FLAGS) -o $(BIN_DIR)/$(BINARY_NAME) + go build -v $(GO_BUILD_FLAGS) -o $(BIN_DIR)/$(BIN_NAME) ./cmd/collector/main.go .PHONY: mod mod: @@ -67,16 +38,15 @@ mod: .PHONY: localimage localimage: dockerbuilder_instance - docker buildx build -t historyserver:laster --platform linux/amd64 . --load + docker buildx build -t $(IMG) --platform linux/arm64 . --load .PHONY: dockerbuilder_instance dockerbuilder_instance: @docker buildx use $(DOCKERBUILDER_INSTANCE) || docker buildx create --name $(DOCKERBUILDER_INSTANCE) docker buildx use $(DOCKERBUILDER_INSTANCE) -# Run tests .PHONY: test -test: +test: ## Run tests. go test -v ./pkg/... ./cmd/... .PHONY: alllint @@ -95,7 +65,7 @@ todolist: install-golint ## Run go lint against code. $(GOBIN_GOLANGCILINT) run --print-resources-usage -c .golangci.info.yaml --enable-only godox @echo "" -install-golint: ## check golint if not exist install golint tools +install-golint: ## Check golint if not exist install golint tools. ifneq ("$(wildcard $(GOBIN_GOLANGCILINT))","") ifeq ($(shell $(GOBIN_GOLANGCILINT) version --format short), $(GOLANGCILINT_VERSION)) @echo "golangci-lint version match" @@ -117,24 +87,3 @@ else echo 'Successfully installed' ;\ } endif - - - -# Generate manifests e.g. CRD, RBAC etc. -#manifests: controller-gen -# $(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases - -# Generate code -#generate: controller-gen -# $(CONTROLLER_GEN) object:headerFile=./hack/boilerplate.go.txt paths=./api/... - - -# find or download controller-gen -# download controller-gen if necessary -#controller-gen: -#ifeq (, $(shell which controller-gen)) -# go get sigs.k8s.io/controller-tools/cmd/controller-gen@v0.2.0-beta.2 -#CONTROLLER_GEN=$(shell go env GOPATH)/bin/controller-gen -#else -#CONTROLLER_GEN=$(shell which controller-gen) -#endif diff --git a/historyserver/README.md b/historyserver/README.md index 1b520664d1c..d69f3be1499 100644 --- a/historyserver/README.md +++ b/historyserver/README.md @@ -1,145 +1,152 @@ -# KubeRay History Server +# KubeRay History Server - Log Collector -This project is under active development. -See [#ray-history-server](https://app.slack.com/client/TN4768NRM/C09QLLU8HTL) channel to provide feedback. +> [!NOTE] +> The KubeRay History Server is currently under active development. 
This document aims to provide a step-by-step
+> guide to set up the Log Collector component for local development and testing.
-Ray History Server is a service for collecting, storing, and viewing historical logs and metadata from Ray clusters.
-It provides a web interface to explore the history of Ray jobs, tasks, actors, and other cluster activities.
+## Table of Contents
-## Components
+TBD...
-The History Server consists of two main components:
+## Materials for Learning KubeRay History Server
-1. **Collector**: Runs as a sidecar container in Ray clusters to collect logs and metadata
-2. **History Server**: Central service that aggregates data from collectors and provides a web UI
+* [REP: Ray History Server #62](https://github.com/ray-project/enhancements/pull/62)
+* [Design doc](https://docs.google.com/document/d/15Y2bW4uzeUJe84FxRNUnHozoQPqYdLB2yLmgrdF2ZmI/edit?pli=1&tab=t.0#heading=h.xrvvvqarib6g)
+* Related issues
+  * [\[Epic\]\[Feature\] history server collector #4274](https://github.com/ray-project/kuberay/issues/4274)
+  * [\[Epic\]\[Feature\] Support History Server #3966](https://github.com/ray-project/kuberay/issues/3966)
+  * [\[Feature\] Ray History Server #3884](https://github.com/ray-project/kuberay/issues/3884)
+* Related PRs
+  * [Historyserver beta version #4187](https://github.com/ray-project/kuberay/pull/4187)
+  * [add the implementation of historyserver collector #4241](https://github.com/ray-project/kuberay/pull/4241)
+  * [add the implementation of historyserver #4242](https://github.com/ray-project/kuberay/pull/4242)
-## Building
+## Test the Log Collector on a Kind Cluster
### Prerequisites
-- Go 1.19 or higher
-- Docker (for building container images)
-- Make
+Please ensure your environment matches the version requirements specified in the [ray-operator development guide](https://github.com/ray-project/kuberay/blob/2959d7d8a4174eedbf7b4a71a79219547f62cc82/ray-operator/DEVELOPMENT.md):
-### Building Binaries
+* Go v1.24+
+* Docker: Engine for building the container image
+* GNU Make
+* K9s (optional)
-To build the binaries locally:
+### Spin Up a Kind Cluster and Run the KubeRay Operator
-```bash
-make build
-```
-
-This will generate two binaries in the `output/bin/` directory:
+The following environment setup is based on the [ray-operator development guide](https://github.com/JiangJiaWei1103/kuberay/blob/e4d8ad6e34adbe13b4c77c35313af2c9bc16da82/ray-operator/DEVELOPMENT.md#run-the-operator-inside-the-cluster).
-- `collector`: The collector service that runs alongside Ray nodes
-- `historyserver`: The main history server service
+```bash
+# Clone the KubeRay repo and cd into the working dir.
+git clone https://github.com/ray-project/kuberay.git
+cd kuberay/ray-operator
-You can also build individual components:
+# Spin up a kind cluster.
+kind create cluster --image=kindest/node:v1.26.0 --name kuberay
-```bash
-make buildcollector # Build only the collector
-make buildhistoryserver # Build only the history server
-```
+# Build the KubeRay operator image.
+IMG=kuberay/operator:latest make docker-build
-### Building Docker Images
+# Load the custom KubeRay image into the kind cluster.
+kind load docker-image kuberay/operator:latest --name kuberay
-To build a Docker image:
+# Install the KubeRay operator with the custom image via local Helm chart.
+helm install kuberay-operator \
+  --set image.repository=kuberay/operator \
+  --set image.tag=latest \
+  ../helm-chart/kuberay-operator
-```bash
-make localimage
+# Check the logs via kubectl or k9s.
+kubectl logs deployments/kuberay-operator
+# or
+k9s
```
-This creates a Docker image named `historyserver:laster` with both binaries and necessary assets.
+### Checkout the Latest Log Collector PR
-For multi-platform builds, you can use:
+We've made several changes to KunWu's original PR to make it more focused on the log collector component. Please run
+the following commands to clone the correct repo and check out the latest PR:
```bash
-docker buildx build -t : --platform linux/amd64,linux/arm64 . --push
-```
+# Clone KunWu's KubeRay repo. Also, rename the repo to avoid conflicts with the original KubeRay dir.
+git clone https://github.com/KunWuLuan/kuberay.git kuberay_historyserver
-## Configuration
+# CD into the history server dir.
+cd kuberay_historyserver/historyserver
-### History Server Configuration
+# Check out the latest PR.
+gh pr checkout 2
+```
-The history server can be configured using command-line flags:
+### Build the Log Collector Container Image
-- `--runtime-class-name`: Storage backend type (e.g., "s3", "aliyunoss", "localtest")
-- `--ray-root-dir`: Root directory for Ray logs
-- `--kubeconfigs`: Path to kubeconfig file(s) for accessing Kubernetes clusters
-- `--dashboard-dir`: Directory containing dashboard assets (default: "/dashboard")
-- `--runtime-class-config-path`: Path to runtime class configuration file
+```bash
+# Build the log collector image.
+make localimage
-### Collector Configuration
+# Check the built image.
+docker images | grep collector
+```
-The collector can be configured using command-line flags:
+You should see a `collector:v0.1.0` image. If you'd like to use a different image reference, override the `IMG`
+variable in the Makefile or retag the image.
-- `--role`: Node role ("Head" or "Worker")
-- `--runtime-class-name`: Storage backend type (e.g., "s3", "aliyunoss")
-- `--ray-cluster-name`: Name of the Ray cluster
-- `--ray-cluster-id`: ID of the Ray cluster
-- `--ray-root-dir`: Root directory for Ray logs
-- `--log-batching`: Number of log entries to batch before writing
-- `--events-port`: Port for the events server
-- `--push-interval`: Interval between pushes to storage
-- `--runtime-class-config-path`: Path to runtime class configuration file
+### Load the Log Collector Image into the Kind Cluster
-## Supported Storage Backends
+```bash
+# Load the image into the kind cluster.
+kind load docker-image collector:v0.1.0 --name kuberay
-History Server supports multiple storage backends:
+# Check the loaded image.
+docker exec -it kuberay-control-plane crictl images | grep collector
+```
-1. **S3/MinIO**: For AWS S3 or MinIO compatible storage
-2. **Aliyun OSS**: For Alibaba Cloud Object Storage Service
-3. **Local Test**: For local testing and development
+### Deploy a Persistence Layer - MinIO
-Each backend requires specific configuration parameters passed through environment variables or configuration files.
+Since S3 is used as the storage provider, you need to deploy MinIO using the following commands:
-## Running
+```bash
+# Apply the MinIO manifest.
+kubectl apply -f config/minio.yaml
-### Running the History Server
+# Port-forward the MinIO UI for a sanity check.
+kubectl -n minio-dev port-forward svc/minio-service 9001:9001 --address 0.0.0.0
-```bash
-./output/bin/historyserver \
-    --runtime-class-name=s3 \
-    --ray-root-dir=/path/to/logs
+# Open the MinIO UI.
+open http://localhost:9001/browser
```
-### Running the Collector
+Then, you can log in with:
-```bash
-./output/bin/collector \
-    --role=Head \
-    --runtime-class-name=s3 \
-    --ray-cluster-name=my-cluster \
-    --ray-root-dir=/path/to/logs
+```text
+Username: minioadmin
+Password: minioadmin
```
-## Development
+> [!IMPORTANT]
+> Before deploying the Ray cluster, you also need to create a new bucket named `ray-historyserver-log` in the MinIO UI
+> to hold the uploaded logs:
-### Code Structure
+![create_bucket](https://github.com/KunWuLuan/kuberay/blob/0a70a0e354db13d005e90de9817e8a87308e4810/historyserver/assets/create_bucket.png)
-- `cmd/`: Main applications (collector and historyserver)
-- `backend/`: Core logic for storage backends and collection
-- `backend/collector/`: Collector-specific code
-- `backend/historyserver/`: History server implementation
-- `dashboard/`: Web UI files
+### Deploy a Ray Cluster for Verification
-### Testing
-
-To run tests:
+Finally, you can check if the log collector works as expected by deploying a Ray cluster with the collector enabled and
+interacting with the MinIO UI.
```bash
-make test
+# Apply the Ray cluster manifest.
+kubectl apply -f config/raycluster.yaml
```
-### Linting
-
-To run lint checks:
+Since the `session_latest` logs are processed when the Ray cluster is deleted, you can manually delete the Ray cluster
+to trigger the log file upload:
```bash
-make alllint
+# Trigger processing of the session_latest logs upon deletion.
+kubectl delete -f config/raycluster.yaml
```
-## Deployment
+You should see the uploaded logs in the MinIO UI, as shown below:
-History Server can be deployed in Kubernetes using the manifests in the `config/samples/` directory.
-Examples are provided for different storage backends including MinIO and Aliyun OSS.
+![write_logs](https://github.com/KunWuLuan/kuberay/blob/0a70a0e354db13d005e90de9817e8a87308e4810/historyserver/assets/write_logs.png) diff --git a/historyserver/assets/create_bucket.png b/historyserver/assets/create_bucket.png new file mode 100644 index 00000000000..32c6f8a995a Binary files /dev/null and b/historyserver/assets/create_bucket.png differ diff --git a/historyserver/assets/write_logs.png b/historyserver/assets/write_logs.png new file mode 100644 index 00000000000..b27689d9a37 Binary files /dev/null and b/historyserver/assets/write_logs.png differ diff --git a/historyserver/cmd/collector/Dockerfile b/historyserver/cmd/collector/Dockerfile deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/historyserver/cmd/collector/main.go b/historyserver/cmd/collector/main.go index 0a1ed08e7d8..696753f5aa2 100644 --- a/historyserver/cmd/collector/main.go +++ b/historyserver/cmd/collector/main.go @@ -5,71 +5,68 @@ import ( "encoding/json" "flag" "os" - "path" "time" "github.com/sirupsen/logrus" "github.com/ray-project/kuberay/historyserver/pkg/collector" - "github.com/ray-project/kuberay/historyserver/pkg/collector/eventserver" "github.com/ray-project/kuberay/historyserver/pkg/collector/logcollector/runtime" "github.com/ray-project/kuberay/historyserver/pkg/collector/types" "github.com/ray-project/kuberay/historyserver/pkg/utils" ) -const runtimeClassConfigPath = "/var/collector-config/data" - func main() { - role := "" - runtimeClassName := "" - rayClusterName := "" - rayClusterId := "" - rayRootDir := "" - logBatching := 1000 - eventsPort := 8080 - pushInterval := time.Minute - runtimeClassConfigPath := "/var/collector-config/data" + var role string + var storageProvider string + var rayClusterName string + var rayClusterId string + var rayRootDir string + var logBatching int + var pushInterval time.Duration + var storageProviderConfigPath string flag.StringVar(&role, "role", "Worker", "") - flag.StringVar(&runtimeClassName, "runtime-class-name", "", "") + flag.StringVar(&storageProvider, "storage-provider", "", "") flag.StringVar(&rayClusterName, "ray-cluster-name", "", "") flag.StringVar(&rayClusterId, "ray-cluster-id", "default", "") flag.StringVar(&rayRootDir, "ray-root-dir", "", "") flag.IntVar(&logBatching, "log-batching", 1000, "") - flag.IntVar(&eventsPort, "events-port", 8080, "") - flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data" + flag.StringVar(&storageProviderConfigPath, "storage-provider-config-path", "", "") //"/var/collector-config/data" flag.DurationVar(&pushInterval, "push-interval", time.Minute, "") flag.Parse() sessionDir, err := utils.GetSessionDir() if err != nil { - panic("Failed to get session dir: " + err.Error()) + logrus.Errorf("Failed to get session dir: %v", err) + os.Exit(1) } rayNodeId, err := utils.GetRayNodeID() if err != nil { - panic("Failed to get ray node id: " + err.Error()) + logrus.Errorf("Failed to get ray node id: %v", err) + os.Exit(1) } - sessionName := path.Base(sessionDir) - jsonData := make(map[string]interface{}) - if runtimeClassConfigPath != "" { - data, err := os.ReadFile(runtimeClassConfigPath) + if storageProviderConfigPath != "" { + data, err := os.ReadFile(storageProviderConfigPath) if err != nil { - panic("Failed to read runtime class config " + err.Error()) + logrus.Errorf("Failed to read storage provider config: %v", err) + os.Exit(1) } err = json.Unmarshal(data, &jsonData) if err != nil { - panic("Failed to parse runtime class config: " + err.Error()) + 
logrus.Errorf("Failed to parse storage provider config: %v", err) + os.Exit(1) } } registry := collector.GetWriterRegistry() - factory, ok := registry[runtimeClassName] + factory, ok := registry[storageProvider] if !ok { - panic("Not supported runtime class name: " + runtimeClassName + " for role: " + role + ".") + logrus.Errorf("Not supported storage provider: %s for role: %s", storageProvider, role) + os.Exit(1) } globalConfig := types.RayCollectorConfig{ @@ -86,21 +83,15 @@ func main() { writer, err := factory(&globalConfig, jsonData) if err != nil { - panic("Failed to create writer for runtime class name: " + runtimeClassName + " for role: " + role + ".") + logrus.Errorf("Failed to create writer for storage provider: %s for role: %s: %v", storageProvider, role, err) + os.Exit(1) } - // Create and initialize EventServer - eventServer := eventserver.NewEventServer(writer, rayRootDir, sessionDir, rayNodeId, rayClusterName, rayClusterId, sessionName) - eventServer.InitServer(eventsPort) - - collector := runtime.NewCollector(&globalConfig, writer) - _ = collector.Start(context.TODO().Done()) + // Create and initialize LogCollector + logCollector := runtime.NewCollector(&globalConfig, writer) + _ = logCollector.Start(context.TODO().Done()) - eventStop := eventServer.WaitForStop() - logStop := collector.WaitForStop() - <-eventStop - logrus.Info("Event server shutdown") + logStop := logCollector.WaitForStop() <-logStop logrus.Info("Log server shutdown") - logrus.Info("All servers shutdown") } diff --git a/historyserver/cmd/historyserver/Dockerfile b/historyserver/cmd/historyserver/Dockerfile deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/historyserver/cmd/historyserver/main.go b/historyserver/cmd/historyserver/main.go deleted file mode 100644 index 06ab7d0f9a3..00000000000 --- a/historyserver/cmd/historyserver/main.go +++ /dev/null @@ -1 +0,0 @@ -package main diff --git a/historyserver/config/minio.yaml b/historyserver/config/minio.yaml new file mode 100644 index 00000000000..89067d874b9 --- /dev/null +++ b/historyserver/config/minio.yaml @@ -0,0 +1,93 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: minio-dev +--- +# 1. Secrets for Login +apiVersion: v1 +kind: Secret +metadata: + name: minio-creds + namespace: minio-dev +type: Opaque +stringData: + rootUser: minioadmin # CHANGE THIS + rootPassword: minioadmin # CHANGE THIS +#--- +## 2. Storage (PVC) +#apiVersion: v1 +#kind: PersistentVolumeClaim +#metadata: +# name: minio-pvc +# namespace: minio-dev +#spec: +# accessModes: +# - ReadWriteOnce +# resources: +# requests: +# storage: 10Gi +--- +# 3. Deployment +apiVersion: apps/v1 +kind: Deployment +metadata: + name: minio + namespace: minio-dev + labels: + app: minio +spec: + replicas: 1 + selector: + matchLabels: + app: minio + template: + metadata: + labels: + app: minio + spec: + containers: + - name: minio + image: quay.io/minio/minio:latest + command: + - /bin/sh + - -c + - minio server /data --console-address :9001 + env: + - name: MINIO_ROOT_USER + valueFrom: + secretKeyRef: + name: minio-creds + key: rootUser + - name: MINIO_ROOT_PASSWORD + valueFrom: + secretKeyRef: + name: minio-creds + key: rootPassword + ports: + - containerPort: 9000 # API Port + - containerPort: 9001 # Console UI Port +# volumeMounts: +# - name: data +# mountPath: /data +# volumes: +# - name: data +# persistentVolumeClaim: +# claimName: minio-pvc +--- +# 4. 
Service +apiVersion: v1 +kind: Service +metadata: + name: minio-service + namespace: minio-dev +spec: + selector: + app: minio + ports: + - name: api + port: 9000 + targetPort: 9000 + - name: console + port: 9001 + targetPort: 9001 + type: ClusterIP # Change to LoadBalancer if on Cloud/MetalLB diff --git a/historyserver/config/raycluster.yaml b/historyserver/config/raycluster.yaml new file mode 100644 index 00000000000..21fdda2aa7c --- /dev/null +++ b/historyserver/config/raycluster.yaml @@ -0,0 +1,187 @@ +apiVersion: ray.io/v1 +kind: RayCluster +metadata: + labels: + ray.io/cluster: raycluster-log-collector + name: raycluster-log-collector + namespace: default +spec: + headGroupSpec: + rayStartParams: + dashboard-host: 0.0.0.0 + num-cpus: "0" + serviceType: ClusterIP + template: + metadata: + labels: + test: raycluster-log-collector + spec: + imagePullSecrets: + affinity: + containers: + - name: ray-head + image: rayproject/ray:2.52.0 + imagePullPolicy: IfNotPresent + command: + - 'echo "=========================================="; [ -d "/tmp/ray/session_latest" ] && dest="/tmp/ray/prev-logs/$(basename $(readlink /tmp/ray/session_latest))/$(cat /tmp/ray/raylet_node_id)" && echo "dst is $dest" && mkdir -p "$dest" && mv /tmp/ray/session_latest/logs "$dest/logs"; echo "========================================="' + securityContext: + allowPrivilegeEscalation: true + privileged: true + lifecycle: + postStart: + exec: + command: + - /bin/sh + - -lc + - -- + - | + GetNodeId(){ + while true; + do + nodeid=$(ps -ef | grep raylet | grep node_id | grep -v grep | grep -oP '(?<=--node_id=)[^ ]*') + if [ -n "$nodeid" ]; then + echo "$(date) raylet started: \"$(ps -ef | grep raylet | grep node_id | grep -v grep | grep -oP '(?<=--node_id=)[^ ]*')\" => ${nodeid}" >> /tmp/ray/init.log + echo $nodeid > /tmp/ray/raylet_node_id + break + else + echo "$(date) raylet not start >> /tmp/ray/init.log" + sleep 1 + fi + done + } + GetNodeId + resources: + limits: + cpu: "5" + memory: 10G + requests: + cpu: "50m" + memory: 1G + volumeMounts: + - name: historyserver + mountPath: /tmp/ray + - name: collector + image: collector:v0.1.0 + imagePullPolicy: IfNotPresent + env: + - name: S3DISABLE_SSL + value: "true" + - name: AWS_S3ID + value: minioadmin + - name: AWS_S3SECRET + value: minioadmin + - name: AWS_S3TOKEN + value: "" + - name: S3_BUCKET + value: "ray-historyserver-log" + - name: S3_ENDPOINT + value: "minio-service.minio-dev:9000" + - name: S3_REGION + value: "test" + - name: S3FORCE_PATH_STYLE + value: "true" + args: + - --role=Head + - --storage-provider=s3 + - --ray-cluster-name=raycluster-log-collector + - --ray-root-dir=log + volumeMounts: + - name: historyserver + mountPath: /tmp/ray + tolerations: + - key: ray + operator: Equal + value: cpu + volumes: + - name: historyserver + emptyDir: {} + workerGroupSpecs: + - groupName: cpu + maxReplicas: 1000 + minReplicas: 0 + numOfHosts: 1 + rayStartParams: {} + replicas: 0 + template: + metadata: + labels: + test: raycluster-log-collector + spec: + imagePullSecrets: + containers: + - name: ray-worker + image: rayproject/ray:2.52.0 + imagePullPolicy: IfNotPresent + command: + - 'echo "=========================================="; [ -d "/tmp/ray/session_latest" ] && dest="/tmp/ray/prev-logs/$(basename $(readlink /tmp/ray/session_latest))/$(cat /tmp/ray/raylet_node_id)" && echo "dst is $dest" && mkdir -p "$dest" && mv /tmp/ray/session_latest/logs "$dest/logs"; echo "========================================="' + securityContext: + allowPrivilegeEscalation: true + 
privileged: true + lifecycle: + postStart: + exec: + command: + - /bin/sh + - -lc + - -- + - | + GetNodeId(){ + while true; + do + nodeid=$(ps -ef | grep raylet | grep node_id | grep -v grep | grep -oP '(?<=--node_id=)[^ ]*') + if [ -n "$nodeid" ]; then + echo "$(date) raylet started: \"$(ps -ef | grep raylet | grep node_id | grep -v grep | grep -oP '(?<=--node_id=)[^ ]*')\" => ${nodeid}" >> /tmp/ray/init.log + echo $nodeid > /tmp/ray/raylet_node_id + break + else + echo "$(date) raylet not start >> /tmp/ray/init.log" + sleep 1 + fi + done + } + GetNodeId + resources: + limits: + cpu: "30" + memory: 30G + requests: + cpu: "50m" + memory: 1G + volumeMounts: + - name: historyserver + mountPath: /tmp/ray + - name: collector + image: collector:v0.1.0 + imagePullPolicy: IfNotPresent + env: + - name: S3DISABLE_SSL + value: "true" + - name: AWS_S3ID + value: minioadmin + - name: AWS_S3SECRET + value: minioadmin + - name: AWS_S3TOKEN + value: "" + - name: S3_BUCKET + value: "ray-historyserver-log" + - name: S3_ENDPOINT + value: "minio-service.minio-dev:9000" + - name: S3_REGION + value: "test" + - name: S3FORCE_PATH_STYLE + value: "true" + args: + - --role=Worker + - --storage-provider=s3 + - --ray-cluster-name=raycluster-log-collector + - --ray-root-dir=log + volumeMounts: + - name: historyserver + mountPath: /tmp/ray + tolerations: + - key: ray + operator: Equal + value: cpu + volumes: + - name: historyserver + emptyDir: {} diff --git a/historyserver/config/rayjob_using_existing_cluster.yaml b/historyserver/config/rayjob_using_existing_cluster.yaml new file mode 100644 index 00000000000..adb1ff2f80f --- /dev/null +++ b/historyserver/config/rayjob_using_existing_cluster.yaml @@ -0,0 +1,9 @@ +apiVersion: ray.io/v1 +kind: RayJob +metadata: + name: rayjob-use-existing-raycluster +spec: + entrypoint: python -c "import ray; ray.init(); print(ray.cluster_resources())" + # Select the existing Ray cluster running the collector. 
+ clusterSelector: + ray.io/cluster: raycluster-log-collector diff --git a/historyserver/config/sa.yaml b/historyserver/config/sa.yaml new file mode 100644 index 00000000000..9ec27152e2d --- /dev/null +++ b/historyserver/config/sa.yaml @@ -0,0 +1,31 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: historyserver +automountServiceAccountToken: true +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: raycluster-reader +rules: +- apiGroups: ["ray.io"] + resources: ["rayclusters"] + verbs: ["list", "get"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: historyserver + namespace: default +subjects: +- kind: ServiceAccount + name: default + namespace: default +- kind: ServiceAccount + name: historyserver + namespace: default +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: raycluster-reader diff --git a/historyserver/pkg/collector/eventserver/eventserver.go b/historyserver/pkg/collector/eventserver/eventserver.go deleted file mode 100644 index b0143c63c96..00000000000 --- a/historyserver/pkg/collector/eventserver/eventserver.go +++ /dev/null @@ -1,516 +0,0 @@ -package eventserver - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net/http" - "os" - "os/signal" - "path" - "strings" - "sync" - "syscall" - "time" - - "github.com/emicklei/go-restful/v3" - "github.com/fsnotify/fsnotify" - "github.com/sirupsen/logrus" - - "github.com/ray-project/kuberay/historyserver/pkg/collector/logcollector/storage" -) - -type Event struct { - Data map[string]interface{} - Timestamp time.Time - - SessionName string - NodeID string -} - -type EventServer struct { - storageWriter storage.StorageWriter - stopped chan struct{} - clusterID string - sessionDir string - nodeID string - clusterName string - sessionName string - root string - currentSessionName string - currentNodeID string - events []Event - flushInterval time.Duration - mutex sync.Mutex -} - -func NewEventServer(writer storage.StorageWriter, rootDir, sessionDir, nodeID, clusterName, clusterID, sessionName string) *EventServer { - server := &EventServer{ - events: make([]Event, 0), - storageWriter: writer, - root: rootDir, - sessionDir: sessionDir, - nodeID: nodeID, - clusterName: clusterName, - clusterID: clusterID, - sessionName: sessionName, - mutex: sync.Mutex{}, - flushInterval: time.Hour, // Default flush interval: 1 hour - stopped: make(chan struct{}), - currentSessionName: sessionName, // Initialize with configured sessionName - currentNodeID: nodeID, // Initialize with configured nodeID - } - - // Start goroutine to watch nodeID file changes - go server.watchNodeIDFile() - - return server -} - -func (es *EventServer) InitServer(port int) { - ws := new(restful.WebService) - ws.Path("/v1") - ws.Consumes(restful.MIME_JSON) - ws.Produces(restful.MIME_JSON) - - ws.Route(ws.POST("/events").To(es.PersistEvents)) - - restful.Add(ws) - - go func() { - logrus.Infof("Starting event server on port %d", port) - logrus.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) - }() - go func() { - es.periodicFlush() - }() - - // Handle SIGTERM signal - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) - - go func() { - <-sigChan - logrus.Info("Received SIGTERM, flushing events to storage") - es.flushEvents() - close(es.stopped) - }() -} - -// watchNodeIDFile watches /tmp/ray/raylet_node_id for content changes -func (es *EventServer) watchNodeIDFile() { - nodeIDFilePath := "/tmp/ray/raylet_node_id" - 
- // Create new watcher - watcher, err := fsnotify.NewWatcher() - if err != nil { - logrus.Errorf("Failed to create file watcher: %v", err) - return - } - defer watcher.Close() - - // Add file to watch list - err = watcher.Add(nodeIDFilePath) - if err != nil { - logrus.Infof("Failed to add %s to watcher, will watch for file creation: %v", nodeIDFilePath, err) - // If file doesn't exist, watch parent directory - err = watcher.Add("/tmp/ray") - if err != nil { - logrus.Errorf("Failed to watch directory /tmp/ray: %v", err) - return - } - } - - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - - // Check if this is the target file - if event.Name == nodeIDFilePath && (event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create) { - // Read file content - content, err := os.ReadFile(nodeIDFilePath) - if err != nil { - logrus.Errorf("Failed to read node ID file %s: %v", nodeIDFilePath, err) - continue - } - - // Trim whitespace - newNodeID := strings.TrimSpace(string(content)) - if newNodeID == "" { - continue - } - - // Check if nodeID changed - es.mutex.Lock() - if es.currentNodeID != newNodeID { - oldNodeID := es.currentNodeID - logrus.Infof("Node ID changed from %s to %s, flushing events", oldNodeID, newNodeID) - - // Update current nodeID - es.currentNodeID = newNodeID - - // Collect events with same nodeID - var eventsToFlush []Event - var remainingEvents []Event - for _, event := range es.events { - if event.NodeID == oldNodeID { - eventsToFlush = append(eventsToFlush, event) - } else if event.NodeID == newNodeID { - remainingEvents = append(remainingEvents, event) - } else { - logrus.Errorf("Drop event with nodeId %v, event: %v", event.NodeID, event.Data) - } - } - - // Update event list, keep only events with different nodeID - es.events = remainingEvents - es.mutex.Unlock() - - // Flush events with same nodeID - if len(eventsToFlush) > 0 { - go es.flushEventsInternal(eventsToFlush) - } - continue - } - es.mutex.Unlock() - } - case err, ok := <-watcher.Errors: - if !ok { - return - } - logrus.Errorf("File watcher error: %v", err) - case <-es.stopped: - logrus.Info("Event server stopped, exiting node ID watcher") - return - } - } -} - -func (es *EventServer) PersistEvents(req *restful.Request, resp *restful.Response) { - body, err := io.ReadAll(req.Request.Body) - if err != nil { - logrus.Errorf("Failed to read request body: %v", err) - resp.WriteError(http.StatusBadRequest, err) - return - } - - var eventDatas []map[string]interface{} - if err := json.Unmarshal(body, &eventDatas); err != nil { - logrus.Errorf("Failed to unmarshal event: %v", err) - resp.WriteError(http.StatusBadRequest, err) - return - } - - for _, eventData := range eventDatas { - // Parse timestamp - timestampStr, ok := eventData["timestamp"].(string) - if !ok { - logrus.Errorf("Event timestamp not found or not a string") - resp.WriteError(http.StatusBadRequest, fmt.Errorf("timestamp not found")) - return - } - - // Get sessionName from event - sessionNameStr, ok := eventData["sessionName"].(string) - if !ok { - logrus.Errorf("Event sessionName not found or not a string") - resp.WriteError(http.StatusBadRequest, fmt.Errorf("sessionName not found")) - return - } - - timestamp, err := time.Parse(time.RFC3339Nano, timestampStr) - if err != nil { - logrus.Errorf("Failed to parse timestamp: %v", err) - resp.WriteError(http.StatusBadRequest, err) - return - } - - es.mutex.Lock() - event := Event{ - Data: eventData, - Timestamp: timestamp, - SessionName: 
sessionNameStr, - NodeID: es.currentNodeID, // Store currentNodeID when event arrived - } - es.events = append(es.events, event) - - // Check if sessionName changed - if es.currentSessionName != sessionNameStr { - logrus.Infof("Session name changed from %s to %s, flushing events", es.currentSessionName, sessionNameStr) - // Save current events before flushing - eventsToFlush := make([]Event, len(es.events)) - copy(eventsToFlush, es.events) - - // Clear event list - es.events = es.events[:0] - - // Update current sessionName - es.currentSessionName = sessionNameStr - - // Unlock before flushing - es.mutex.Unlock() - - // Flush previous events - es.flushEventsInternal(eventsToFlush) - return - } - es.mutex.Unlock() - - logrus.Infof("Received event with ID: %v at %v, session: %s, node: %s", eventData["eventId"], timestamp, sessionNameStr, es.currentNodeID) - } - - resp.WriteHeader(http.StatusOK) -} - -func (es *EventServer) periodicFlush() { - ticker := time.NewTicker(es.flushInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - logrus.Info("Periodic flush triggered") - es.flushEvents() - case <-es.stopped: - logrus.Info("Event server stopped, exiting periodic flush") - return - } - } -} - -func (es *EventServer) WaitForStop() <-chan struct{} { - return es.stopped -} - -func (es *EventServer) flushEvents() { - es.mutex.Lock() - if len(es.events) == 0 { - es.mutex.Unlock() - logrus.Info("No events to flush") - return - } - - // Copy current event list - eventsToFlush := make([]Event, len(es.events)) - copy(eventsToFlush, es.events) - - // Clear event list - es.events = es.events[:0] - es.mutex.Unlock() - - // Execute flush operation - es.flushEventsInternal(eventsToFlush) -} - -// flushEventsInternal performs the actual event flush -func (es *EventServer) flushEventsInternal(eventsToFlush []Event) { - // Group events by hour and type - nodeEventsByHour := make(map[string][]Event) // Node-related events - jobEventsByHour := make(map[string][]Event) // Job-related events - - // Categorize events - for _, event := range eventsToFlush { - hourKey := event.Timestamp.Truncate(time.Hour).Format("2006-01-02-15") - - // Check event type - if es.isNodeEvent(event.Data) { - // Node-related events - nodeEventsByHour[hourKey] = append(nodeEventsByHour[hourKey], event) - } else if jobID := es.getJobID(event.Data); jobID != "" { - // Job-related events, use jobID-hour as key - jobKey := fmt.Sprintf("%s-%s", jobID, hourKey) - jobEventsByHour[jobKey] = append(jobEventsByHour[jobKey], event) - } else { - // Default to node events - nodeEventsByHour[hourKey] = append(nodeEventsByHour[hourKey], event) - } - } - - // Upload all events concurrently - var wg sync.WaitGroup - errChan := make(chan error, len(nodeEventsByHour)+len(jobEventsByHour)) - - // Upload node-related events - for hour, events := range nodeEventsByHour { - wg.Add(1) - go func(hourKey string, hourEvents []Event) { - defer wg.Done() - if err := es.flushNodeEventsForHour(hourKey, hourEvents); err != nil { - errChan <- err - } - }(hour, events) - } - - // Upload job-related events - for jobHour, events := range jobEventsByHour { - wg.Add(1) - go func(jobHourKey string, hourEvents []Event) { - defer wg.Done() - // Split jobID and hourKey - parts := strings.SplitN(jobHourKey, "-", 4) // Date format has 3 dashes, so use 4 parts - if len(parts) < 4 { - errChan <- fmt.Errorf("invalid job hour key: %s", jobHourKey) - return - } - - jobID := parts[0] - hourKey := strings.Join(parts[1:], "-") // Rejoin time parts as hourKey - - if err := 
es.flushJobEventsForHour(jobID, hourKey, hourEvents); err != nil { - errChan <- err - } - }(jobHour, events) - } - - wg.Wait() - close(errChan) - - // Check for errors - for err := range errChan { - logrus.Errorf("Error flushing events: %v", err) - } - - totalEvents := len(eventsToFlush) - logrus.Infof("Successfully flushed %d events to storage (%d node events, %d job events)", - totalEvents, - countEventsInMap(nodeEventsByHour), - countEventsInMap(jobEventsByHour)) -} - -// countEventsInMap counts total events in map -func countEventsInMap(eventsMap map[string][]Event) int { - count := 0 - for _, events := range eventsMap { - count += len(events) - } - return count -} - -var nodeEventType = []string{"NODE_LIFECYCLE_EVENT", "NODE_DEFINITION_EVENT"} - -// isNodeEvent checks if event is node-related -func (es *EventServer) isNodeEvent(eventData map[string]interface{}) bool { - eventType, ok := eventData["eventType"].(string) - if !ok { - return false - } - for _, nodeEvent := range nodeEventType { - if eventType == nodeEvent { - return true - } - } - return false -} - -// getJobID gets jobID associated with event -func (es *EventServer) getJobID(eventData map[string]interface{}) string { - if jobID, hasJob := eventData["jobId"]; hasJob && jobID != "" { - return fmt.Sprintf("%v", jobID) - } - return "" -} - -// flushNodeEventsForHour flushes node events to storage -func (es *EventServer) flushNodeEventsForHour(hourKey string, events []Event) error { - // Create event data - eventsData := make([]map[string]interface{}, len(events)) - for i, event := range events { - eventsData[i] = event.Data - } - - data, err := json.Marshal(eventsData) - if err != nil { - return fmt.Errorf("failed to marshal node events: %w", err) - } - - reader := bytes.NewReader(data) - - // Use sessionName from event, not config - sessionNameToUse := es.sessionName // Default to configured sessionName - if len(events) > 0 && events[0].SessionName != "" { - sessionNameToUse = events[0].SessionName - } - - // Use NodeID from event, not current NodeID - nodeIDToUse := es.nodeID // Default to configured nodeID - if len(events) > 0 && events[0].NodeID != "" { - nodeIDToUse = events[0].NodeID - } - - // Build node event storage path using event's nodeID - basePath := path.Join( - es.root, - fmt.Sprintf("%s_%s", es.clusterName, es.clusterID), - sessionNameToUse, - "node_events", - fmt.Sprintf("%s-%s", nodeIDToUse, hourKey)) - - // Ensure storage directory exists - dir := path.Dir(basePath) - if err := es.storageWriter.CreateDirectory(dir); err != nil { - return fmt.Errorf("failed to create directory %s: %w", dir, err) - } - - // Write event file - if err := es.storageWriter.WriteFile(basePath, reader); err != nil { - return fmt.Errorf("failed to write node events file %s: %w", basePath, err) - } - - logrus.Infof("Successfully flushed %d node events for hour %s to %s, context: %s", len(events), hourKey, basePath, string(data)) - return nil -} - -// flushJobEventsForHour flushes job events to storage -func (es *EventServer) flushJobEventsForHour(jobID, hourKey string, events []Event) error { - // Create event data - eventsData := make([]map[string]interface{}, len(events)) - for i, event := range events { - eventsData[i] = event.Data - } - - data, err := json.Marshal(eventsData) - if err != nil { - return fmt.Errorf("failed to marshal job events: %w", err) - } - - reader := bytes.NewReader(data) - - // Use sessionName from event, not config - sessionNameToUse := es.sessionName // Default to configured sessionName - if len(events) 
> 0 && events[0].SessionName != "" { - sessionNameToUse = events[0].SessionName - } - - // Use NodeID from event, not current NodeID - nodeIDToUse := es.nodeID // Default to configured nodeID - if len(events) > 0 && events[0].NodeID != "" { - nodeIDToUse = events[0].NodeID - } - - // Build job event storage path using event's nodeID - basePath := path.Join( - es.root, - fmt.Sprintf("%s_%s", es.clusterName, es.clusterID), - sessionNameToUse, - "job_events", - jobID, - fmt.Sprintf("%s-%s", nodeIDToUse, hourKey)) - - // Ensure storage directory exists - dir := path.Dir(basePath) - if err := es.storageWriter.CreateDirectory(dir); err != nil { - return fmt.Errorf("failed to create directory %s: %w", dir, err) - } - - // Write event file - if err := es.storageWriter.WriteFile(basePath, reader); err != nil { - return fmt.Errorf("failed to write job events file %s: %w", basePath, err) - } - - logrus.Infof("Successfully flushed %d job events for job %s hour %s to %s", len(events), jobID, hourKey, basePath) - return nil -} diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go index a2d4f09bfcb..8e505a363b4 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go @@ -679,9 +679,11 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) { } // Walk through the logs directory and process all files + var processingErrors []error err := filepath.WalkDir(logsDir, func(path string, info fs.DirEntry, err error) error { if err != nil { logrus.Errorf("Error walking logs path %s: %v", path, err) + processingErrors = append(processingErrors, fmt.Errorf("error walking logs path %s: %w", path, err)) return nil } @@ -693,6 +695,7 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) { // Process log file if err := r.processPrevLogFile(path, logsDir, sessionID, nodeID); err != nil { logrus.Errorf("Failed to process prev-log file %s: %v", path, err) + processingErrors = append(processingErrors, fmt.Errorf("failed to process prev-log file %s: %w", path, err)) } return nil @@ -702,6 +705,13 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) { return } + // Process failed log files in the next retry + if len(processingErrors) > 0 { + logrus.Warnf("Failed to process %d log files for session: %s, node: %s. Keeping directory for retry. Errors: %v", + len(processingErrors), sessionID, nodeID, processingErrors) + return + } + // After successfully processing all files, remove the node directory logrus.Infof("Finished processing all logs for session: %s, node: %s. 
Removing node directory.", sessionID, nodeID) if err := os.RemoveAll(sessionNodeDir); err != nil { diff --git a/historyserver/pkg/collector/logcollector/runtime/runtime.go b/historyserver/pkg/collector/logcollector/runtime/runtime.go index 18e2bfc2788..c5af7691719 100644 --- a/historyserver/pkg/collector/logcollector/runtime/runtime.go +++ b/historyserver/pkg/collector/logcollector/runtime/runtime.go @@ -38,7 +38,7 @@ func NewCollector(config *types.RayCollectorConfig, writer storage.StorageWriter Writer: writer, ShutdownChan: make(chan struct{}), } - logDir := strings.TrimSpace(path.Join(config.SessionDir, utils.RAY_SESSIONDIR_LOGDIR_NAME)) + logDir := strings.TrimSpace(path.Join(config.SessionDir, utils.RaySessionDirLogDirName)) handler.LogDir = logDir rootMetaDir := fmt.Sprintf("%s/", path.Clean(path.Join(handler.RootDir, handler.RayClusterName+"_"+handler.RayClusterID, "meta"))) handler.MetaDir = rootMetaDir diff --git a/historyserver/pkg/collector/logcollector/storage/aliyunoss/README.md b/historyserver/pkg/collector/logcollector/storage/aliyunoss/README.md index c4817b0dc40..88c537b592e 100644 --- a/historyserver/pkg/collector/logcollector/storage/aliyunoss/README.md +++ b/historyserver/pkg/collector/logcollector/storage/aliyunoss/README.md @@ -7,7 +7,7 @@ Oss endpoint and oss bucket are read from /var/collector-config/data. Content in /var/collector-config/data should be in json format, like `{"ossBucket": "", "ossEndpoint": ""}` -Set `--runtime-class-name=aliyunoss` to enable this module. +Set `--storage-provider=aliyunoss` to enable this module. Currently this module can only be used in ack environment. Oidc must be enabled for the cluster, and permission for write the oss must be granted. diff --git a/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/types.go b/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/types.go index 0e0575db631..64fffbcaa08 100644 --- a/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/types.go +++ b/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/types.go @@ -36,7 +36,7 @@ type GlobalConfig struct { OSSHistoryServerDir string } -type RayMetaHanderConfig struct { +type RayMetaHandlerConfig struct { GlobalConfig RayClusterName string RayClusterID string diff --git a/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/validate.go b/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/validate.go index 70c0236e994..8bc7208f417 100644 --- a/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/validate.go +++ b/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/validate.go @@ -39,8 +39,8 @@ func ValidateGlobalConfig(c *GlobalConfig, fldpath *field.Path) field.ErrorList return allErrs } -// ValidateMetaHanderConfig is -func ValidateMetaHanderConfig(c *RayMetaHanderConfig, fldpath *field.Path) field.ErrorList { +// ValidateMetaHandlerConfig is +func ValidateMetaHandlerConfig(c *RayMetaHandlerConfig, fldpath *field.Path) field.ErrorList { var allErrs field.ErrorList if len(c.RayClusterName) == 0 { allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterName, "ray_cluster_name must be set")) @@ -51,7 +51,7 @@ func ValidateMetaHanderConfig(c *RayMetaHanderConfig, fldpath *field.Path) field return allErrs } -func ValidatRayHistoryServerConfig(c *RayHistoryServerConfig, fldpath *field.Path) field.ErrorList { +func ValidateRayHistoryServerConfig(c *RayHistoryServerConfig, fldpath *field.Path) field.ErrorList { var allErrs field.ErrorList if 
len(c.DashBoardDir) == 0 { allErrs = append(allErrs, field.Invalid(fldpath, c.DashBoardDir, "dashboard-dir must be set")) diff --git a/historyserver/pkg/collector/logcollector/storage/aliyunoss/ray/ray.go b/historyserver/pkg/collector/logcollector/storage/aliyunoss/ray/ray.go index 01850fda6ac..6aa82915cbf 100644 --- a/historyserver/pkg/collector/logcollector/storage/aliyunoss/ray/ray.go +++ b/historyserver/pkg/collector/logcollector/storage/aliyunoss/ray/ray.go @@ -179,13 +179,13 @@ func (r *RayLogsHandler) List() (res []utils.ClusterInfo) { sessionInfo := strings.Split(metas[1], "_") date := sessionInfo[1] dataTime := sessionInfo[2] - createTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime) + creationTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime) if err != nil { logrus.Errorf("Failed to parse time %s: %v", date+"_"+dataTime, err) continue } - c.CreateTimeStamp = createTime.Unix() - c.CreateTime = createTime.UTC().Format(("2006-01-02T15:04:05Z")) + c.CreationTimestamp = creationTime.Unix() + c.CreationTime = creationTime.UTC().Format(("2006-01-02T15:04:05Z")) clusters = append(clusters, *c) } if lsRes.IsTruncated { @@ -240,7 +240,7 @@ func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (stor return New(config) } -func NewWritter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) { +func NewWriter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) { config := &config{} config.complete(c, jd) @@ -269,7 +269,7 @@ func New(c *config) (*RayLogsHandler, error) { sessionDir := strings.TrimSpace(c.SessionDir) sessionDir = filepath.Clean(sessionDir) - logdir := strings.TrimSpace(path.Join(sessionDir, utils.RAY_SESSIONDIR_LOGDIR_NAME)) + logdir := strings.TrimSpace(path.Join(sessionDir, utils.RaySessionDirLogDirName)) logdir = filepath.Clean(logdir) logrus.Infof("Clean logdir is %s", logdir) diff --git a/historyserver/pkg/collector/logcollector/storage/localtest/reader.go b/historyserver/pkg/collector/logcollector/storage/localtest/reader.go index 3dbba172d1c..9259fa47013 100644 --- a/historyserver/pkg/collector/logcollector/storage/localtest/reader.go +++ b/historyserver/pkg/collector/logcollector/storage/localtest/reader.go @@ -15,34 +15,8 @@ type MockReader struct { clusters []utils.ClusterInfo } -// NewMockReader creates a new mock reader -func NewMockReader() *MockReader { - clusters := []utils.ClusterInfo{ - { - Name: "cluster-1", - SessionName: "session-1", - CreateTime: "2023-01-01T00:00:00Z", - CreateTimeStamp: 1672531200000, - }, - { - Name: "cluster-2", - SessionName: "session-2", - CreateTime: "2023-01-02T00:00:00Z", - CreateTimeStamp: 1672617600000, - }, - } - - data := map[string]map[string]string{ - "cluster-1": { - "log.txt": "This is log content for cluster-1\nMultiple lines\nof log content", - "metadata.json": "{\n \"name\": \"cluster-1\",\n \"sessionName\": \"session-1\",\n \"createTime\": \"2023-01-01T00:00:00Z\"\n}", - }, - "cluster-2": { - "log.txt": "This is log content for cluster-2\nMultiple lines\nof log content", - "metadata.json": "{\n \"name\": \"cluster-2\",\n \"sessionName\": \"session-2\",\n \"createTime\": \"2023-01-02T00:00:00Z\"\n}", - }, - } - +// NewMockReader creates a new mock reader with the provided clusters and data. 
+func NewMockReader(clusters []utils.ClusterInfo, data map[string]map[string]string) *MockReader { return &MockReader{ clusters: clusters, data: data, @@ -75,7 +49,33 @@ func (r *MockReader) ListFiles(clusterId string, dir string) []string { return []string{} } -// NewReader creates a new StorageReader +// NewReader creates a new StorageReader with some default mock data. func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (storage.StorageReader, error) { - return NewMockReader(), nil + clusters := []utils.ClusterInfo{ + { + Name: "cluster-1", + SessionName: "session-1", + CreationTime: "2023-01-01T00:00:00Z", + CreationTimestamp: 1672531200000, + }, + { + Name: "cluster-2", + SessionName: "session-2", + CreationTime: "2023-01-02T00:00:00Z", + CreationTimestamp: 1672617600000, + }, + } + + data := map[string]map[string]string{ + "cluster-1": { + "log.txt": "This is log content for cluster-1\nMultiple lines\nof log content", + "metadata.json": "{\n \"name\": \"cluster-1\",\n \"sessionName\": \"session-1\",\n \"creationTime\": \"2023-01-01T00:00:00Z\"\n}", + }, + "cluster-2": { + "log.txt": "This is log content for cluster-2\nMultiple lines\nof log content", + "metadata.json": "{\n \"name\": \"cluster-2\",\n \"sessionName\": \"session-2\",\n \"creationTime\": \"2023-01-02T00:00:00Z\"\n}", + }, + } + + return NewMockReader(clusters, data), nil } diff --git a/historyserver/pkg/collector/logcollector/storage/s3/README.md b/historyserver/pkg/collector/logcollector/storage/s3/README.md index b5a548c2a78..6388118c920 100644 --- a/historyserver/pkg/collector/logcollector/storage/s3/README.md +++ b/historyserver/pkg/collector/logcollector/storage/s3/README.md @@ -7,6 +7,6 @@ S3 endpoint, S3 region and S3 bucket are read from /var/collector-config/data. Content in /var/collector-config/data should be in json format, like `{"s3Bucket": "", "s3Endpoint": "", "s3Region": ""}` -Set `--runtime-class-name=s3` to enable this module. +Set `--storage-provider=s3` to enable this module. This module can be used with any S3 compatible storage service. 
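Editor's note: the storage README above only names the JSON keys read from `/var/collector-config/data`. The sketch below is a small illustrative Go program, not part of this PR, that wires those keys (`s3Bucket`, `s3Endpoint`, `s3Region`, plus the `s3ForcePathStyle`/`s3DisableSSL` toggles handled in `config.go`) into the same aws-sdk-go packages that `s3.go` in this diff imports, and then lists the bucket as a connectivity check. The credentials, bucket, and endpoint values are the MinIO defaults from `config/minio.yaml` and `config/raycluster.yaml`; the collector's own config type is unexported, so a plain map stands in for it here.

```go
// Illustrative sketch: configure an aws-sdk-go client against an S3-compatible
// endpoint (e.g. the sample MinIO deployment) from a /var/collector-config/data
// style JSON document. This is not the collector's implementation; it only
// demonstrates the settings the storage README and config.go refer to.
package main

import (
	"encoding/json"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// Shape of /var/collector-config/data described in the storage README.
	// When running outside the cluster, port-forward the MinIO API port and
	// use e.g. "localhost:9000" instead of the in-cluster service DNS name.
	raw := []byte(`{
	  "s3Bucket":         "ray-historyserver-log",
	  "s3Endpoint":       "minio-service.minio-dev:9000",
	  "s3Region":         "test",
	  "s3ForcePathStyle": "true",
	  "s3DisableSSL":     "true"
	}`)

	cfg := map[string]string{}
	if err := json.Unmarshal(raw, &cfg); err != nil {
		log.Fatalf("parse provider config: %v", err)
	}

	sess, err := session.NewSession(&aws.Config{
		Region:   aws.String(cfg["s3Region"]),
		Endpoint: aws.String(cfg["s3Endpoint"]),
		// Static credentials stand in for the AWS_S3ID/AWS_S3SECRET env vars
		// used by the sample RayCluster manifest.
		Credentials:      credentials.NewStaticCredentials("minioadmin", "minioadmin", ""),
		S3ForcePathStyle: aws.Bool(cfg["s3ForcePathStyle"] == "true"), // MinIO expects path-style addressing
		DisableSSL:       aws.Bool(cfg["s3DisableSSL"] == "true"),
	})
	if err != nil {
		log.Fatalf("create session: %v", err)
	}

	// List the sample bucket as a quick connectivity and upload check.
	out, err := s3.New(sess).ListObjectsV2(&s3.ListObjectsV2Input{
		Bucket: aws.String(cfg["s3Bucket"]),
	})
	if err != nil {
		log.Fatalf("list bucket: %v", err)
	}
	for _, obj := range out.Contents {
		log.Println(aws.StringValue(obj.Key))
	}
}
```

Path-style addressing and disabled SSL are what let the client talk to an in-cluster MinIO service rather than AWS itself; any other S3-compatible endpoint can be substituted the same way.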
diff --git a/historyserver/pkg/collector/logcollector/storage/s3/config.go b/historyserver/pkg/collector/logcollector/storage/s3/config.go index 404941f1120..4dc9d33dd75 100644 --- a/historyserver/pkg/collector/logcollector/storage/s3/config.go +++ b/historyserver/pkg/collector/logcollector/storage/s3/config.go @@ -32,8 +32,8 @@ func (c *config) complete(rcc *types.RayCollectorConfig, jd map[string]interface if os.Getenv("S3FORCE_PATH_STYLE") != "" { c.S3ForcePathStyle = aws.Bool(os.Getenv("S3FORCE_PATH_STYLE") == "true") } - if os.Getenv("s3DisableSSL") != "" { - c.DisableSSL = aws.Bool(os.Getenv("s3DisableSSL") == "true") + if os.Getenv("S3DISABLE_SSL") != "" { + c.DisableSSL = aws.Bool(os.Getenv("S3DISABLE_SSL") == "true") } } else { if bucket, ok := jd["s3Bucket"]; ok { @@ -48,7 +48,7 @@ func (c *config) complete(rcc *types.RayCollectorConfig, jd map[string]interface if forcePathStyle, ok := jd["s3ForcePathStyle"]; ok { c.S3ForcePathStyle = aws.Bool(forcePathStyle.(string) == "true") } - if s3disableSSL, ok := jd["s3DisableSSLs3DisableSSL"]; ok { + if s3disableSSL, ok := jd["s3DisableSSL"]; ok { c.DisableSSL = aws.Bool(s3disableSSL.(string) == "true") } } @@ -68,8 +68,8 @@ func (c *config) completeHSConfig(rcc *types.RayHistoryServerConfig, jd map[stri if os.Getenv("S3FORCE_PATH_STYLE") != "" { c.S3ForcePathStyle = aws.Bool(os.Getenv("S3FORCE_PATH_STYLE") == "true") } - if os.Getenv("s3DisableSSL") != "" { - c.DisableSSL = aws.Bool(os.Getenv("s3DisableSSL") == "true") + if os.Getenv("S3DISABLE_SSL") != "" { + c.DisableSSL = aws.Bool(os.Getenv("S3DISABLE_SSL") == "true") } } else { if bucket, ok := jd["s3Bucket"]; ok { diff --git a/historyserver/pkg/collector/logcollector/storage/s3/s3.go b/historyserver/pkg/collector/logcollector/storage/s3/s3.go index 1ddba7767b5..32423adb60c 100644 --- a/historyserver/pkg/collector/logcollector/storage/s3/s3.go +++ b/historyserver/pkg/collector/logcollector/storage/s3/s3.go @@ -28,6 +28,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" @@ -60,21 +61,35 @@ func (r *RayLogsHandler) CreateDirectory(d string) error { Bucket: aws.String(r.S3Bucket), Key: aws.String(objectDir), }) - if err != nil { - // Directory doesn't exist, create it - logrus.Infof("Begin to create s3 dir %s ...", objectDir) - _, err = r.S3Client.PutObject(&s3.PutObjectInput{ - Bucket: aws.String(r.S3Bucket), - Key: aws.String(objectDir), - Body: bytes.NewReader([]byte("")), - }) - if err != nil { - logrus.Errorf("Failed to create directory '%s': %v", objectDir, err) - return err + + // Directory placeholder already exists + if err == nil { + return nil + } + + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case s3.ErrCodeNoSuchKey, "NotFound": + logrus.Infof("S3 object %s doesn't exist, creating it...", objectDir) + + _, err = r.S3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(r.S3Bucket), + Key: aws.String(objectDir), + Body: bytes.NewReader([]byte("")), + }) + if err != nil { + return fmt.Errorf("failed to create s3 object %s: %w", objectDir, err) + } + logrus.Infof("Created s3 object %s", objectDir) + return nil + default: + return fmt.Errorf("failed to retrieve metadata from s3 object %s: %w", objectDir, err) } - logrus.Infof("Create s3 dir %s success", objectDir) } - return nil + + // If err is not an AWS-specific error (e.g., network error, context cancelled), + // we can't determine if 
+	// If err is not an AWS-specific error (e.g., network error, context cancelled),
+	// we can't determine if the object exists, so return the error immediately.
+	return fmt.Errorf("failed to check if s3 object %s exists: %w", objectDir, err)
 }
 
 func (r *RayLogsHandler) WriteFile(file string, reader io.ReadSeeker) error {
@@ -179,13 +194,13 @@ func (r *RayLogsHandler) List() (res []utils.ClusterInfo) {
 		sessionInfo := strings.Split(metas[1], "_")
 		date := sessionInfo[1]
 		dataTime := sessionInfo[2]
-		createTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
+		creationTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
 		if err != nil {
 			logrus.Errorf("Failed to parse time %s: %v", date+"_"+dataTime, err)
 			continue
 		}
-		c.CreateTimeStamp = createTime.Unix()
-		c.CreateTime = createTime.UTC().Format(("2006-01-02T15:04:05Z"))
+		c.CreationTimestamp = creationTime.Unix()
+		c.CreationTime = creationTime.UTC().Format("2006-01-02T15:04:05Z")
 		clusters = append(clusters, *c)
 	}
 	return true
@@ -250,7 +265,7 @@ func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (stor
 	return New(config)
 }
 
-func NewWritter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) {
+func NewWriter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) {
 	config := &config{}
 	config.complete(c, jd)
 
@@ -282,7 +297,7 @@ func New(c *config) (*RayLogsHandler, error) {
 	sessionDir := strings.TrimSpace(c.SessionDir)
 	sessionDir = filepath.Clean(sessionDir)
 
-	logdir := strings.TrimSpace(path.Join(sessionDir, utils.RAY_SESSIONDIR_LOGDIR_NAME))
+	logdir := strings.TrimSpace(path.Join(sessionDir, utils.RaySessionDirLogDirName))
 	logdir = filepath.Clean(logdir)
 	logrus.Infof("Clean logdir is %s", logdir)
diff --git a/historyserver/pkg/collector/registry.go b/historyserver/pkg/collector/registry.go
index 4bbfc6e2fa3..03bbdc6183b 100644
--- a/historyserver/pkg/collector/registry.go
+++ b/historyserver/pkg/collector/registry.go
@@ -15,8 +15,8 @@ func GetWriterRegistry() WriterRegistry {
 }
 
 var writerRegistry = WriterRegistry{
-	"aliyunoss": ray.NewWritter,
-	"s3":        s3.NewWritter,
+	"aliyunoss": ray.NewWriter,
+	"s3":        s3.NewWriter,
 }
 
 type ReaderRegistry map[string]func(globalData *types.RayHistoryServerConfig, data map[string]interface{}) (storage.StorageReader, error)
diff --git a/historyserver/pkg/collector/types/types.go b/historyserver/pkg/collector/types/types.go
index c8457fec70e..d9bf8225e5d 100644
--- a/historyserver/pkg/collector/types/types.go
+++ b/historyserver/pkg/collector/types/types.go
@@ -22,8 +22,8 @@ type RayCollectorConfig struct {
 	PushInterval time.Duration
 }
 
-// ValidateRayHanderConfig is
-func ValidateRayHanderConfig(c *RayCollectorConfig, fldpath *field.Path) field.ErrorList {
+// ValidateRayHandlerConfig validates the RayCollectorConfig fields.
+func ValidateRayHandlerConfig(c *RayCollectorConfig, fldpath *field.Path) field.ErrorList {
 	var allErrs field.ErrorList
 	if len(c.SessionDir) == 0 {
 		allErrs = append(allErrs, field.Invalid(fldpath, c.SessionDir, "session-dir must be set"))
@@ -38,13 +38,5 @@ func ValidateRayHanderConfig(c *RayCollectorConfig, fldpath *field.Path) field.E
 		allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterID, "ray_cluster_id must be set"))
 	}
 
-	if c.Role == "Head" {
-		if len(c.RayClusterName) == 0 {
-			allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterName, "ray_cluster_name must be set"))
-		}
-		if len(c.RayClusterID) == 0 {
-			allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterID, "ray_cluster_id must be set"))
-		}
-	}
 	return allErrs
 }
diff --git a/historyserver/pkg/eventserver/events.go b/historyserver/pkg/eventserver/events.go
deleted file mode 100644
index cc93cb622e0..00000000000
--- a/historyserver/pkg/eventserver/events.go
+++ /dev/null
@@ -1 +0,0 @@
-package eventserver
diff --git a/historyserver/pkg/utils/constant.go b/historyserver/pkg/utils/constant.go
new file mode 100644
index 00000000000..b01f3d10d19
--- /dev/null
+++ b/historyserver/pkg/utils/constant.go
@@ -0,0 +1,48 @@
+package utils
+
+import "time"
+
+// Ray session directory related constants.
+const (
+	RaySessionDirLogDirName  = "logs"
+	RaySessionDirMetaDirName = "meta"
+)
+
+// Local Ray runtime paths.
+const (
+	RaySessionLatestPath = "/tmp/ray/session_latest"
+	RayNodeIDPath        = "/tmp/ray/raylet_node_id"
+)
+
+// OSS meta file keys used by history server.
+const (
+	OssMetaFileBasicInfo = "ack__basicinfo"
+
+	OssMetaFileNodeSummaryKey                   = "restful__nodes_view_summary"
+	OssMetaFileNodePrefix                       = "restful__nodes_"
+	OssMetaFileJobTaskDetailPrefix              = "restful__api__v0__tasks_detail_job_id_"
+	OssMetaFileJobTaskSummarizeByFuncNamePrefix = "restful__api__v0__tasks_summarize_by_func_name_job_id_"
+	OssMetaFileJobTaskSummarizeByLineagePrefix  = "restful__api__v0__tasks_summarize_by_lineage_job_id_"
+	OssMetaFileJobDatasetsPrefix                = "restful__api__data__datasets_job_id_"
+	OssMetaFileNodeLogsPrefix                   = "restful__api__v0__logs_node_id_"
+	OssMetaFileClusterStatus                    = "restful__api__cluster_status"
+	OssMetaFileLogicalActors                    = "restful__logical__actors"
+	OssMetaFileAllTasksDetail                   = "restful__api__v0__tasks_detail"
+	OssMetaFileEvents                           = "restful__events"
+	OssMetaFilePlacementGroups                  = "restful__api__v0__placement_groups_detail"
+	OssMetaFileClusterSessionName               = "static__api__cluster_session_name"
+	OssMetaFileJobs                             = "restful__api__jobs"
+	OssMetaFileApplications                     = "restful__api__serve__applications"
+)
+
+// Ray history server log file name.
+const RayHistoryServerLogName = "historyserver-ray.log"
+
+const (
+	// DefaultMaxRetryAttempts controls how many times we retry reading
+	// local Ray metadata files (e.g. session dir, node id) before failing.
+	DefaultMaxRetryAttempts = 3
+	// DefaultInitialRetryDelay is the base delay before the first retry.
+	// Subsequent retries use an exponential backoff based on this value.
+	DefaultInitialRetryDelay = 5 * time.Second
+)
diff --git a/historyserver/pkg/utils/types.go b/historyserver/pkg/utils/types.go
index 683e00c3bde..90f5e247700 100644
--- a/historyserver/pkg/utils/types.go
+++ b/historyserver/pkg/utils/types.go
@@ -17,15 +17,15 @@ limitations under the License.
 package utils
 
 type ClusterInfo struct {
-	Name            string `json:"name"`
-	Namespace       string `json:"namespace"`
-	SessionName     string `json:"sessionName"`
-	CreateTime      string `json:"createTime"`
-	CreateTimeStamp int64  `json:"createTimeStamp"`
+	Name              string `json:"name"`
+	Namespace         string `json:"namespace"`
+	SessionName       string `json:"sessionName"`
+	CreationTime      string `json:"creationTime"`
+	CreationTimestamp int64  `json:"creationTimestamp"`
 }
 
 type ClusterInfoList []ClusterInfo
 
 func (a ClusterInfoList) Len() int      { return len(a) }
 func (a ClusterInfoList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a ClusterInfoList) Less(i, j int) bool { return a[i].CreateTimeStamp > a[j].CreateTimeStamp } // Sort descending
+func (a ClusterInfoList) Less(i, j int) bool { return a[i].CreationTimestamp > a[j].CreationTimestamp } // Sort descending
diff --git a/historyserver/pkg/utils/utils.go b/historyserver/pkg/utils/utils.go
index 53040cd6008..404761ec1cb 100644
--- a/historyserver/pkg/utils/utils.go
+++ b/historyserver/pkg/utils/utils.go
@@ -28,35 +28,6 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
-const (
-	RAY_SESSIONDIR_LOGDIR_NAME  = "logs"
-	RAY_SESSIONDIR_METADIR_NAME = "meta"
-)
-
-const (
-	OssMetaFile_BasicInfo = "ack__basicinfo"
-
-	OssMetaFile_NodeSummaryKey                        = "restful__nodes_view_summary"
-	OssMetaFile_Node_Prefix                           = "restful__nodes_"
-	OssMetaFile_JOBTASK_DETAIL_Prefix                 = "restful__api__v0__tasks_detail_job_id_"
-	OssMetaFile_JOBTASK_SUMMARIZE_BY_FUNC_NAME_Prefix = "restful__api__v0__tasks_summarize_by_func_name_job_id_"
-	OssMetaFile_JOBTASK_SUMMARIZE_BY_LINEAGE_Prefix   = "restful__api__v0__tasks_summarize_by_lineage_job_id_"
-	OssMetaFile_JOBDATASETS_Prefix                    = "restful__api__data__datasets_job_id_"
-	OssMetaFile_NodeLogs_Prefix                       = "restful__api__v0__logs_node_id_"
-	OssMetaFile_ClusterStatus                         = "restful__api__cluster_status"
-	OssMetaFile_LOGICAL_ACTORS                        = "restful__logical__actors"
-	OssMetaFile_ALLTASKS_DETAIL                       = "restful__api__v0__tasks_detail"
-	OssMetaFile_Events                                = "restful__events"
-	OssMetaFile_PlacementGroups                       = "restful__api__v0__placement_groups_detail"
-
-	OssMetaFile_ClusterSessionName = "static__api__cluster_session_name"
-
-	OssMetaFile_Jobs         = "restful__api__jobs"
-	OssMetaFile_Applications = "restful__api__serve__applications"
-)
-
-const RAY_HISTORY_SERVER_LOGNAME = "historyserver-ray.log"
-
 func RecreateObjectDir(bucket *oss.Bucket, dir string, options ...oss.Option) error {
 	objectDir := fmt.Sprintf("%s/", path.Clean(dir))
 
@@ -182,15 +153,15 @@ func DeleteObject(bucket *oss.Bucket, objectName string) error {
 }
 
 func GetMetaDirByNameID(ossHistorySeverDir, rayClusterNameID string) string {
-	return fmt.Sprintf("%s/", path.Clean(path.Join(ossHistorySeverDir, rayClusterNameID, RAY_SESSIONDIR_METADIR_NAME)))
+	return fmt.Sprintf("%s/", path.Clean(path.Join(ossHistorySeverDir, rayClusterNameID, RaySessionDirMetaDirName)))
 }
 
 func GetLogDirByNameID(ossHistorySeverDir, rayClusterNameID, rayNodeID, sessionId string) string {
-	return fmt.Sprintf("%s/", path.Clean(path.Join(ossHistorySeverDir, rayClusterNameID, sessionId, RAY_SESSIONDIR_LOGDIR_NAME, rayNodeID)))
+	return fmt.Sprintf("%s/", path.Clean(path.Join(ossHistorySeverDir, rayClusterNameID, sessionId, RaySessionDirLogDirName, rayNodeID)))
 }
 
 func GetLogDir(ossHistorySeverDir, rayClusterName, rayClusterID, sessionId, rayNodeID string) string {
-	return fmt.Sprintf("%s/", path.Clean(path.Join(ossHistorySeverDir, AppendRayClusterNameID(rayClusterName, rayClusterID), sessionId, RAY_SESSIONDIR_LOGDIR_NAME, rayNodeID)))
fmt.Sprintf("%s/", path.Clean(path.Join(ossHistorySeverDir, AppendRayClusterNameID(rayClusterName, rayClusterID), sessionId, RaySessionDirLogDirName, rayNodeID))) } const ( @@ -202,7 +173,7 @@ func AppendRayClusterNameID(rayClusterName, rayClusterID string) string { return fmt.Sprintf("%s%s%s", rayClusterName, connector, rayClusterID) } -func GetRarClusterNameAndID(rayClusterNameID string) (string, string) { +func GetRayClusterNameAndID(rayClusterNameID string) (string, string) { nameID := strings.Split(rayClusterNameID, connector) if len(nameID) < 2 { logrus.Fatalf("rayClusterNameID %s must match name%sid pattern", rayClusterNameID, connector) @@ -211,13 +182,17 @@ func GetRarClusterNameAndID(rayClusterNameID string) (string, string) { } func GetSessionDir() (string, error) { - session_latest_path := "/tmp/ray/session_latest" - for i := 0; i < 12; i++ { - rp, err := os.Readlink(session_latest_path) + for i := 0; i < DefaultMaxRetryAttempts; i++ { + rp, err := os.Readlink(RaySessionLatestPath) if err != nil { logrus.Errorf("read session_latest file error %v", err) - time.Sleep(time.Second * 5) - continue + if i < DefaultMaxRetryAttempts-1 { + // Apply exponential backoff between retries. We use bit shift to compute 2 to the power of i. + backoff := time.Duration(1<