2 changes: 1 addition & 1 deletion historyserver/Makefile
@@ -72,7 +72,7 @@ localimage-collector:

.PHONY: localimage
localimage: dockerbuilder_instance
docker buildx build -t historyserver:laster --platform linux/amd64 . --load
docker buildx build -t historyserver:latest --platform linux/amd64 . --load

.PHONY: dockerbuilder_instance
dockerbuilder_instance:
12 changes: 6 additions & 6 deletions historyserver/README.md
@@ -63,25 +63,25 @@ docker buildx build -t <image-name>:<tag> --platform linux/amd64,linux/arm64 . -

The history server can be configured using command-line flags:

- `--runtime-class-name`: Storage backend type (e.g., "s3", "aliyunoss", "localtest")
- `--storage-provider`: Storage backend type (e.g., "s3", "aliyunoss", "localtest")
- `--ray-root-dir`: Root directory for Ray logs
- `--kubeconfigs`: Path to kubeconfig file(s) for accessing Kubernetes clusters
- `--dashboard-dir`: Directory containing dashboard assets (default: "/dashboard")
- `--runtime-class-config-path`: Path to runtime class configuration file
- `--storage-provider-config-path`: Path to the storage provider configuration file (see the sketch below)
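
The example under "Supported Storage Backends" below shows the basic invocation; as a minimal sketch of the provider config file it can point at — assuming the S3 backend and the JSON keys documented by the S3 storage module (`s3Bucket`, `s3Endpoint`, `s3Region`); the file path, bucket, endpoint, and region are placeholders:

```bash
# Hypothetical storage provider config file; keys follow the S3 module's
# documented JSON format, values are placeholders for your environment.
cat > /tmp/storage-provider.json <<'EOF'
{"s3Bucket": "my-ray-logs", "s3Endpoint": "https://s3.us-west-2.amazonaws.com", "s3Region": "us-west-2"}
EOF

./output/bin/historyserver \
  --storage-provider=s3 \
  --storage-provider-config-path=/tmp/storage-provider.json \
  --ray-root-dir=/path/to/logs
```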

### Collector Configuration

The collector can be configured using command-line flags:

- `--role`: Node role ("Head" or "Worker")
- `--runtime-class-name`: Storage backend type (e.g., "s3", "aliyunoss")
- `--storage-provider`: Storage backend type (e.g., "s3", "aliyunoss")
- `--ray-cluster-name`: Name of the Ray cluster
- `--ray-cluster-id`: ID of the Ray cluster
- `--ray-root-dir`: Root directory for Ray logs
- `--log-batching`: Number of log entries to batch before writing
- `--events-port`: Port for the events server
- `--push-interval`: Interval between pushes to storage
- `--runtime-class-config-path`: Path to runtime class configuration file
- `--storage-provider-config-path`: Path to the storage provider configuration file (see the sketch below)
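
The basic collector invocation appears under the storage backend section below; as a sketch of the remaining knobs, shown with their defaults from `cmd/collector/main.go` (the cluster name, log path, and config path are placeholders):

```bash
# Hypothetical head-node collector invocation; defaults shown match the
# flag defaults in cmd/collector/main.go, other values are placeholders.
./output/bin/collector \
  --role=Head \
  --storage-provider=s3 \
  --ray-cluster-name=my-cluster \
  --ray-cluster-id=default \
  --ray-root-dir=/path/to/logs \
  --log-batching=1000 \
  --events-port=8080 \
  --push-interval=1m \
  --storage-provider-config-path=/var/collector-config/data
```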

## Supported Storage Backends

@@ -99,7 +99,7 @@ Each backend requires specific configuration parameters passed through environme

```bash
./output/bin/historyserver \
--runtime-class-name=s3 \
--storage-provider=s3 \
--ray-root-dir=/path/to/logs
```

@@ -108,7 +108,7 @@ Each backend requires specific configuration parameters passed through environme
```bash
./output/bin/collector \
--role=Head \
--runtime-class-name=s3 \
--storage-provider=s3 \
--ray-cluster-name=my-cluster \
--ray-root-dir=/path/to/logs
```
44 changes: 21 additions & 23 deletions historyserver/cmd/collector/main.go
@@ -17,27 +17,25 @@ import (
"github.com/ray-project/kuberay/historyserver/pkg/utils"
)

const runtimeClassConfigPath = "/var/collector-config/data"

func main() {
role := ""
runtimeClassName := ""
rayClusterName := ""
rayClusterId := ""
rayRootDir := ""
logBatching := 1000
eventsPort := 8080
pushInterval := time.Minute
runtimeClassConfigPath := "/var/collector-config/data"
var role string
var storageProvider string
var rayClusterName string
var rayClusterId string
var rayRootDir string
var logBatching int
var eventsPort int
var pushInterval time.Duration
var storageProviderConfigPath string

flag.StringVar(&role, "role", "Worker", "")
flag.StringVar(&runtimeClassName, "runtime-class-name", "", "")
flag.StringVar(&storageProvider, "storage-provider", "", "")
flag.StringVar(&rayClusterName, "ray-cluster-name", "", "")
flag.StringVar(&rayClusterId, "ray-cluster-id", "default", "")
flag.StringVar(&rayRootDir, "ray-root-dir", "", "")
flag.IntVar(&logBatching, "log-batching", 1000, "")
flag.IntVar(&eventsPort, "events-port", 8080, "")
flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data"
flag.StringVar(&storageProviderConfigPath, "storage-provider-config-path", "", "") // e.g. "/var/collector-config/data"
flag.DurationVar(&pushInterval, "push-interval", time.Minute, "")

flag.Parse()
@@ -55,21 +53,21 @@ func main() {
sessionName := path.Base(sessionDir)

jsonData := make(map[string]interface{})
if runtimeClassConfigPath != "" {
data, err := os.ReadFile(runtimeClassConfigPath)
if storageProviderConfigPath != "" {
data, err := os.ReadFile(storageProviderConfigPath)
if err != nil {
panic("Failed to read runtime class config " + err.Error())
panic("Failed to read storage provider config " + err.Error())
}
err = json.Unmarshal(data, &jsonData)
if err != nil {
panic("Failed to parse runtime class config: " + err.Error())
panic("Failed to parse storage provider config: " + err.Error())
}
}

registry := collector.GetWriterRegistry()
factory, ok := registry[runtimeClassName]
factory, ok := registry[storageProvider]
if !ok {
panic("Not supported runtime class name: " + runtimeClassName + " for role: " + role + ".")
panic("Not supported storage provider: " + storageProvider + " for role: " + role + ".")
}

globalConfig := types.RayCollectorConfig{
@@ -86,18 +84,18 @@ func main() {

writer, err := factory(&globalConfig, jsonData)
if err != nil {
panic("Failed to create writer for runtime class name: " + runtimeClassName + " for role: " + role + ".")
panic("Failed to create writer for storage provider: " + storageProvider + " for role: " + role + ".")
}

// Create and initialize EventServer
eventServer := eventserver.NewEventServer(writer, rayRootDir, sessionDir, rayNodeId, rayClusterName, rayClusterId, sessionName)
eventServer.InitServer(eventsPort)

collector := runtime.NewCollector(&globalConfig, writer)
_ = collector.Start(context.TODO().Done())
logCollector := runtime.NewCollector(&globalConfig, writer)
_ = logCollector.Start(context.TODO().Done())

eventStop := eventServer.WaitForStop()
logStop := collector.WaitForStop()
logStop := logCollector.WaitForStop()
<-eventStop
logrus.Info("Event server shutdown")
<-logStop
4 changes: 2 additions & 2 deletions historyserver/config/raycluster.yaml
@@ -88,7 +88,7 @@ spec:
command:
- collector
- --role=Head
- --runtime-class-name=s3
- --storage-provider=s3
- --ray-cluster-name=raycluster-historyserver
- --ray-root-dir=log
- --events-port=8084
@@ -185,7 +185,7 @@ spec:
command:
- collector
- --role=Worker
- --runtime-class-name=s3
- --storage-provider=s3
- --ray-cluster-name=raycluster-historyserver
- --ray-root-dir=log
- --events-port=8084
2 changes: 1 addition & 1 deletion historyserver/pkg/collector/eventserver/eventserver.go
@@ -314,7 +314,7 @@ func (es *EventServer) flushEventsInternal(eventsToFlush []Event) {

// Categorize events
for _, event := range eventsToFlush {
hourKey := event.Timestamp.Truncate(time.Hour).Format("2006-01-02-15")
hourKey := event.Timestamp.Truncate(time.Hour).Format("2006010215")

// Check event type
if es.isNodeEvent(event.Data) {
@@ -7,7 +7,7 @@ Oss endpoint and oss bucket are read from /var/collector-config/data.
Content in /var/collector-config/data should be in JSON format, for example:
`{"ossBucket": "", "ossEndpoint": ""}`

Set `--runtime-class-name=aliyunoss` to enable this module.
Set `--storage-provider=aliyunoss` to enable this module.

Currently this module can only be used in an ACK environment. OIDC must be
enabled for the cluster, and permission to write to the OSS bucket must be granted.
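
A sketch of a working config under these assumptions — the bucket name and endpoint are placeholders for your ACK environment:

```bash
# Hypothetical /var/collector-config/data for the Aliyun OSS backend;
# replace bucket and endpoint with values from your ACK environment.
cat > /var/collector-config/data <<'EOF'
{"ossBucket": "my-ray-history", "ossEndpoint": "oss-cn-hangzhou.aliyuncs.com"}
EOF

collector --role=Head --storage-provider=aliyunoss \
  --ray-cluster-name=my-cluster --ray-root-dir=log
```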
@@ -36,7 +36,7 @@ type GlobalConfig struct {
OSSHistoryServerDir string
}

type RayMetaHanderConfig struct {
type RayMetaHandlerConfig struct {
GlobalConfig
RayClusterName string
RayClusterID string
@@ -39,8 +39,8 @@ func ValidateGlobalConfig(c *GlobalConfig, fldpath *field.Path) field.ErrorList
return allErrs
}

// ValidateMetaHanderConfig is
func ValidateMetaHanderConfig(c *RayMetaHanderConfig, fldpath *field.Path) field.ErrorList {
// ValidateMetaHandlerConfig validates a RayMetaHandlerConfig.
func ValidateMetaHandlerConfig(c *RayMetaHandlerConfig, fldpath *field.Path) field.ErrorList {
var allErrs field.ErrorList
if len(c.RayClusterName) == 0 {
allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterName, "ray_cluster_name must be set"))
@@ -51,7 +51,7 @@ func ValidateMetaHanderConfig(c *RayMetaHanderConfig, fldpath *field.Path) field
return allErrs
}

func ValidatRayHistoryServerConfig(c *RayHistoryServerConfig, fldpath *field.Path) field.ErrorList {
func ValidateRayHistoryServerConfig(c *RayHistoryServerConfig, fldpath *field.Path) field.ErrorList {
var allErrs field.ErrorList
if len(c.DashBoardDir) == 0 {
allErrs = append(allErrs, field.Invalid(fldpath, c.DashBoardDir, "dashboard-dir must be set"))
@@ -179,13 +179,13 @@ func (r *RayLogsHandler) List() (res []utils.ClusterInfo) {
sessionInfo := strings.Split(metas[1], "_")
date := sessionInfo[1]
dataTime := sessionInfo[2]
createTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
creationTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
if err != nil {
logrus.Errorf("Failed to parse time %s: %v", date+"_"+dataTime, err)
continue
}
c.CreateTimeStamp = createTime.Unix()
c.CreateTime = createTime.UTC().Format(("2006-01-02T15:04:05Z"))
c.CreationTimestamp = creationTime.Unix()
c.CreationTime = creationTime.UTC().Format("2006-01-02T15:04:05Z")
clusters = append(clusters, *c)
}
if lsRes.IsTruncated {
@@ -240,7 +240,7 @@ func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (stor
return New(config)
}

func NewWritter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) {
func NewWriter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) {
config := &config{}
config.complete(c, jd)

@@ -11,38 +11,12 @@ import (

// MockReader is a mock implementation of the StorageReader interface
type MockReader struct {
clusters utils.ClusterInfoList
data map[string]map[string]string
clusters []utils.ClusterInfo
}

// NewMockReader creates a new mock reader
func NewMockReader() *MockReader {
clusters := []utils.ClusterInfo{
{
Name: "cluster-1",
SessionName: "session-1",
CreateTime: "2023-01-01T00:00:00Z",
CreateTimeStamp: 1672531200000,
},
{
Name: "cluster-2",
SessionName: "session-2",
CreateTime: "2023-01-02T00:00:00Z",
CreateTimeStamp: 1672617600000,
},
}

data := map[string]map[string]string{
"cluster-1": {
"log.txt": "This is log content for cluster-1\nMultiple lines\nof log content",
"metadata.json": "{\n \"name\": \"cluster-1\",\n \"sessionName\": \"session-1\",\n \"createTime\": \"2023-01-01T00:00:00Z\"\n}",
},
"cluster-2": {
"log.txt": "This is log content for cluster-2\nMultiple lines\nof log content",
"metadata.json": "{\n \"name\": \"cluster-2\",\n \"sessionName\": \"session-2\",\n \"createTime\": \"2023-01-02T00:00:00Z\"\n}",
},
}

func NewMockReader(clusters utils.ClusterInfoList, data map[string]map[string]string) *MockReader {
return &MockReader{
clusters: clusters,
data: data,
@@ -75,7 +49,33 @@ func (r *MockReader) ListFiles(clusterId string, dir string) []string {
return []string{}
}

// NewReader creates a new StorageReader
// NewReader creates a new StorageReader with default mock data.
func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (storage.StorageReader, error) {
return NewMockReader(), nil
clusters := utils.ClusterInfoList{
{
Name: "cluster-1",
SessionName: "session-1",
CreationTime: "2023-01-01T00:00:00Z",
CreationTimestamp: 1672531200000,
},
{
Name: "cluster-2",
SessionName: "session-2",
CreationTime: "2023-01-02T00:00:00Z",
CreationTimestamp: 1672617600000,
},
}

data := map[string]map[string]string{
"cluster-1": {
"log.txt": "This is log content for cluster-1\nMultiple lines\nof log content",
"metadata.json": "{\n \"name\": \"cluster-1\",\n \"sessionName\": \"session-1\",\n \"creationTime\": \"2023-01-01T00:00:00Z\"\n}",
},
"cluster-2": {
"log.txt": "This is log content for cluster-2\nMultiple lines\nof log content",
"metadata.json": "{\n \"name\": \"cluster-2\",\n \"sessionName\": \"session-2\",\n \"creationTime\": \"2023-01-02T00:00:00Z\"\n}",
},
}

return NewMockReader(clusters, data), nil
}
@@ -7,6 +7,6 @@ S3 endpoint, S3 region and S3 bucket are read from /var/collector-config/data.
Content in /var/collector-config/data should be in JSON format, for example:
`{"s3Bucket": "", "s3Endpoint": "", "s3Region": ""}`

Set `--runtime-class-name=s3` to enable this module.
Set `--storage-provider=s3` to enable this module.

This module can be used with any S3 compatible storage service.
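
For example — a sketch against a MinIO endpoint, purely as an illustration of the S3-compatible case; bucket, endpoint, and region are placeholders:

```bash
# Hypothetical /var/collector-config/data for the S3 backend; any
# S3-compatible endpoint (here a MinIO service URL) can be used.
cat > /var/collector-config/data <<'EOF'
{"s3Bucket": "ray-history", "s3Endpoint": "http://minio.minio.svc.cluster.local:9000", "s3Region": "us-east-1"}
EOF

collector --role=Worker --storage-provider=s3 \
  --ray-cluster-name=my-cluster --ray-root-dir=log
```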
8 changes: 4 additions & 4 deletions historyserver/pkg/collector/logcollector/storage/s3/s3.go
@@ -180,13 +180,13 @@ func (r *RayLogsHandler) List() (res []utils.ClusterInfo) {
sessionInfo := strings.Split(metas[1], "_")
date := sessionInfo[1]
dataTime := sessionInfo[2]
createTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
creationTime, err := time.Parse("2006-01-02_15-04-05", date+"_"+dataTime)
if err != nil {
logrus.Errorf("Failed to parse time %s: %v", date+"_"+dataTime, err)
continue
}
c.CreateTimeStamp = createTime.Unix()
c.CreateTime = createTime.UTC().Format(("2006-01-02T15:04:05Z"))
c.CreationTimestamp = creationTime.Unix()
c.CreationTime = creationTime.UTC().Format("2006-01-02T15:04:05Z")
clusters = append(clusters, *c)
}
return true
@@ -251,7 +251,7 @@ func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (stor
return New(config)
}

func NewWritter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) {
func NewWriter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWriter, error) {
config := &config{}
config.complete(c, jd)

4 changes: 2 additions & 2 deletions historyserver/pkg/collector/registry.go
@@ -15,8 +15,8 @@ func GetWriterRegistry() WriterRegistry {
}

var writerRegistry = WriterRegistry{
"aliyunoss": ray.NewWritter,
"s3": s3.NewWritter,
"aliyunoss": ray.NewWriter,
"s3": s3.NewWriter,
}

type ReaderRegistry map[string]func(globalData *types.RayHistoryServerConfig, data map[string]interface{}) (storage.StorageReader, error)
12 changes: 2 additions & 10 deletions historyserver/pkg/collector/types/types.go
@@ -22,8 +22,8 @@ type RayCollectorConfig struct {
PushInterval time.Duration
}

// ValidateRayHanderConfig is
func ValidateRayHanderConfig(c *RayCollectorConfig, fldpath *field.Path) field.ErrorList {
// ValidateRayHandlerConfig validates a RayCollectorConfig.
func ValidateRayHandlerConfig(c *RayCollectorConfig, fldpath *field.Path) field.ErrorList {
var allErrs field.ErrorList
if len(c.SessionDir) == 0 {
allErrs = append(allErrs, field.Invalid(fldpath, c.SessionDir, "session-dir must be set"))
@@ -38,13 +38,5 @@ func ValidateRayHanderConfig(c *RayCollectorConfig, fldpath *field.E
allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterID, "ray_cluster_id must be set"))
}

if c.Role == "Head" {
if len(c.RayClusterName) == 0 {
allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterName, "ray_cluster_name must be set"))
}
if len(c.RayClusterID) == 0 {
allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterID, "ray_cluster_id must be set"))
}
}
return allErrs
}