diff --git a/historyserver/Makefile b/historyserver/Makefile
index dc92d2583a6..8ca5ae8d5f9 100644
--- a/historyserver/Makefile
+++ b/historyserver/Makefile
@@ -72,7 +72,7 @@ localimage-collector:
 
 .PHONY: localimage
 localimage: dockerbuilder_instance
-	docker buildx build -t historyserver:laster --platform linux/amd64 . --load
+	docker buildx build -t historyserver:latest --platform linux/amd64 . --load
 
 .PHONY: dockerbuilder_instance
 dockerbuilder_instance:
diff --git a/historyserver/README.md b/historyserver/README.md
index 1b520664d1c..7641289d5c5 100644
--- a/historyserver/README.md
+++ b/historyserver/README.md
@@ -63,25 +63,25 @@ docker buildx build -t : --platform linux/amd64,linux/arm64 .
 
 The history server can be configured using command-line flags:
 
-- `--runtime-class-name`: Storage backend type (e.g., "s3", "aliyunoss", "localtest")
+- `--storage-provider`: Storage backend type (e.g., "s3", "aliyunoss", "localtest")
 - `--ray-root-dir`: Root directory for Ray logs
 - `--kubeconfigs`: Path to kubeconfig file(s) for accessing Kubernetes clusters
 - `--dashboard-dir`: Directory containing dashboard assets (default: "/dashboard")
-- `--runtime-class-config-path`: Path to runtime class configuration file
+- `--storage-provider-config-path`: Path to storage provider configuration file
 
 ### Collector Configuration
 
 The collector can be configured using command-line flags:
 
 - `--role`: Node role ("Head" or "Worker")
-- `--runtime-class-name`: Storage backend type (e.g., "s3", "aliyunoss")
+- `--storage-provider`: Storage backend type (e.g., "s3", "aliyunoss")
 - `--ray-cluster-name`: Name of the Ray cluster
 - `--ray-cluster-id`: ID of the Ray cluster
 - `--ray-root-dir`: Root directory for Ray logs
 - `--log-batching`: Number of log entries to batch before writing
 - `--events-port`: Port for the events server
 - `--push-interval`: Interval between pushes to storage
-- `--runtime-class-config-path`: Path to runtime class configuration file
+- `--storage-provider-config-path`: Path to storage provider configuration file
 
 ## Supported Storage Backends
@@ -99,7 +99,7 @@ Each backend requires specific configuration parameters passed through environme
 
 ```bash
 ./output/bin/historyserver \
-  --runtime-class-name=s3 \
+  --storage-provider=s3 \
   --ray-root-dir=/path/to/logs
 ```
 
@@ -108,7 +108,7 @@ Each backend requires specific configuration parameters passed through environme
 ```bash
 ./output/bin/collector \
   --role=Head \
-  --runtime-class-name=s3 \
+  --storage-provider=s3 \
   --ray-cluster-name=my-cluster \
   --ray-root-dir=/path/to/logs
 ```
diff --git a/historyserver/cmd/collector/main.go b/historyserver/cmd/collector/main.go
index 0a1ed08e7d8..929138f9166 100644
--- a/historyserver/cmd/collector/main.go
+++ b/historyserver/cmd/collector/main.go
@@ -17,27 +17,25 @@ import (
     "github.com/ray-project/kuberay/historyserver/pkg/utils"
 )
 
-const runtimeClassConfigPath = "/var/collector-config/data"
-
 func main() {
-    role := ""
-    runtimeClassName := ""
-    rayClusterName := ""
-    rayClusterId := ""
-    rayRootDir := ""
-    logBatching := 1000
-    eventsPort := 8080
-    pushInterval := time.Minute
-    runtimeClassConfigPath := "/var/collector-config/data"
+    var role string
+    var storageProvider string
+    var rayClusterName string
+    var rayClusterId string
+    var rayRootDir string
+    var logBatching int
+    var eventsPort int
+    var pushInterval time.Duration
+    var storageProviderConfigPath string
 
     flag.StringVar(&role, "role", "Worker", "")
-    flag.StringVar(&runtimeClassName, "runtime-class-name", "", "")
+    flag.StringVar(&storageProvider, "storage-provider", "", "")
     flag.StringVar(&rayClusterName, "ray-cluster-name", "", "")
     flag.StringVar(&rayClusterId, "ray-cluster-id", "default", "")
     flag.StringVar(&rayRootDir, "ray-root-dir", "", "")
     flag.IntVar(&logBatching, "log-batching", 1000, "")
     flag.IntVar(&eventsPort, "events-port", 8080, "")
-    flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data"
+    flag.StringVar(&storageProviderConfigPath, "storage-provider-config-path", "", "") // e.g. "/var/collector-config/data"
     flag.DurationVar(&pushInterval, "push-interval", time.Minute, "")
     flag.Parse()
 
@@ -55,21 +53,21 @@ func main() {
     sessionName := path.Base(sessionDir)
 
     jsonData := make(map[string]interface{})
-    if runtimeClassConfigPath != "" {
-        data, err := os.ReadFile(runtimeClassConfigPath)
+    if storageProviderConfigPath != "" {
+        data, err := os.ReadFile(storageProviderConfigPath)
         if err != nil {
-            panic("Failed to read runtime class config " + err.Error())
+            panic("Failed to read storage provider config " + err.Error())
         }
         err = json.Unmarshal(data, &jsonData)
         if err != nil {
-            panic("Failed to parse runtime class config: " + err.Error())
+            panic("Failed to parse storage provider config: " + err.Error())
         }
     }
 
     registry := collector.GetWriterRegistry()
-    factory, ok := registry[runtimeClassName]
+    factory, ok := registry[storageProvider]
     if !ok {
-        panic("Not supported runtime class name: " + runtimeClassName + " for role: " + role + ".")
+        panic("Not supported storage provider: " + storageProvider + " for role: " + role + ".")
     }
 
     globalConfig := types.RayCollectorConfig{
@@ -86,18 +84,18 @@
 
     writer, err := factory(&globalConfig, jsonData)
     if err != nil {
-        panic("Failed to create writer for runtime class name: " + runtimeClassName + " for role: " + role + ".")
+        panic("Failed to create writer for storage provider: " + storageProvider + " for role: " + role + ".")
     }
 
     // Create and initialize EventServer
     eventServer := eventserver.NewEventServer(writer, rayRootDir, sessionDir, rayNodeId, rayClusterName, rayClusterId, sessionName)
     eventServer.InitServer(eventsPort)
 
-    collector := runtime.NewCollector(&globalConfig, writer)
-    _ = collector.Start(context.TODO().Done())
+    logCollector := runtime.NewCollector(&globalConfig, writer)
+    _ = logCollector.Start(context.TODO().Done())
 
     eventStop := eventServer.WaitForStop()
-    logStop := collector.WaitForStop()
+    logStop := logCollector.WaitForStop()
     <-eventStop
     logrus.Info("Event server shutdown")
     <-logStop
diff --git a/historyserver/config/raycluster.yaml b/historyserver/config/raycluster.yaml
index d8eaaeed479..71b78829d24 100644
--- a/historyserver/config/raycluster.yaml
+++ b/historyserver/config/raycluster.yaml
@@ -88,7 +88,7 @@ spec:
           command:
             - collector
             - --role=Head
-            - --runtime-class-name=s3
+            - --storage-provider=s3
             - --ray-cluster-name=raycluster-historyserver
             - --ray-root-dir=log
             - --events-port=8084
@@ -185,7 +185,7 @@ spec:
           command:
             - collector
            - --role=Worker
-            - --runtime-class-name=s3
+            - --storage-provider=s3
             - --ray-cluster-name=raycluster-historyserver
             - --ray-root-dir=log
             - --events-port=8084
diff --git a/historyserver/pkg/collector/eventserver/eventserver.go b/historyserver/pkg/collector/eventserver/eventserver.go
index b0143c63c96..c255fe88152 100644
--- a/historyserver/pkg/collector/eventserver/eventserver.go
+++ b/historyserver/pkg/collector/eventserver/eventserver.go
@@ -314,7 +314,7 @@ func (es *EventServer) flushEventsInternal(eventsToFlush []Event) {
     // Categorize events
     for _, event := range eventsToFlush {
-        hourKey := event.Timestamp.Truncate(time.Hour).Format("2006-01-02-15")
+        hourKey := event.Timestamp.Truncate(time.Hour).Format("2006010215")
 
         // Check event type
         if es.isNodeEvent(event.Data) {
diff --git a/historyserver/pkg/collector/logcollector/storage/aliyunoss/README.md b/historyserver/pkg/collector/logcollector/storage/aliyunoss/README.md
index c4817b0dc40..de249790e32 100644
--- a/historyserver/pkg/collector/logcollector/storage/aliyunoss/README.md
+++ b/historyserver/pkg/collector/logcollector/storage/aliyunoss/README.md
@@ -7,7 +7,7 @@ Oss endpoint and oss bucket are read from /var/collector-config/data.
 
 Content in /var/collector-config/data should be in json format, like `{"ossBucket": "", "ossEndpoint": ""}`
 
-Set `--runtime-class-name=aliyunoss` to enable this module.
+Set `--storage-provider=aliyunoss` to enable this module.
 
 Currently this module can only be used in ack environment. Oidc must be enabled for the cluster, and permission for write the oss must be granted.
diff --git a/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/types.go b/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/types.go
index 0e0575db631..64fffbcaa08 100644
--- a/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/types.go
+++ b/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/types.go
@@ -36,7 +36,7 @@ type GlobalConfig struct {
     OSSHistoryServerDir string
 }
 
-type RayMetaHanderConfig struct {
+type RayMetaHandlerConfig struct {
     GlobalConfig
     RayClusterName string
     RayClusterID   string
diff --git a/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/validate.go b/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/validate.go
index 70c0236e994..8bc7208f417 100644
--- a/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/validate.go
+++ b/historyserver/pkg/collector/logcollector/storage/aliyunoss/config/validate.go
@@ -39,8 +39,8 @@ func ValidateGlobalConfig(c *GlobalConfig, fldpath *field.Path) field.ErrorList
     return allErrs
 }
 
-// ValidateMetaHanderConfig is
-func ValidateMetaHanderConfig(c *RayMetaHanderConfig, fldpath *field.Path) field.ErrorList {
+// ValidateMetaHandlerConfig validates the RayMetaHandlerConfig.
+func ValidateMetaHandlerConfig(c *RayMetaHandlerConfig, fldpath *field.Path) field.ErrorList {
     var allErrs field.ErrorList
     if len(c.RayClusterName) == 0 {
         allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterName, "ray_cluster_name must be set"))
@@ -51,7 +51,7 @@ func ValidateMetaHanderConfig(c *RayMetaHanderConfig, fldpath *field.Path) field
     return allErrs
 }
 
-func ValidatRayHistoryServerConfig(c *RayHistoryServerConfig, fldpath *field.Path) field.ErrorList {
+func ValidateRayHistoryServerConfig(c *RayHistoryServerConfig, fldpath *field.Path) field.ErrorList {
     var allErrs field.ErrorList
     if len(c.DashBoardDir) == 0 {
         allErrs = append(allErrs, field.Invalid(fldpath, c.DashBoardDir, "dashboard-dir must be set"))
diff --git a/historyserver/pkg/collector/logcollector/storage/aliyunoss/ray/ray.go b/historyserver/pkg/collector/logcollector/storage/aliyunoss/ray/ray.go
index 01850fda6ac..816f677cb6f 100644
--- a/historyserver/pkg/collector/logcollector/storage/aliyunoss/ray/ray.go
+++ b/historyserver/pkg/collector/logcollector/storage/aliyunoss/ray/ray.go
@@ -179,13 +179,13 @@ func (r *RayLogsHandler) List() (res []utils.ClusterInfo) {
             sessionInfo := strings.Split(metas[1], "_")
             date := sessionInfo[1]
             dataTime := sessionInfo[2]
-            createTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
date+"_"+dataTime) + creationTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime) if err != nil { logrus.Errorf("Failed to parse time %s: %v", date+"_"+dataTime, err) continue } - c.CreateTimeStamp = createTime.Unix() - c.CreateTime = createTime.UTC().Format(("2006-01-02T15:04:05Z")) + c.CreationTimestamp = creationTime.Unix() + c.CreationTime = creationTime.UTC().Format(("2006-01-02T15:04:05Z")) clusters = append(clusters, *c) } if lsRes.IsTruncated { @@ -240,7 +240,7 @@ func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (stor return New(config) } -func NewWritter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) { +func NewWriter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) { config := &config{} config.complete(c, jd) diff --git a/historyserver/pkg/collector/logcollector/storage/localtest/reader.go b/historyserver/pkg/collector/logcollector/storage/localtest/reader.go index 3dbba172d1c..73172ecee81 100644 --- a/historyserver/pkg/collector/logcollector/storage/localtest/reader.go +++ b/historyserver/pkg/collector/logcollector/storage/localtest/reader.go @@ -11,38 +11,12 @@ import ( // MockReader is a mock implementation of the StorageReader interface type MockReader struct { + clusters utils.ClusterInfoList data map[string]map[string]string - clusters []utils.ClusterInfo } // NewMockReader creates a new mock reader -func NewMockReader() *MockReader { - clusters := []utils.ClusterInfo{ - { - Name: "cluster-1", - SessionName: "session-1", - CreateTime: "2023-01-01T00:00:00Z", - CreateTimeStamp: 1672531200000, - }, - { - Name: "cluster-2", - SessionName: "session-2", - CreateTime: "2023-01-02T00:00:00Z", - CreateTimeStamp: 1672617600000, - }, - } - - data := map[string]map[string]string{ - "cluster-1": { - "log.txt": "This is log content for cluster-1\nMultiple lines\nof log content", - "metadata.json": "{\n \"name\": \"cluster-1\",\n \"sessionName\": \"session-1\",\n \"createTime\": \"2023-01-01T00:00:00Z\"\n}", - }, - "cluster-2": { - "log.txt": "This is log content for cluster-2\nMultiple lines\nof log content", - "metadata.json": "{\n \"name\": \"cluster-2\",\n \"sessionName\": \"session-2\",\n \"createTime\": \"2023-01-02T00:00:00Z\"\n}", - }, - } - +func NewMockReader(clusters utils.ClusterInfoList, data map[string]map[string]string) *MockReader { return &MockReader{ clusters: clusters, data: data, @@ -75,7 +49,33 @@ func (r *MockReader) ListFiles(clusterId string, dir string) []string { return []string{} } -// NewReader creates a new StorageReader +// NewReader creates a new StorageReader with default mock data. 
 func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (storage.StorageReader, error) {
-    return NewMockReader(), nil
+    clusters := utils.ClusterInfoList{
+        {
+            Name:              "cluster-1",
+            SessionName:       "session-1",
+            CreationTime:      "2023-01-01T00:00:00Z",
+            CreationTimestamp: 1672531200000,
+        },
+        {
+            Name:              "cluster-2",
+            SessionName:       "session-2",
+            CreationTime:      "2023-01-02T00:00:00Z",
+            CreationTimestamp: 1672617600000,
+        },
+    }
+
+    data := map[string]map[string]string{
+        "cluster-1": {
+            "log.txt":       "This is log content for cluster-1\nMultiple lines\nof log content",
+            "metadata.json": "{\n  \"name\": \"cluster-1\",\n  \"sessionName\": \"session-1\",\n  \"creationTime\": \"2023-01-01T00:00:00Z\"\n}",
+        },
+        "cluster-2": {
+            "log.txt":       "This is log content for cluster-2\nMultiple lines\nof log content",
+            "metadata.json": "{\n  \"name\": \"cluster-2\",\n  \"sessionName\": \"session-2\",\n  \"creationTime\": \"2023-01-02T00:00:00Z\"\n}",
+        },
+    }
+
+    return NewMockReader(clusters, data), nil
 }
diff --git a/historyserver/pkg/collector/logcollector/storage/s3/README.md b/historyserver/pkg/collector/logcollector/storage/s3/README.md
index b5a548c2a78..6388118c920 100644
--- a/historyserver/pkg/collector/logcollector/storage/s3/README.md
+++ b/historyserver/pkg/collector/logcollector/storage/s3/README.md
@@ -7,6 +7,6 @@ S3 endpoint, S3 region and S3 bucket are read from /var/collector-config/data.
 
 Content in /var/collector-config/data should be in json format, like `{"s3Bucket": "", "s3Endpoint": "", "s3Region": ""}`
 
-Set `--runtime-class-name=s3` to enable this module.
+Set `--storage-provider=s3` to enable this module.
 
 This module can be used with any S3 compatible storage service.
diff --git a/historyserver/pkg/collector/logcollector/storage/s3/s3.go b/historyserver/pkg/collector/logcollector/storage/s3/s3.go
index 43d5ece7f95..83da4d71c12 100644
--- a/historyserver/pkg/collector/logcollector/storage/s3/s3.go
+++ b/historyserver/pkg/collector/logcollector/storage/s3/s3.go
@@ -180,13 +180,13 @@ func (r *RayLogsHandler) List() (res []utils.ClusterInfo) {
             sessionInfo := strings.Split(metas[1], "_")
             date := sessionInfo[1]
             dataTime := sessionInfo[2]
-            createTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
+            creationTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
             if err != nil {
                 logrus.Errorf("Failed to parse time %s: %v", date+"_"+dataTime, err)
                 continue
             }
-            c.CreateTimeStamp = createTime.Unix()
-            c.CreateTime = createTime.UTC().Format(("2006-01-02T15:04:05Z"))
+            c.CreationTimestamp = creationTime.Unix()
+            c.CreationTime = creationTime.UTC().Format("2006-01-02T15:04:05Z")
             clusters = append(clusters, *c)
         }
         return true
@@ -251,7 +251,7 @@ func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (stor
     return New(config)
 }
 
-func NewWritter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) {
+func NewWriter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) {
     config := &config{}
     config.complete(c, jd)
 
diff --git a/historyserver/pkg/collector/registry.go b/historyserver/pkg/collector/registry.go
index 4bbfc6e2fa3..03bbdc6183b 100644
--- a/historyserver/pkg/collector/registry.go
+++ b/historyserver/pkg/collector/registry.go
@@ -15,8 +15,8 @@ func GetWriterRegistry() WriterRegistry {
 }
 
 var writerRegistry = WriterRegistry{
-    "aliyunoss": ray.NewWritter,
-    "s3":        s3.NewWritter,
+    "aliyunoss": ray.NewWriter,
+    "s3":        s3.NewWriter,
 }
 
 type ReaderRegistry map[string]func(globalData *types.RayHistoryServerConfig, data map[string]interface{}) (storage.StorageReader, error)
diff --git a/historyserver/pkg/collector/types/types.go b/historyserver/pkg/collector/types/types.go
index c8457fec70e..d9bf8225e5d 100644
--- a/historyserver/pkg/collector/types/types.go
+++ b/historyserver/pkg/collector/types/types.go
@@ -22,8 +22,8 @@ type RayCollectorConfig struct {
     PushInterval time.Duration
 }
 
-// ValidateRayHanderConfig is
-func ValidateRayHanderConfig(c *RayCollectorConfig, fldpath *field.Path) field.ErrorList {
+// ValidateRayHandlerConfig validates the RayCollectorConfig.
+func ValidateRayHandlerConfig(c *RayCollectorConfig, fldpath *field.Path) field.ErrorList {
     var allErrs field.ErrorList
     if len(c.SessionDir) == 0 {
         allErrs = append(allErrs, field.Invalid(fldpath, c.SessionDir, "session-dir must be set"))
@@ -38,13 +38,5 @@ func ValidateRayHanderConfig(c *RayCollectorConfig, fldpath *field.Path) field.E
         allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterID, "ray_cluster_id must be set"))
     }
 
-    if c.Role == "Head" {
-        if len(c.RayClusterName) == 0 {
-            allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterName, "ray_cluster_name must be set"))
-        }
-        if len(c.RayClusterID) == 0 {
-            allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterID, "ray_cluster_id must be set"))
-        }
-    }
     return allErrs
 }
diff --git a/historyserver/pkg/utils/constant.go b/historyserver/pkg/utils/constant.go
new file mode 100644
index 00000000000..31929f60f81
--- /dev/null
+++ b/historyserver/pkg/utils/constant.go
@@ -0,0 +1,48 @@
+package utils
+
+import "time"
+
+// Ray session directory related constants.
+const (
+    RAY_SESSIONDIR_LOGDIR_NAME  = "logs"
+    RAY_SESSIONDIR_METADIR_NAME = "meta"
+)
+
+// Local Ray runtime paths.
+const (
+    RaySessionLatestPath = "/tmp/ray/session_latest"
+    RayNodeIDPath        = "/tmp/ray/raylet_node_id"
+)
+
+// Meta file keys used by the history server.
+const (
+    RayMetaFile_BasicInfo = "ack__basicinfo"
+
+    RayMetaFile_NodeSummaryKey                        = "restful__nodes_view_summary"
+    RayMetaFile_Node_Prefix                           = "restful__nodes_"
+    RayMetaFile_JOBTASK_DETAIL_Prefix                 = "restful__api__v0__tasks_detail_job_id_"
+    RayMetaFile_JOBTASK_SUMMARIZE_BY_FUNC_NAME_Prefix = "restful__api__v0__tasks_summarize_by_func_name_job_id_"
+    RayMetaFile_JOBTASK_SUMMARIZE_BY_LINEAGE_Prefix   = "restful__api__v0__tasks_summarize_by_lineage_job_id_"
+    RayMetaFile_JOBDATASETS_Prefix                    = "restful__api__data__datasets_job_id_"
+    RayMetaFile_NodeLogs_Prefix                       = "restful__api__v0__logs_node_id_"
+    RayMetaFile_ClusterStatus                         = "restful__api__cluster_status"
+    RayMetaFile_LOGICAL_ACTORS                        = "restful__logical__actors"
+    RayMetaFile_ALLTASKS_DETAIL                       = "restful__api__v0__tasks_detail"
+    RayMetaFile_Events                                = "restful__events"
+    RayMetaFile_PlacementGroups                       = "restful__api__v0__placement_groups_detail"
+    RayMetaFile_ClusterSessionName                    = "static__api__cluster_session_name"
+    RayMetaFile_Jobs                                  = "restful__api__jobs"
+    RayMetaFile_Applications                          = "restful__api__serve__applications"
+)
+
+// Ray history server log file name.
+const RayHistoryServerLogName = "historyserver-ray.log"
+
+const (
+    // DefaultMaxRetryAttempts controls how many times we retry reading
+    // local Ray metadata files (e.g. session dir, node id) before failing.
+    DefaultMaxRetryAttempts = 3
+    // DefaultInitialRetryDelay is the base delay before the first retry.
+    // Subsequent retries use an exponential backoff based on this value.
+    DefaultInitialRetryDelay = 5 * time.Second
+)
diff --git a/historyserver/pkg/utils/types.go b/historyserver/pkg/utils/types.go
index 683e00c3bde..90f5e247700 100644
--- a/historyserver/pkg/utils/types.go
+++ b/historyserver/pkg/utils/types.go
@@ -17,15 +17,15 @@ limitations under the License.
 package utils
 
 type ClusterInfo struct {
-    Name            string `json:"name"`
-    Namespace       string `json:"namespace"`
-    SessionName     string `json:"sessionName"`
-    CreateTime      string `json:"createTime"`
-    CreateTimeStamp int64  `json:"createTimeStamp"`
+    Name              string `json:"name"`
+    Namespace         string `json:"namespace"`
+    SessionName       string `json:"sessionName"`
+    CreationTime      string `json:"creationTime"`
+    CreationTimestamp int64  `json:"creationTimestamp"`
 }
 
 type ClusterInfoList []ClusterInfo
 
 func (a ClusterInfoList) Len() int      { return len(a) }
 func (a ClusterInfoList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a ClusterInfoList) Less(i, j int) bool { return a[i].CreateTimeStamp > a[j].CreateTimeStamp } // Sort descending
+func (a ClusterInfoList) Less(i, j int) bool { return a[i].CreationTimestamp > a[j].CreationTimestamp } // Sort descending
diff --git a/historyserver/pkg/utils/utils.go b/historyserver/pkg/utils/utils.go
index 53040cd6008..de028b87dc5 100644
--- a/historyserver/pkg/utils/utils.go
+++ b/historyserver/pkg/utils/utils.go
@@ -28,35 +28,6 @@ import (
     "github.com/sirupsen/logrus"
 )
 
-const (
-    RAY_SESSIONDIR_LOGDIR_NAME  = "logs"
-    RAY_SESSIONDIR_METADIR_NAME = "meta"
-)
-
-const (
-    OssMetaFile_BasicInfo = "ack__basicinfo"
-
-    OssMetaFile_NodeSummaryKey                        = "restful__nodes_view_summary"
-    OssMetaFile_Node_Prefix                           = "restful__nodes_"
-    OssMetaFile_JOBTASK_DETAIL_Prefix                 = "restful__api__v0__tasks_detail_job_id_"
-    OssMetaFile_JOBTASK_SUMMARIZE_BY_FUNC_NAME_Prefix = "restful__api__v0__tasks_summarize_by_func_name_job_id_"
-    OssMetaFile_JOBTASK_SUMMARIZE_BY_LINEAGE_Prefix   = "restful__api__v0__tasks_summarize_by_lineage_job_id_"
-    OssMetaFile_JOBDATASETS_Prefix                    = "restful__api__data__datasets_job_id_"
-    OssMetaFile_NodeLogs_Prefix                       = "restful__api__v0__logs_node_id_"
-    OssMetaFile_ClusterStatus                         = "restful__api__cluster_status"
-    OssMetaFile_LOGICAL_ACTORS                        = "restful__logical__actors"
-    OssMetaFile_ALLTASKS_DETAIL                       = "restful__api__v0__tasks_detail"
-    OssMetaFile_Events                                = "restful__events"
-    OssMetaFile_PlacementGroups                       = "restful__api__v0__placement_groups_detail"
-
-    OssMetaFile_ClusterSessionName = "static__api__cluster_session_name"
-
-    OssMetaFile_Jobs         = "restful__api__jobs"
-    OssMetaFile_Applications = "restful__api__serve__applications"
-)
-
-const RAY_HISTORY_SERVER_LOGNAME = "historyserver-ray.log"
-
 func RecreateObjectDir(bucket *oss.Bucket, dir string, options ...oss.Option) error {
     objectDir := fmt.Sprintf("%s/", path.Clean(dir))
 
@@ -202,7 +173,7 @@ func AppendRayClusterNameID(rayClusterName, rayClusterID string) string {
     return fmt.Sprintf("%s%s%s", rayClusterName, connector, rayClusterID)
 }
 
-func GetRarClusterNameAndID(rayClusterNameID string) (string, string) {
+func GetRayClusterNameAndID(rayClusterNameID string) (string, string) {
     nameID := strings.Split(rayClusterNameID, connector)
     if len(nameID) < 2 {
         logrus.Fatalf("rayClusterNameID %s must match name%sid pattern", rayClusterNameID, connector)
@@ -211,9 +182,8 @@ func GetRarClusterNameAndID(rayClusterNameID string) (string, string) {
 }
 
 func GetSessionDir() (string, error) {
-    session_latest_path := "/tmp/ray/session_latest"
     for i := 0; i < 12; i++ {
-        rp, err := os.Readlink(session_latest_path)
+        rp, err := os.Readlink(RaySessionLatestPath)
         if err != nil {
             logrus.Errorf("read session_latest file error %v", err)
             time.Sleep(time.Second * 5)
@@ -226,7 +196,7 @@ func GetRayNodeID() (string, error) {
     for i := 0; i < 12; i++ {
-        nodeidBytes, err := os.ReadFile("/tmp/ray/raylet_node_id")
+        nodeidBytes, err := os.ReadFile(RayNodeIDPath)
         if err != nil {
             logrus.Errorf("read nodeid file error %v", err)
             time.Sleep(time.Second * 5)
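
Note: the retry constants added in historyserver/pkg/utils/constant.go document an exponential-backoff policy, but the hunks above do not show a consumer for them; GetSessionDir and GetRayNodeID still hard-code 12 attempts with a 5-second sleep. The Go sketch below is illustrative only and is not part of this patch: retryWithBackoff is a hypothetical helper, and the lowercase constants simply mirror DefaultMaxRetryAttempts and DefaultInitialRetryDelay so the example compiles on its own.

package main

import (
	"fmt"
	"os"
	"time"
)

// Sketch only: these mirror the values of DefaultMaxRetryAttempts and
// DefaultInitialRetryDelay from historyserver/pkg/utils/constant.go.
const (
	defaultMaxRetryAttempts  = 3
	defaultInitialRetryDelay = 5 * time.Second
)

// retryWithBackoff (hypothetical helper) calls fn up to defaultMaxRetryAttempts
// times, sleeping defaultInitialRetryDelay after the first failure and doubling
// the delay after each subsequent failure.
func retryWithBackoff(fn func() (string, error)) (string, error) {
	delay := defaultInitialRetryDelay
	var lastErr error
	for attempt := 0; attempt < defaultMaxRetryAttempts; attempt++ {
		result, err := fn()
		if err == nil {
			return result, nil
		}
		lastErr = err
		time.Sleep(delay)
		delay *= 2 // exponential backoff based on the initial delay
	}
	return "", fmt.Errorf("giving up after %d attempts: %w", defaultMaxRetryAttempts, lastErr)
}

func main() {
	// Example usage mirroring GetSessionDir: resolve the session_latest symlink.
	sessionDir, err := retryWithBackoff(func() (string, error) {
		return os.Readlink("/tmp/ray/session_latest")
	})
	if err != nil {
		fmt.Println("failed to resolve session dir:", err)
		return
	}
	fmt.Println("session dir:", sessionDir)
}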