12 changes: 6 additions & 6 deletions historyserver/README.md
@@ -63,25 +63,25 @@ docker buildx build -t <image-name>:<tag> --platform linux/amd64,linux/arm64 . -

The history server can be configured using command-line flags:

-- `--runtime-class-name`: Storage backend type (e.g., "s3", "aliyunoss", "localtest")
+- `--storage-provider`: Storage backend type (e.g., "s3", "aliyunoss", "localtest")
- `--ray-root-dir`: Root directory for Ray logs
- `--kubeconfigs`: Path to kubeconfig file(s) for accessing Kubernetes clusters
- `--dashboard-dir`: Directory containing dashboard assets (default: "/dashboard")
-- `--runtime-class-config-path`: Path to runtime class configuration file
+- `--storage-provider-config-path`: Path to storage provider configuration file

### Collector Configuration

The collector can be configured using command-line flags:

- `--role`: Node role ("Head" or "Worker")
-- `--runtime-class-name`: Storage backend type (e.g., "s3", "aliyunoss")
+- `--storage-provider`: Storage backend type (e.g., "s3", "aliyunoss", "localtest")
- `--ray-cluster-name`: Name of the Ray cluster
- `--ray-cluster-id`: ID of the Ray cluster
- `--ray-root-dir`: Root directory for Ray logs
- `--log-batching`: Number of log entries to batch before writing
- `--events-port`: Port for the events server
- `--push-interval`: Interval between pushes to storage
-- `--runtime-class-config-path`: Path to runtime class configuration file
+- `--storage-provider-config-path`: Path to storage provider configuration file

## Supported Storage Backends

@@ -99,7 +99,7 @@ Each backend requires specific configuration parameters passed through environme

```bash
./output/bin/historyserver \
-  --runtime-class-name=s3 \
+  --storage-provider=s3 \
--ray-root-dir=/path/to/logs
```

@@ -108,7 +108,7 @@ Each backend requires specific configuration parameters passed through environme
```bash
./output/bin/collector \
--role=Head \
-  --runtime-class-name=s3 \
+  --storage-provider=s3 \
--ray-cluster-name=my-cluster \
--ray-root-dir=/path/to/logs
```
44 changes: 21 additions & 23 deletions historyserver/cmd/collector/main.go
@@ -17,27 +17,25 @@ import (
"github.com/ray-project/kuberay/historyserver/pkg/utils"
)

-const runtimeClassConfigPath = "/var/collector-config/data"

func main() {
role := ""
runtimeClassName := ""
rayClusterName := ""
rayClusterId := ""
rayRootDir := ""
logBatching := 1000
eventsPort := 8080
pushInterval := time.Minute
runtimeClassConfigPath := "/var/collector-config/data"
var role string
var storageProvider string
var rayClusterName string
var rayClusterId string
var rayRootDir string
var logBatching int
var eventsPort int
var pushInterval time.Duration
var storageProviderConfigPath string

flag.StringVar(&role, "role", "Worker", "")
-	flag.StringVar(&runtimeClassName, "runtime-class-name", "", "")
+	flag.StringVar(&storageProvider, "storage-provider", "", "")
flag.StringVar(&rayClusterName, "ray-cluster-name", "", "")
flag.StringVar(&rayClusterId, "ray-cluster-id", "default", "")
flag.StringVar(&rayRootDir, "ray-root-dir", "", "")
flag.IntVar(&logBatching, "log-batching", 1000, "")
flag.IntVar(&eventsPort, "events-port", 8080, "")
-	flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data"
+	flag.StringVar(&storageProviderConfigPath, "storage-provider-config-path", "", "") //"/var/collector-config/data"
flag.DurationVar(&pushInterval, "push-interval", time.Minute, "")

flag.Parse()
@@ -55,21 +53,21 @@ func main() {
sessionName := path.Base(sessionDir)

jsonData := make(map[string]interface{})
-	if runtimeClassConfigPath != "" {
-		data, err := os.ReadFile(runtimeClassConfigPath)
+	if storageProviderConfigPath != "" {
+		data, err := os.ReadFile(storageProviderConfigPath)
if err != nil {
panic("Failed to read runtime class config " + err.Error())
panic("Failed to read storage provider config " + err.Error())
}
err = json.Unmarshal(data, &jsonData)
if err != nil {
panic("Failed to parse runtime class config: " + err.Error())
panic("Failed to parse storage provider config: " + err.Error())
}
}

registry := collector.GetWriterRegistry()
-	factory, ok := registry[runtimeClassName]
+	factory, ok := registry[storageProvider]
if !ok {
panic("Not supported runtime class name: " + runtimeClassName + " for role: " + role + ".")
panic("Not supported storage provider: " + storageProvider + " for role: " + role + ".")
}

globalConfig := types.RayCollectorConfig{
@@ -86,18 +84,18 @@

writer, err := factory(&globalConfig, jsonData)
if err != nil {
panic("Failed to create writer for runtime class name: " + runtimeClassName + " for role: " + role + ".")
panic("Failed to create writer for storage provider: " + storageProvider + " for role: " + role + ".")
}

// Create and initialize EventServer
eventServer := eventserver.NewEventServer(writer, rayRootDir, sessionDir, rayNodeId, rayClusterName, rayClusterId, sessionName)
eventServer.InitServer(eventsPort)

-	collector := runtime.NewCollector(&globalConfig, writer)
-	_ = collector.Start(context.TODO().Done())
+	logCollector := runtime.NewCollector(&globalConfig, writer)
+	_ = logCollector.Start(context.TODO().Done())

eventStop := eventServer.WaitForStop()
-	logStop := collector.WaitForStop()
+	logStop := logCollector.WaitForStop()
<-eventStop
logrus.Info("Event server shutdown")
<-logStop
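
For reference, the lookup `registry[storageProvider]` above follows a registry-of-factories pattern. A minimal self-contained sketch of that pattern, with illustrative types rather than the repo's actual `collector` package API:

```go
package main

import (
	"errors"
	"fmt"
)

// Writer is a stand-in for the storage writer interface (hypothetical).
type Writer interface {
	Write(path string, data []byte) error
}

// Factory builds a Writer from global config plus provider-specific JSON settings.
type Factory func(cfg map[string]string, settings map[string]interface{}) (Writer, error)

// registry maps a --storage-provider value to its factory.
var registry = map[string]Factory{}

func register(name string, f Factory) { registry[name] = f }

func newWriter(provider string, cfg map[string]string, settings map[string]interface{}) (Writer, error) {
	factory, ok := registry[provider]
	if !ok {
		return nil, errors.New("not supported storage provider: " + provider)
	}
	return factory(cfg, settings)
}

func main() {
	register("localtest", func(cfg map[string]string, settings map[string]interface{}) (Writer, error) {
		return nil, errors.New("localtest writer not implemented in this sketch")
	})

	// An unregistered provider fails the lookup, mirroring the panic in main.go.
	if _, err := newWriter("s3", nil, nil); err != nil {
		fmt.Println(err) // not supported storage provider: s3
	}
}
```
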
2 changes: 1 addition & 1 deletion historyserver/pkg/collector/eventserver/eventserver.go
@@ -314,7 +314,7 @@ func (es *EventServer) flushEventsInternal(eventsToFlush []Event) {

// Categorize events
for _, event := range eventsToFlush {
-		hourKey := event.Timestamp.Truncate(time.Hour).Format("2006-01-02-15")
+		hourKey := event.Timestamp.Truncate(time.Hour).Format("2006010215")

// Check event type
if es.isNodeEvent(event.Data) {
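
The new format string swaps the dashed hour key for a compact one. A quick standalone sketch of how `Truncate(time.Hour)` plus `Format` derive the hourly bucket key:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ts := time.Date(2023, time.January, 2, 15, 4, 5, 0, time.UTC)

	// Truncate drops minutes and seconds, so all events within the same
	// hour collapse onto one key.
	hourKey := ts.Truncate(time.Hour).Format("2006010215")
	fmt.Println(hourKey) // 2023010215

	// The old layout produced a dashed key for the same instant.
	fmt.Println(ts.Truncate(time.Hour).Format("2006-01-02-15")) // 2023-01-02-15
}
```
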
@@ -7,7 +7,7 @@ Oss endpoint and oss bucket are read from /var/collector-config/data.
Content in /var/collector-config/data should be in json format, like
`{"ossBucket": "", "ossEndpoint": ""}`

-Set `--runtime-class-name=aliyunoss` to enable this module.
+Set `--storage-provider=aliyunoss` to enable this module.

Currently this module can only be used in an ACK environment. OIDC must be
enabled for the cluster, and permission to write to OSS must be granted.
@@ -179,13 +179,13 @@ func (r *RayLogsHandler) List() (res []utils.ClusterInfo) {
sessionInfo := strings.Split(metas[1], "_")
date := sessionInfo[1]
dataTime := sessionInfo[2]
-		createTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
+		creationTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
if err != nil {
logrus.Errorf("Failed to parse time %s: %v", date+"_"+dataTime, err)
continue
}
-		c.CreateTimeStamp = createTime.Unix()
-		c.CreateTime = createTime.UTC().Format(("2006-01-02T15:04:05Z"))
+		c.CreationTimestamp = creationTime.Unix()
+		c.CreationTime = creationTime.UTC().Format(("2006-01-02T15:04:05Z"))
clusters = append(clusters, *c)
}
if lsRes.IsTruncated {
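
For reference, the block above derives a cluster's creation time from its session directory name. A standalone sketch of the same parsing steps, using an illustrative session name in Ray's usual `session_<date>_<time>_<pid>_<counter>` form:

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	// Illustrative session directory name; the trailing fields are placeholders.
	sessionName := "session_2023-01-01_00-00-00_123456_1"

	parts := strings.Split(sessionName, "_")
	date, clock := parts[1], parts[2] // "2023-01-01", "00-00-00"

	// Same layout string the handler uses; time.Parse without a zone yields UTC.
	creationTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+clock)
	if err != nil {
		panic(err)
	}

	fmt.Println(creationTime.Unix())                               // 1672531200
	fmt.Println(creationTime.UTC().Format("2006-01-02T15:04:05Z")) // 2023-01-01T00:00:00Z
}
```
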
@@ -11,38 +11,12 @@ import (

// MockReader is a mock implementation of the StorageReader interface
type MockReader struct {
-	clusters []utils.ClusterInfo
+	clusters utils.ClusterInfoList
	data     map[string]map[string]string
}

// NewMockReader creates a new mock reader
-func NewMockReader() *MockReader {
-	clusters := []utils.ClusterInfo{
-		{
-			Name:            "cluster-1",
-			SessionName:     "session-1",
-			CreateTime:      "2023-01-01T00:00:00Z",
-			CreateTimeStamp: 1672531200000,
-		},
-		{
-			Name:            "cluster-2",
-			SessionName:     "session-2",
-			CreateTime:      "2023-01-02T00:00:00Z",
-			CreateTimeStamp: 1672617600000,
-		},
-	}
-
-	data := map[string]map[string]string{
-		"cluster-1": {
-			"log.txt":       "This is log content for cluster-1\nMultiple lines\nof log content",
-			"metadata.json": "{\n \"name\": \"cluster-1\",\n \"sessionName\": \"session-1\",\n \"createTime\": \"2023-01-01T00:00:00Z\"\n}",
-		},
-		"cluster-2": {
-			"log.txt":       "This is log content for cluster-2\nMultiple lines\nof log content",
-			"metadata.json": "{\n \"name\": \"cluster-2\",\n \"sessionName\": \"session-2\",\n \"createTime\": \"2023-01-02T00:00:00Z\"\n}",
-		},
-	}
-
+func NewMockReader(clusters utils.ClusterInfoList, data map[string]map[string]string) *MockReader {
return &MockReader{
clusters: clusters,
data: data,
@@ -75,7 +49,33 @@ func (r *MockReader) ListFiles(clusterId string, dir string) []string {
return []string{}
}

-// NewReader creates a new StorageReader
+// NewReader creates a new StorageReader with default mock data.
func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (storage.StorageReader, error) {
-	return NewMockReader(), nil
+	clusters := utils.ClusterInfoList{
+		{
+			Name:              "cluster-1",
+			SessionName:       "session-1",
+			CreationTime:      "2023-01-01T00:00:00Z",
+			CreationTimestamp: 1672531200000,
+		},
+		{
+			Name:              "cluster-2",
+			SessionName:       "session-2",
+			CreationTime:      "2023-01-02T00:00:00Z",
+			CreationTimestamp: 1672617600000,
+		},
+	}
+
+	data := map[string]map[string]string{
+		"cluster-1": {
+			"log.txt":       "This is log content for cluster-1\nMultiple lines\nof log content",
+			"metadata.json": "{\n \"name\": \"cluster-1\",\n \"sessionName\": \"session-1\",\n \"creationTime\": \"2023-01-01T00:00:00Z\"\n}",
+		},
+		"cluster-2": {
+			"log.txt":       "This is log content for cluster-2\nMultiple lines\nof log content",
+			"metadata.json": "{\n \"name\": \"cluster-2\",\n \"sessionName\": \"session-2\",\n \"creationTime\": \"2023-01-02T00:00:00Z\"\n}",
+		},
+	}
+
+	return NewMockReader(clusters, data), nil
}
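
Because `NewMockReader` now accepts its fixtures, tests can inject scenario-specific data instead of relying on hardcoded defaults. A hedged usage sketch; the mock package's import path is an assumption here, and the timestamp follows the millisecond style of the fixtures above:

```go
package localtest_test

import (
	"testing"

	"github.com/ray-project/kuberay/historyserver/pkg/utils"
	// The mock reader's package path is assumed here for illustration.
	localtest "github.com/ray-project/kuberay/historyserver/pkg/historyserver/storage/localtest"
)

func TestMockReaderWithCustomFixtures(t *testing.T) {
	clusters := utils.ClusterInfoList{
		{
			Name:              "test-cluster",
			SessionName:       "session-test",
			CreationTime:      "2024-05-01T12:00:00Z",
			CreationTimestamp: 1714564800000, // milliseconds, matching the fixture style
		},
	}
	data := map[string]map[string]string{
		"test-cluster": {"log.txt": "hello from test"},
	}

	reader := localtest.NewMockReader(clusters, data)
	// Exercise reader.ListFiles("test-cluster", ...) and friends as needed.
	_ = reader
}
```
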
@@ -7,6 +7,6 @@ S3 endpoint, S3 region and S3 bucket are read from /var/collector-config/data.
Content in /var/collector-config/data should be in json format, like
`{"s3Bucket": "", "s3Endpoint": "", "s3Region": ""}`

-Set `--runtime-class-name=s3` to enable this module.
+Set `--storage-provider=s3` to enable this module.

This module can be used with any S3 compatible storage service.
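
For reference, the documented keys map onto a small config struct. A minimal parsing sketch; the `s3Config` struct is illustrative, since the collector actually unmarshals this file into a generic `map[string]interface{}`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// s3Config mirrors the documented keys; the struct itself is illustrative.
type s3Config struct {
	S3Bucket   string `json:"s3Bucket"`
	S3Endpoint string `json:"s3Endpoint"`
	S3Region   string `json:"s3Region"`
}

func main() {
	// Example content of /var/collector-config/data (values are placeholders).
	raw := []byte(`{"s3Bucket": "my-bucket", "s3Endpoint": "https://s3.amazonaws.com", "s3Region": "us-east-1"}`)

	var cfg s3Config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```
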
6 changes: 3 additions & 3 deletions historyserver/pkg/collector/logcollector/storage/s3/s3.go
@@ -180,13 +180,13 @@ func (r *RayLogsHandler) List() (res []utils.ClusterInfo) {
sessionInfo := strings.Split(metas[1], "_")
date := sessionInfo[1]
dataTime := sessionInfo[2]
-		createTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
+		creationTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
if err != nil {
logrus.Errorf("Failed to parse time %s: %v", date+"_"+dataTime, err)
continue
}
-		c.CreateTimeStamp = createTime.Unix()
-		c.CreateTime = createTime.UTC().Format(("2006-01-02T15:04:05Z"))
+		c.CreationTimestamp = creationTime.Unix()
+		c.CreationTime = creationTime.UTC().Format(("2006-01-02T15:04:05Z"))
clusters = append(clusters, *c)
}
return true
8 changes: 0 additions & 8 deletions historyserver/pkg/collector/types/types.go
@@ -38,13 +38,5 @@ func ValidateRayHanderConfig(c *RayCollectorConfig, fldpath *field.Path) field.E
allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterID, "ray_cluster_id must be set"))
}

if c.Role == "Head" {
if len(c.RayClusterName) == 0 {
allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterName, "ray_cluster_name must be set"))
}
if len(c.RayClusterID) == 0 {
allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterID, "ray_cluster_id must be set"))
}
}
return allErrs
}
48 changes: 48 additions & 0 deletions historyserver/pkg/utils/constant.go
@@ -0,0 +1,48 @@
package utils

import "time"

// Ray session directory related constants.
const (
RAY_SESSIONDIR_LOGDIR_NAME = "logs"
RAY_SESSIONDIR_METADIR_NAME = "meta"
)

// Local Ray runtime paths.
const (
RaySessionLatestPath = "/tmp/ray/session_latest"
RayNodeIDPath = "/tmp/ray/raylet_node_id"
)

// OSS meta file keys used by history server.
const (
RayMetaFile_BasicInfo = "ack__basicinfo"

[Review comment from Contributor Author]: We need to clarify where these env vars will be used.

RayMetaFile_NodeSummaryKey = "restful__nodes_view_summary"
RayMetaFile_Node_Prefix = "restful__nodes_"
RayMetaFile_JOBTASK_DETAIL_Prefix = "restful__api__v0__tasks_detail_job_id_"
RayMetaFile_JOBTASK_SUMMARIZE_BY_FUNC_NAME_Prefix = "restful__api__v0__tasks_summarize_by_func_name_job_id_"
RayMetaFile_JOBTASK_SUMMARIZE_BY_LINEAGE_Prefix = "restful__api__v0__tasks_summarize_by_lineage_job_id_"
RayMetaFile_JOBDATASETS_Prefix = "restful__api__data__datasets_job_id_"
RayMetaFile_NodeLogs_Prefix = "restful__api__v0__logs_node_id_"
RayMetaFile_ClusterStatus = "restful__api__cluster_status"
RayMetaFile_LOGICAL_ACTORS = "restful__logical__actors"
RayMetaFile_ALLTASKS_DETAIL = "restful__api__v0__tasks_detail"
RayMetaFile_Events = "restful__events"
RayMetaFile_PlacementGroups = "restful__api__v0__placement_groups_detail"
RayMetaFile_ClusterSessionName = "static__api__cluster_session_name"
RayMetaFile_Jobs = "restful__api__jobs"
RayMetaFile_Applications = "restful__api__serve__applications"
)

// Ray history server log file name.
const RayHistoryServerLogName = "historyserver-ray.log"

const (
// DefaultMaxRetryAttempts controls how many times we retry reading
// local Ray metadata files (e.g. session dir, node id) before failing.
DefaultMaxRetryAttempts = 3
// DefaultInitialRetryDelay is the base delay before the first retry.
// Subsequent retries use an exponential backoff based on this value.
DefaultInitialRetryDelay = 5 * time.Second
)
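
As a rough illustration of how these two constants could drive the backoff the comments describe; the `readFileWithRetry` helper and its doubling schedule are assumptions, not the repo's actual retry code:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

const (
	DefaultMaxRetryAttempts  = 3
	DefaultInitialRetryDelay = 5 * time.Second
)

// readFileWithRetry retries a local read with exponential backoff between
// attempts: 5s, then 10s, with the defaults above.
func readFileWithRetry(path string) ([]byte, error) {
	var lastErr error
	delay := DefaultInitialRetryDelay
	for attempt := 0; attempt < DefaultMaxRetryAttempts; attempt++ {
		data, err := os.ReadFile(path)
		if err == nil {
			return data, nil
		}
		lastErr = err
		if attempt < DefaultMaxRetryAttempts-1 {
			time.Sleep(delay)
			delay *= 2 // exponential backoff based on the initial delay
		}
	}
	return nil, fmt.Errorf("giving up after %d attempts: %w", DefaultMaxRetryAttempts, lastErr)
}

func main() {
	if _, err := readFileWithRetry("/tmp/ray/raylet_node_id"); err != nil {
		fmt.Println(err)
	}
}
```
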
12 changes: 6 additions & 6 deletions historyserver/pkg/utils/types.go
@@ -17,15 +17,15 @@ limitations under the License.
package utils

type ClusterInfo struct {
-	Name            string `json:"name"`
-	Namespace       string `json:"namespace"`
-	SessionName     string `json:"sessionName"`
-	CreateTime      string `json:"createTime"`
-	CreateTimeStamp int64  `json:"createTimeStamp"`
+	Name              string `json:"name"`
+	Namespace         string `json:"namespace"`
+	SessionName       string `json:"sessionName"`
+	CreationTime      string `json:"creationTime"`
+	CreationTimestamp int64  `json:"creationTimestamp"`

[Review comment from Contributor Author on lines +23 to +24]: I think CreationTime is more human-friendly and can be displayed directly in the UI. In contrast, CreationTimestamp is more machine-oriented and better suited for sorting and comparisons, for example:

func (a ClusterInfoList) Less(i, j int) bool { return a[i].CreateTimeStamp > a[j].CreateTimeStamp } // Sort descending

If this feels redundant, we can discuss which one to keep to improve maintainability.

Original discussion: #4241 (comment)

}

type ClusterInfoList []ClusterInfo

func (a ClusterInfoList) Len() int { return len(a) }
func (a ClusterInfoList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a ClusterInfoList) Less(i, j int) bool { return a[i].CreateTimeStamp > a[j].CreateTimeStamp } // Sort descending
+func (a ClusterInfoList) Less(i, j int) bool { return a[i].CreationTimestamp > a[j].CreationTimestamp } // Sort descending
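
With `sort.Interface` implemented on `ClusterInfoList`, callers get newest-first ordering from the standard library. A small self-contained sketch (types restated minimally so the example runs on its own):

```go
package main

import (
	"fmt"
	"sort"
)

// ClusterInfo and ClusterInfoList are restated here with only the fields
// the sort needs; the real types live in pkg/utils/types.go.
type ClusterInfo struct {
	Name              string
	CreationTimestamp int64
}

type ClusterInfoList []ClusterInfo

func (a ClusterInfoList) Len() int           { return len(a) }
func (a ClusterInfoList) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ClusterInfoList) Less(i, j int) bool { return a[i].CreationTimestamp > a[j].CreationTimestamp }

func main() {
	clusters := ClusterInfoList{
		{Name: "old", CreationTimestamp: 1672531200000},
		{Name: "new", CreationTimestamp: 1672617600000},
	}
	sort.Sort(clusters)
	fmt.Println(clusters[0].Name) // new: descending order puts the latest cluster first
}
```
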