From a0b0530d9c5be44427a3f6e1166c3b9844c86fe1 Mon Sep 17 00:00:00 2001 From: YSY Date: Sat, 14 Mar 2026 15:02:08 +0800 Subject: [PATCH 1/3] feat(usage statistics): add configurable usage stats persistence and management APIs --- .../api/handlers/management/config_basic.go | 53 ++++ internal/api/handlers/management/handler.go | 25 +- internal/api/handlers/management/usage.go | 42 +++ internal/api/server.go | 23 ++ internal/config/config.go | 24 ++ internal/usage/persistence.go | 286 ++++++++++++++++++ internal/watcher/diff/config_diff.go | 9 + 7 files changed, 461 insertions(+), 1 deletion(-) create mode 100644 internal/usage/persistence.go diff --git a/internal/api/handlers/management/config_basic.go b/internal/api/handlers/management/config_basic.go index f77e91e9b..140286a85 100644 --- a/internal/api/handlers/management/config_basic.go +++ b/internal/api/handlers/management/config_basic.go @@ -193,6 +193,59 @@ func (h *Handler) PutUsageStatisticsEnabled(c *gin.Context) { h.updateBoolField(c, func(v bool) { h.cfg.UsageStatisticsEnabled = v }) } +// UsagePersistence +func (h *Handler) GetUsagePersistence(c *gin.Context) { + if h == nil || h.cfg == nil { + c.JSON(200, gin.H{"usage-persistence": gin.H{}}) + return + } + status := gin.H{} + if h.usagePersistence != nil { + persistStatus := h.usagePersistence.Status() + status["runtime"] = persistStatus + } + c.JSON(200, gin.H{ + "usage-persistence": h.cfg.UsagePersistence, + "status": status, + }) +} + +func (h *Handler) PutUsagePersistence(c *gin.Context) { + if h == nil || h.cfg == nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid config state"}) + return + } + var body struct { + Enabled *bool `json:"enabled"` + FilePath *string `json:"file-path"` + IntervalSeconds *int `json:"interval-seconds"` + } + if errBindJSON := c.ShouldBindJSON(&body); errBindJSON != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid body"}) + return + } + if body.Enabled != nil { + h.cfg.UsagePersistence.Enabled = *body.Enabled + } + if body.FilePath != nil { + h.cfg.UsagePersistence.FilePath = strings.TrimSpace(*body.FilePath) + if h.cfg.UsagePersistence.FilePath == "" { + h.cfg.UsagePersistence.FilePath = "usage-statistics.json" + } + } + if body.IntervalSeconds != nil { + if *body.IntervalSeconds <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "interval-seconds must be greater than 0"}) + return + } + h.cfg.UsagePersistence.IntervalSeconds = *body.IntervalSeconds + } + if h.usagePersistence != nil { + h.usagePersistence.ApplyConfig(h.cfg.UsagePersistence) + } + h.persist(c) +} + // UsageStatisticsEnabled func (h *Handler) GetLoggingToFile(c *gin.Context) { c.JSON(200, gin.H{"logging-to-file": h.cfg.LoggingToFile}) diff --git a/internal/api/handlers/management/handler.go b/internal/api/handlers/management/handler.go index 45786b9d3..d877a7adb 100644 --- a/internal/api/handlers/management/handler.go +++ b/internal/api/handlers/management/handler.go @@ -42,6 +42,7 @@ type Handler struct { failedAttempts map[string]*attemptInfo // keyed by client IP authManager *coreauth.Manager usageStats *usage.RequestStatistics + usagePersistence *usage.PersistenceManager tokenStore coreauth.Store localPassword string allowRemoteOverride bool @@ -61,10 +62,14 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man failedAttempts: make(map[string]*attemptInfo), authManager: manager, usageStats: usage.GetRequestStatistics(), + usagePersistence: usage.NewPersistenceManager(usage.GetRequestStatistics(), filepath.Dir(configFilePath)), tokenStore: sdkAuth.GetTokenStore(), allowRemoteOverride: envSecret != "", envSecret: envSecret, } + if cfg != nil { + h.usagePersistence.ApplyConfig(cfg.UsagePersistence) + } h.startAttemptCleanup() return h } @@ -105,7 +110,12 @@ func NewHandlerWithoutConfigFilePath(cfg *config.Config, manager *coreauth.Manag } // SetConfig updates the in-memory config reference when the server hot-reloads. -func (h *Handler) SetConfig(cfg *config.Config) { h.cfg = cfg } +func (h *Handler) SetConfig(cfg *config.Config) { + h.cfg = cfg + if h != nil && h.usagePersistence != nil && cfg != nil { + h.usagePersistence.ApplyConfig(cfg.UsagePersistence) + } +} // SetAuthManager updates the auth manager reference used by management endpoints. func (h *Handler) SetAuthManager(manager *coreauth.Manager) { h.authManager = manager } @@ -113,6 +123,9 @@ func (h *Handler) SetAuthManager(manager *coreauth.Manager) { h.authManager = ma // SetUsageStatistics allows replacing the usage statistics reference. func (h *Handler) SetUsageStatistics(stats *usage.RequestStatistics) { h.usageStats = stats } +// SetUsagePersistenceManager replaces the usage persistence manager. +func (h *Handler) SetUsagePersistenceManager(manager *usage.PersistenceManager) { h.usagePersistence = manager } + // SetLocalPassword configures the runtime-local password accepted for localhost requests. func (h *Handler) SetLocalPassword(password string) { h.localPassword = password } @@ -134,6 +147,16 @@ func (h *Handler) SetPostAuthHook(hook coreauth.PostAuthHook) { h.postAuthHook = hook } +// Stop releases background resources owned by management handler. +func (h *Handler) Stop() { + if h == nil { + return + } + if h.usagePersistence != nil { + h.usagePersistence.Stop(true) + } +} + // Middleware enforces access control for management endpoints. // All requests (local and remote) require a valid management key. // Additionally, remote access requires allow-remote-management=true. diff --git a/internal/api/handlers/management/usage.go b/internal/api/handlers/management/usage.go index 5f7940896..e9ebb0873 100644 --- a/internal/api/handlers/management/usage.go +++ b/internal/api/handlers/management/usage.go @@ -77,3 +77,45 @@ func (h *Handler) ImportUsageStatistics(c *gin.Context) { "failed_requests": snapshot.FailureCount, }) } + +// GetUsagePersistenceStatus returns runtime usage persistence status. +func (h *Handler) GetUsagePersistenceStatus(c *gin.Context) { + if h == nil || h.usagePersistence == nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "usage persistence unavailable"}) + return + } + c.JSON(http.StatusOK, gin.H{"status": h.usagePersistence.Status()}) +} + +// SaveUsageStatistics persists current usage snapshot immediately. +func (h *Handler) SaveUsageStatistics(c *gin.Context) { + if h == nil || h.usagePersistence == nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "usage persistence unavailable"}) + return + } + status, err := h.usagePersistence.SaveNow() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error(), "status": status}) + return + } + c.JSON(http.StatusOK, gin.H{"status": status}) +} + +// LoadUsageStatistics loads usage snapshot from persistence and merges into memory. +func (h *Handler) LoadUsageStatistics(c *gin.Context) { + if h == nil || h.usagePersistence == nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "usage persistence unavailable"}) + return + } + result, err := h.usagePersistence.LoadNow() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error(), "result": result}) + return + } + snapshot := h.usageStats.Snapshot() + c.JSON(http.StatusOK, gin.H{ + "result": result, + "total_requests": snapshot.TotalRequests, + "failed_requests": snapshot.FailureCount, + }) +} diff --git a/internal/api/server.go b/internal/api/server.go index 0325ca30c..e6c2b3c85 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -176,6 +176,8 @@ type Server struct { keepAliveOnTimeout func() keepAliveHeartbeat chan struct{} keepAliveStop chan struct{} + + usagePersistence *usage.PersistenceManager } // NewServer creates and initializes a new API server instance. @@ -251,6 +253,7 @@ func NewServer(cfg *config.Config, authManager *auth.Manager, accessManager *sdk currentPath: wd, envManagementSecret: envManagementSecret, wsRoutes: make(map[string]struct{}), + usagePersistence: usage.NewPersistenceManager(usage.GetRequestStatistics(), filepath.Dir(configFilePath)), } s.wsAuthEnabled.Store(cfg.WebsocketAuth) // Save initial YAML snapshot @@ -263,6 +266,10 @@ func NewServer(cfg *config.Config, authManager *auth.Manager, accessManager *sdk auth.SetQuotaCooldownDisabled(cfg.DisableCooling) // Initialize management handler s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager) + if s.usagePersistence != nil { + s.usagePersistence.ApplyConfig(cfg.UsagePersistence) + s.mgmt.SetUsagePersistenceManager(s.usagePersistence) + } if optionState.localPassword != "" { s.mgmt.SetLocalPassword(optionState.localPassword) } @@ -489,6 +496,9 @@ func (s *Server) registerManagementRoutes() { mgmt.GET("/usage", s.mgmt.GetUsageStatistics) mgmt.GET("/usage/export", s.mgmt.ExportUsageStatistics) mgmt.POST("/usage/import", s.mgmt.ImportUsageStatistics) + mgmt.GET("/usage/persistence-status", s.mgmt.GetUsagePersistenceStatus) + mgmt.POST("/usage/save", s.mgmt.SaveUsageStatistics) + mgmt.POST("/usage/load", s.mgmt.LoadUsageStatistics) mgmt.GET("/config", s.mgmt.GetConfig) mgmt.GET("/config.yaml", s.mgmt.GetConfigYAML) mgmt.PUT("/config.yaml", s.mgmt.PutConfigYAML) @@ -514,6 +524,10 @@ func (s *Server) registerManagementRoutes() { mgmt.PUT("/usage-statistics-enabled", s.mgmt.PutUsageStatisticsEnabled) mgmt.PATCH("/usage-statistics-enabled", s.mgmt.PutUsageStatisticsEnabled) + mgmt.GET("/usage-persistence", s.mgmt.GetUsagePersistence) + mgmt.PUT("/usage-persistence", s.mgmt.PutUsagePersistence) + mgmt.PATCH("/usage-persistence", s.mgmt.PutUsagePersistence) + mgmt.GET("/proxy-url", s.mgmt.GetProxyURL) mgmt.PUT("/proxy-url", s.mgmt.PutProxyURL) mgmt.PATCH("/proxy-url", s.mgmt.PutProxyURL) @@ -830,6 +844,10 @@ func (s *Server) Stop(ctx context.Context) error { } } + if s.mgmt != nil { + s.mgmt.Stop() + } + // Shutdown the HTTP server. if err := s.server.Shutdown(ctx); err != nil { return fmt.Errorf("failed to shutdown HTTP server: %v", err) @@ -904,6 +922,10 @@ func (s *Server) UpdateClients(cfg *config.Config) { usage.SetStatisticsEnabled(cfg.UsageStatisticsEnabled) } + if s.usagePersistence != nil && (oldCfg == nil || !reflect.DeepEqual(oldCfg.UsagePersistence, cfg.UsagePersistence)) { + s.usagePersistence.ApplyConfig(cfg.UsagePersistence) + } + if s.requestLogger != nil && (oldCfg == nil || oldCfg.ErrorLogsMaxFiles != cfg.ErrorLogsMaxFiles) { if setter, ok := s.requestLogger.(interface{ SetErrorLogsMaxFiles(int) }); ok { setter.SetErrorLogsMaxFiles(cfg.ErrorLogsMaxFiles) @@ -970,6 +992,7 @@ func (s *Server) UpdateClients(cfg *config.Config) { if s.mgmt != nil { s.mgmt.SetConfig(cfg) s.mgmt.SetAuthManager(s.handlers.AuthManager) + s.mgmt.SetUsagePersistenceManager(s.usagePersistence) } // Notify Amp module only when Amp config has changed. diff --git a/internal/config/config.go b/internal/config/config.go index 7bd137e0d..7556be085 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -64,6 +64,9 @@ type Config struct { // UsageStatisticsEnabled toggles in-memory usage aggregation; when false, usage data is discarded. UsageStatisticsEnabled bool `yaml:"usage-statistics-enabled" json:"usage-statistics-enabled"` + // UsagePersistence controls optional periodic persistence of usage statistics. + UsagePersistence UsagePersistenceConfig `yaml:"usage-persistence" json:"usage-persistence"` + // DisableCooling disables quota cooldown scheduling when true. DisableCooling bool `yaml:"disable-cooling" json:"disable-cooling"` @@ -128,6 +131,16 @@ type Config struct { legacyMigrationPending bool `yaml:"-" json:"-"` } +// UsagePersistenceConfig defines usage statistics persistence behavior. +type UsagePersistenceConfig struct { + // Enabled toggles automatic usage persistence. + Enabled bool `yaml:"enabled" json:"enabled"` + // FilePath is the output file path for usage snapshots. Relative paths are resolved from config directory. + FilePath string `yaml:"file-path" json:"file-path"` + // IntervalSeconds controls periodic flush interval in seconds. + IntervalSeconds int `yaml:"interval-seconds" json:"interval-seconds"` +} + // ClaudeHeaderDefaults configures default header values injected into Claude API requests // when the client does not send them. Update these when Claude Code releases a new version. type ClaudeHeaderDefaults struct { @@ -553,6 +566,9 @@ func LoadConfigOptional(configFile string, optional bool) (*Config, error) { cfg.LogsMaxTotalSizeMB = 0 cfg.ErrorLogsMaxFiles = 10 cfg.UsageStatisticsEnabled = false + cfg.UsagePersistence.Enabled = false + cfg.UsagePersistence.FilePath = "usage-statistics.json" + cfg.UsagePersistence.IntervalSeconds = 30 cfg.DisableCooling = false cfg.Pprof.Enable = false cfg.Pprof.Addr = DefaultPprofAddr @@ -618,6 +634,14 @@ func LoadConfigOptional(configFile string, optional bool) (*Config, error) { cfg.MaxRetryCredentials = 0 } + cfg.UsagePersistence.FilePath = strings.TrimSpace(cfg.UsagePersistence.FilePath) + if cfg.UsagePersistence.FilePath == "" { + cfg.UsagePersistence.FilePath = "usage-statistics.json" + } + if cfg.UsagePersistence.IntervalSeconds <= 0 { + cfg.UsagePersistence.IntervalSeconds = 30 + } + // Sanitize Gemini API key configuration and migrate legacy entries. cfg.SanitizeGeminiKeys() diff --git a/internal/usage/persistence.go b/internal/usage/persistence.go new file mode 100644 index 000000000..ae70164ae --- /dev/null +++ b/internal/usage/persistence.go @@ -0,0 +1,286 @@ +package usage + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/router-for-me/CLIProxyAPI/v6/internal/config" +) + +type persistencePayload struct { + Version int `json:"version"` + SavedAt time.Time `json:"saved_at"` + Usage StatisticsSnapshot `json:"usage"` +} + +type PersistenceStatus struct { + Enabled bool `json:"enabled"` + Path string `json:"path"` + IntervalSeconds int `json:"interval_seconds"` + LastSavedAt time.Time `json:"last_saved_at,omitempty"` + LastLoadedAt time.Time `json:"last_loaded_at,omitempty"` + LastError string `json:"last_error,omitempty"` +} + +type PersistenceLoadResult struct { + Added int64 `json:"added"` + Skipped int64 `json:"skipped"` +} + +type PersistenceManager struct { + mu sync.Mutex + + stats *RequestStatistics + baseDir string + + enabled bool + path string + interval time.Duration + + stopCh chan struct{} + running bool + + lastSavedAt time.Time + lastLoadedAt time.Time + lastError string +} + +func NewPersistenceManager(stats *RequestStatistics, baseDir string) *PersistenceManager { + baseDir = strings.TrimSpace(baseDir) + if baseDir == "" { + baseDir = "." + } + return &PersistenceManager{stats: stats, baseDir: baseDir} +} + +func (m *PersistenceManager) ApplyConfig(cfg config.UsagePersistenceConfig) { + if m == nil { + return + } + + enabled := cfg.Enabled + path := m.resolvePath(cfg.FilePath) + intervalSeconds := cfg.IntervalSeconds + if intervalSeconds <= 0 { + intervalSeconds = 30 + } + interval := time.Duration(intervalSeconds) * time.Second + + m.mu.Lock() + shouldRestart := m.running && (m.path != path || m.interval != interval) + if shouldRestart { + close(m.stopCh) + m.running = false + m.stopCh = nil + } + m.enabled = enabled + m.path = path + m.interval = interval + needStart := m.enabled && !m.running + if needStart { + m.stopCh = make(chan struct{}) + m.running = true + } + m.mu.Unlock() + + if !enabled { + return + } + + if needStart || shouldRestart { + _, _ = m.LoadNow() + go m.run() + } +} + +func (m *PersistenceManager) resolvePath(path string) string { + trimmed := strings.TrimSpace(path) + if trimmed == "" { + trimmed = "usage-statistics.json" + } + if filepath.IsAbs(trimmed) { + return trimmed + } + return filepath.Join(m.baseDir, trimmed) +} + +func (m *PersistenceManager) run() { + m.mu.Lock() + stopCh := m.stopCh + interval := m.interval + m.mu.Unlock() + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + _, _ = m.SaveNow() + case <-stopCh: + return + } + } +} + +func (m *PersistenceManager) SaveNow() (PersistenceStatus, error) { + if m == nil || m.stats == nil { + return PersistenceStatus{}, fmt.Errorf("usage persistence unavailable") + } + + m.mu.Lock() + path := m.path + if path == "" { + path = m.resolvePath("usage-statistics.json") + m.path = path + } + m.mu.Unlock() + + snapshot := m.stats.Snapshot() + payload := persistencePayload{Version: 1, SavedAt: time.Now().UTC(), Usage: snapshot} + data, err := json.MarshalIndent(payload, "", " ") + if err != nil { + m.recordError(err) + return m.Status(), err + } + + if err = os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + m.recordError(err) + return m.Status(), err + } + + tmpFile, err := os.CreateTemp(filepath.Dir(path), "usage-persist-*.json") + if err != nil { + m.recordError(err) + return m.Status(), err + } + tmpName := tmpFile.Name() + writeErr := func(inner error) error { + _ = tmpFile.Close() + _ = os.Remove(tmpName) + m.recordError(inner) + return inner + } + + if _, err = tmpFile.Write(data); err != nil { + return m.Status(), writeErr(err) + } + if err = tmpFile.Sync(); err != nil { + return m.Status(), writeErr(err) + } + if err = tmpFile.Close(); err != nil { + return m.Status(), writeErr(err) + } + if err = os.Rename(tmpName, path); err != nil { + _ = os.Remove(tmpName) + m.recordError(err) + return m.Status(), err + } + + m.mu.Lock() + m.lastSavedAt = payload.SavedAt + m.lastError = "" + m.mu.Unlock() + return m.Status(), nil +} + +func (m *PersistenceManager) LoadNow() (PersistenceLoadResult, error) { + if m == nil || m.stats == nil { + return PersistenceLoadResult{}, fmt.Errorf("usage persistence unavailable") + } + + m.mu.Lock() + path := m.path + if path == "" { + path = m.resolvePath("usage-statistics.json") + m.path = path + } + m.mu.Unlock() + + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + m.mu.Lock() + m.lastError = "" + m.mu.Unlock() + return PersistenceLoadResult{}, nil + } + m.recordError(err) + return PersistenceLoadResult{}, err + } + + var payload persistencePayload + if err = json.Unmarshal(data, &payload); err != nil { + m.recordError(err) + return PersistenceLoadResult{}, err + } + if payload.Version != 0 && payload.Version != 1 { + err = fmt.Errorf("unsupported usage persistence version: %d", payload.Version) + m.recordError(err) + return PersistenceLoadResult{}, err + } + + mergeResult := m.stats.MergeSnapshot(payload.Usage) + loadedAt := time.Now().UTC() + m.mu.Lock() + m.lastLoadedAt = loadedAt + m.lastError = "" + m.mu.Unlock() + + return PersistenceLoadResult{Added: mergeResult.Added, Skipped: mergeResult.Skipped}, nil +} + +func (m *PersistenceManager) Status() PersistenceStatus { + if m == nil { + return PersistenceStatus{} + } + m.mu.Lock() + defer m.mu.Unlock() + interval := 0 + if m.interval > 0 { + interval = int(m.interval / time.Second) + } + return PersistenceStatus{ + Enabled: m.enabled, + Path: m.path, + IntervalSeconds: interval, + LastSavedAt: m.lastSavedAt, + LastLoadedAt: m.lastLoadedAt, + LastError: m.lastError, + } +} + +func (m *PersistenceManager) Stop(flush bool) { + if m == nil { + return + } + m.mu.Lock() + if m.running && m.stopCh != nil { + close(m.stopCh) + } + m.running = false + m.stopCh = nil + m.mu.Unlock() + + if flush { + _, _ = m.SaveNow() + } +} + +func (m *PersistenceManager) recordError(err error) { + if m == nil { + return + } + m.mu.Lock() + if err == nil { + m.lastError = "" + } else { + m.lastError = err.Error() + } + m.mu.Unlock() +} diff --git a/internal/watcher/diff/config_diff.go b/internal/watcher/diff/config_diff.go index 7997f04eb..8c19b83d2 100644 --- a/internal/watcher/diff/config_diff.go +++ b/internal/watcher/diff/config_diff.go @@ -39,6 +39,15 @@ func BuildConfigChangeDetails(oldCfg, newCfg *config.Config) []string { if oldCfg.UsageStatisticsEnabled != newCfg.UsageStatisticsEnabled { changes = append(changes, fmt.Sprintf("usage-statistics-enabled: %t -> %t", oldCfg.UsageStatisticsEnabled, newCfg.UsageStatisticsEnabled)) } + if oldCfg.UsagePersistence.Enabled != newCfg.UsagePersistence.Enabled { + changes = append(changes, fmt.Sprintf("usage-persistence.enabled: %t -> %t", oldCfg.UsagePersistence.Enabled, newCfg.UsagePersistence.Enabled)) + } + if strings.TrimSpace(oldCfg.UsagePersistence.FilePath) != strings.TrimSpace(newCfg.UsagePersistence.FilePath) { + changes = append(changes, fmt.Sprintf("usage-persistence.file-path: %s -> %s", strings.TrimSpace(oldCfg.UsagePersistence.FilePath), strings.TrimSpace(newCfg.UsagePersistence.FilePath))) + } + if oldCfg.UsagePersistence.IntervalSeconds != newCfg.UsagePersistence.IntervalSeconds { + changes = append(changes, fmt.Sprintf("usage-persistence.interval-seconds: %d -> %d", oldCfg.UsagePersistence.IntervalSeconds, newCfg.UsagePersistence.IntervalSeconds)) + } if oldCfg.DisableCooling != newCfg.DisableCooling { changes = append(changes, fmt.Sprintf("disable-cooling: %t -> %t", oldCfg.DisableCooling, newCfg.DisableCooling)) } From e4e2ef05d9f4db7f63e27473d0feee4beb8269dd Mon Sep 17 00:00:00 2001 From: YSY Date: Sun, 15 Mar 2026 23:39:21 +0800 Subject: [PATCH 2/3] fix(usage): correct persistence lifecycle and shutdown semantics --- internal/api/handlers/management/handler.go | 68 +++-- internal/api/server.go | 12 +- internal/api/server_test.go | 151 +++++++++++ internal/usage/persistence.go | 8 +- internal/usage/persistence_test.go | 265 ++++++++++++++++++++ 5 files changed, 471 insertions(+), 33 deletions(-) create mode 100644 internal/usage/persistence_test.go diff --git a/internal/api/handlers/management/handler.go b/internal/api/handlers/management/handler.go index d877a7adb..84c3d6b05 100644 --- a/internal/api/handlers/management/handler.go +++ b/internal/api/handlers/management/handler.go @@ -35,39 +35,52 @@ const attemptMaxIdleTime = 2 * time.Hour // Handler aggregates config reference, persistence path and helpers. type Handler struct { - cfg *config.Config - configFilePath string - mu sync.Mutex - attemptsMu sync.Mutex - failedAttempts map[string]*attemptInfo // keyed by client IP - authManager *coreauth.Manager - usageStats *usage.RequestStatistics - usagePersistence *usage.PersistenceManager - tokenStore coreauth.Store - localPassword string - allowRemoteOverride bool - envSecret string - logDir string - postAuthHook coreauth.PostAuthHook + cfg *config.Config + configFilePath string + mu sync.Mutex + attemptsMu sync.Mutex + failedAttempts map[string]*attemptInfo // keyed by client IP + authManager *coreauth.Manager + usageStats *usage.RequestStatistics + usagePersistence *usage.PersistenceManager + ownsUsagePersistence bool + tokenStore coreauth.Store + localPassword string + allowRemoteOverride bool + envSecret string + logDir string + postAuthHook coreauth.PostAuthHook } // NewHandler creates a new management handler instance. func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Manager) *Handler { + return NewHandlerWithUsagePersistence(cfg, configFilePath, manager, nil) +} + +// NewHandlerWithUsagePersistence creates a new management handler and optionally +// reuses an externally managed usage persistence manager. +func NewHandlerWithUsagePersistence(cfg *config.Config, configFilePath string, manager *coreauth.Manager, usagePersistence *usage.PersistenceManager) *Handler { envSecret, _ := os.LookupEnv("MANAGEMENT_PASSWORD") envSecret = strings.TrimSpace(envSecret) + owned := false + if usagePersistence == nil { + usagePersistence = usage.NewPersistenceManager(usage.GetRequestStatistics(), filepath.Dir(configFilePath)) + owned = true + } h := &Handler{ - cfg: cfg, - configFilePath: configFilePath, - failedAttempts: make(map[string]*attemptInfo), - authManager: manager, - usageStats: usage.GetRequestStatistics(), - usagePersistence: usage.NewPersistenceManager(usage.GetRequestStatistics(), filepath.Dir(configFilePath)), - tokenStore: sdkAuth.GetTokenStore(), - allowRemoteOverride: envSecret != "", - envSecret: envSecret, + cfg: cfg, + configFilePath: configFilePath, + failedAttempts: make(map[string]*attemptInfo), + authManager: manager, + usageStats: usage.GetRequestStatistics(), + usagePersistence: usagePersistence, + ownsUsagePersistence: owned, + tokenStore: sdkAuth.GetTokenStore(), + allowRemoteOverride: envSecret != "", + envSecret: envSecret, } - if cfg != nil { + if cfg != nil && h.ownsUsagePersistence { h.usagePersistence.ApplyConfig(cfg.UsagePersistence) } h.startAttemptCleanup() @@ -112,7 +125,7 @@ func NewHandlerWithoutConfigFilePath(cfg *config.Config, manager *coreauth.Manag // SetConfig updates the in-memory config reference when the server hot-reloads. func (h *Handler) SetConfig(cfg *config.Config) { h.cfg = cfg - if h != nil && h.usagePersistence != nil && cfg != nil { + if h != nil && h.ownsUsagePersistence && h.usagePersistence != nil && cfg != nil { h.usagePersistence.ApplyConfig(cfg.UsagePersistence) } } @@ -124,7 +137,10 @@ func (h *Handler) SetAuthManager(manager *coreauth.Manager) { h.authManager = ma func (h *Handler) SetUsageStatistics(stats *usage.RequestStatistics) { h.usageStats = stats } // SetUsagePersistenceManager replaces the usage persistence manager. -func (h *Handler) SetUsagePersistenceManager(manager *usage.PersistenceManager) { h.usagePersistence = manager } +func (h *Handler) SetUsagePersistenceManager(manager *usage.PersistenceManager) { + h.usagePersistence = manager + h.ownsUsagePersistence = false +} // SetLocalPassword configures the runtime-local password accepted for localhost requests. func (h *Handler) SetLocalPassword(password string) { h.localPassword = password } diff --git a/internal/api/server.go b/internal/api/server.go index e6c2b3c85..acc39a811 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -264,12 +264,12 @@ func NewServer(cfg *config.Config, authManager *auth.Manager, accessManager *sdk } managementasset.SetCurrentConfig(cfg) auth.SetQuotaCooldownDisabled(cfg.DisableCooling) - // Initialize management handler - s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager) if s.usagePersistence != nil { s.usagePersistence.ApplyConfig(cfg.UsagePersistence) - s.mgmt.SetUsagePersistenceManager(s.usagePersistence) } + + // Initialize management handler + s.mgmt = managementHandlers.NewHandlerWithUsagePersistence(cfg, configFilePath, authManager, s.usagePersistence) if optionState.localPassword != "" { s.mgmt.SetLocalPassword(optionState.localPassword) } @@ -844,12 +844,14 @@ func (s *Server) Stop(ctx context.Context) error { } } + // Shutdown the HTTP server. + err := s.server.Shutdown(ctx) + if s.mgmt != nil { s.mgmt.Stop() } - // Shutdown the HTTP server. - if err := s.server.Shutdown(ctx); err != nil { + if err != nil { return fmt.Errorf("failed to shutdown HTTP server: %v", err) } diff --git a/internal/api/server_test.go b/internal/api/server_test.go index f5c18aa16..13e8ffa74 100644 --- a/internal/api/server_test.go +++ b/internal/api/server_test.go @@ -1,17 +1,24 @@ package api import ( + "context" + "encoding/json" + "fmt" + "net" "net/http" "net/http/httptest" "os" "path/filepath" + "strconv" "strings" + "sync" "testing" "time" gin "github.com/gin-gonic/gin" proxyconfig "github.com/router-for-me/CLIProxyAPI/v6/internal/config" internallogging "github.com/router-for-me/CLIProxyAPI/v6/internal/logging" + "github.com/router-for-me/CLIProxyAPI/v6/internal/usage" sdkaccess "github.com/router-for-me/CLIProxyAPI/v6/sdk/access" "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config" @@ -208,3 +215,147 @@ func TestDefaultRequestLoggerFactory_UsesResolvedLogDirectory(t *testing.T) { } } } + +func TestServerStop_GracefulShutdownFlushesAfterInFlightRequest(t *testing.T) { + gin.SetMode(gin.TestMode) + + tmpDir := t.TempDir() + authDir := filepath.Join(tmpDir, "auth") + if err := os.MkdirAll(authDir, 0o700); err != nil { + t.Fatalf("failed to create auth dir: %v", err) + } + + port := getFreeTCPPort(t) + cfg := &proxyconfig.Config{ + SDKConfig: sdkconfig.SDKConfig{ + APIKeys: []string{"test-key"}, + }, + Host: "127.0.0.1", + Port: port, + AuthDir: authDir, + UsageStatisticsEnabled: true, + UsagePersistence: proxyconfig.UsagePersistenceConfig{ + Enabled: true, + FilePath: "usage-shutdown.json", + IntervalSeconds: 3600, + }, + } + + authManager := auth.NewManager(nil, nil, nil) + accessManager := sdkaccess.NewManager() + configPath := filepath.Join(tmpDir, "config.yaml") + + markerAPI := "shutdown-test-api" + server := NewServer(cfg, authManager, accessManager, configPath, WithEngineConfigurator(func(engine *gin.Engine) { + engine.POST("/slow-record", func(c *gin.Context) { + time.Sleep(120 * time.Millisecond) + stats := usage.GetRequestStatistics() + stats.MergeSnapshot(usage.StatisticsSnapshot{ + APIs: map[string]usage.APISnapshot{ + markerAPI: { + Models: map[string]usage.ModelSnapshot{ + "gpt-test": { + Details: []usage.RequestDetail{{ + Timestamp: time.Now().UTC(), + Source: "integration", + Tokens: usage.TokenStats{TotalTokens: 1}, + }}, + }, + }, + }, + }, + }) + c.JSON(http.StatusOK, gin.H{"ok": true}) + }) + })) + + startErrCh := make(chan error, 1) + go func() { + startErrCh <- server.Start() + }() + + baseURL := "http://127.0.0.1:" + strconv.Itoa(port) + deadline := time.Now().Add(3 * time.Second) + for { + if time.Now().After(deadline) { + t.Fatalf("server did not start before timeout") + } + resp, err := http.Get(baseURL + "/") + if err == nil { + _ = resp.Body.Close() + break + } + time.Sleep(20 * time.Millisecond) + } + + var wg sync.WaitGroup + wg.Add(1) + requestErrCh := make(chan error, 1) + go func() { + defer wg.Done() + req, err := http.NewRequest(http.MethodPost, baseURL+"/slow-record", nil) + if err != nil { + requestErrCh <- err + return + } + resp, err := (&http.Client{Timeout: 3 * time.Second}).Do(req) + if err != nil { + requestErrCh <- err + return + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + requestErrCh <- fmt.Errorf("unexpected status code: %d", resp.StatusCode) + return + } + requestErrCh <- nil + }() + + time.Sleep(30 * time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if err := server.Stop(ctx); err != nil { + t.Fatalf("server stop failed: %v", err) + } + + wg.Wait() + if reqErr := <-requestErrCh; reqErr != nil { + t.Fatalf("in-flight request failed during shutdown: %v", reqErr) + } + + if err := <-startErrCh; err != nil { + t.Fatalf("server start loop returned error: %v", err) + } + + persistPath := filepath.Join(tmpDir, "usage-shutdown.json") + content, err := os.ReadFile(persistPath) + if err != nil { + t.Fatalf("failed to read persisted usage file: %v", err) + } + + var payload struct { + Usage usage.StatisticsSnapshot `json:"usage"` + } + if err := json.Unmarshal(content, &payload); err != nil { + t.Fatalf("failed to decode persisted usage file: %v", err) + } + + apiSnap, ok := payload.Usage.APIs[markerAPI] + if !ok { + t.Fatalf("expected persisted usage to contain marker api %q", markerAPI) + } + modelSnap, ok := apiSnap.Models["gpt-test"] + if !ok || len(modelSnap.Details) == 0 { + t.Fatalf("expected marker model details to be persisted") + } +} + +func getFreeTCPPort(t *testing.T) int { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to allocate free tcp port: %v", err) + } + defer ln.Close() + return ln.Addr().(*net.TCPAddr).Port +} diff --git a/internal/usage/persistence.go b/internal/usage/persistence.go index ae70164ae..0988b760b 100644 --- a/internal/usage/persistence.go +++ b/internal/usage/persistence.go @@ -73,7 +73,8 @@ func (m *PersistenceManager) ApplyConfig(cfg config.UsagePersistenceConfig) { m.mu.Lock() shouldRestart := m.running && (m.path != path || m.interval != interval) - if shouldRestart { + shouldStop := m.running && (!enabled || shouldRestart) + if shouldStop { close(m.stopCh) m.running = false m.stopCh = nil @@ -219,6 +220,8 @@ func (m *PersistenceManager) LoadNow() (PersistenceLoadResult, error) { m.recordError(err) return PersistenceLoadResult{}, err } + // Accept legacy payloads without explicit version (treated as v0) and current v1 payloads. + // v0 compatibility keeps previously exported snapshots loadable after upgrades. if payload.Version != 0 && payload.Version != 1 { err = fmt.Errorf("unsupported usage persistence version: %d", payload.Version) m.recordError(err) @@ -260,6 +263,7 @@ func (m *PersistenceManager) Stop(flush bool) { return } m.mu.Lock() + enabled := m.enabled if m.running && m.stopCh != nil { close(m.stopCh) } @@ -267,7 +271,7 @@ func (m *PersistenceManager) Stop(flush bool) { m.stopCh = nil m.mu.Unlock() - if flush { + if flush && enabled { _, _ = m.SaveNow() } } diff --git a/internal/usage/persistence_test.go b/internal/usage/persistence_test.go new file mode 100644 index 000000000..018fd89db --- /dev/null +++ b/internal/usage/persistence_test.go @@ -0,0 +1,265 @@ +package usage + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/router-for-me/CLIProxyAPI/v6/internal/config" +) + +func TestPersistenceManager_StartupEnabled_LoadsSnapshotAndRuns(t *testing.T) { + tmpDir := t.TempDir() + stats := NewRequestStatistics() + manager := NewPersistenceManager(stats, tmpDir) + t.Cleanup(func() { manager.Stop(false) }) + + persistPath := filepath.Join(tmpDir, "usage.json") + payload := persistencePayload{ + Version: 1, + SavedAt: time.Now().UTC(), + Usage: StatisticsSnapshot{ + APIs: map[string]APISnapshot{ + "api-key": { + Models: map[string]ModelSnapshot{ + "gpt": { + Details: []RequestDetail{{ + Timestamp: time.Now().UTC(), + Tokens: TokenStats{TotalTokens: 10}, + }}, + }, + }, + }, + }, + }, + } + data, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal payload: %v", err) + } + if err := os.WriteFile(persistPath, data, 0o600); err != nil { + t.Fatalf("write payload: %v", err) + } + + manager.ApplyConfig(config.UsagePersistenceConfig{ + Enabled: true, + FilePath: "usage.json", + IntervalSeconds: 60, + }) + + status := manager.Status() + if !status.Enabled { + t.Fatalf("expected enabled status") + } + if status.LastLoadedAt.IsZero() { + t.Fatalf("expected LastLoadedAt to be set") + } + snapshot := stats.Snapshot() + if snapshot.TotalRequests != 1 { + t.Fatalf("expected merged total requests 1, got %d", snapshot.TotalRequests) + } +} + +func TestPersistenceManager_EnableToDisable_StopsRunningWithoutReconfigure(t *testing.T) { + manager := NewPersistenceManager(NewRequestStatistics(), t.TempDir()) + t.Cleanup(func() { manager.Stop(false) }) + + cfg := config.UsagePersistenceConfig{Enabled: true, FilePath: "usage.json", IntervalSeconds: 60} + manager.ApplyConfig(cfg) + if !manager.running { + t.Fatalf("expected running after enable") + } + + cfg.Enabled = false + manager.ApplyConfig(cfg) + if manager.running { + t.Fatalf("expected running=false after disable with same path/interval") + } +} + +func TestPersistenceManager_ReconfigureInterval_RestartsLoop(t *testing.T) { + manager := NewPersistenceManager(NewRequestStatistics(), t.TempDir()) + t.Cleanup(func() { manager.Stop(false) }) + + manager.ApplyConfig(config.UsagePersistenceConfig{Enabled: true, FilePath: "usage.json", IntervalSeconds: 60}) + + manager.mu.Lock() + firstStopCh := manager.stopCh + manager.mu.Unlock() + + manager.ApplyConfig(config.UsagePersistenceConfig{Enabled: true, FilePath: "usage.json", IntervalSeconds: 120}) + + manager.mu.Lock() + secondStopCh := manager.stopCh + manager.mu.Unlock() + + if firstStopCh == nil || secondStopCh == nil { + t.Fatalf("expected non-nil stop channels before/after reconfigure") + } + if firstStopCh == secondStopCh { + t.Fatalf("expected stop channel to change after interval reconfigure") + } +} + +func TestPersistenceManager_StopFlushDisabled_DoesNotWriteSnapshot(t *testing.T) { + tmpDir := t.TempDir() + persistPath := filepath.Join(tmpDir, "usage.json") + manager := NewPersistenceManager(NewRequestStatistics(), tmpDir) + + manager.ApplyConfig(config.UsagePersistenceConfig{ + Enabled: false, + FilePath: "usage.json", + IntervalSeconds: 30, + }) + manager.Stop(true) + + if _, err := os.Stat(persistPath); !os.IsNotExist(err) { + t.Fatalf("expected no persistence file when disabled stop flush; err=%v", err) + } +} + +func TestPersistenceManager_LoadNow_AcceptsLegacyVersionZero(t *testing.T) { + tmpDir := t.TempDir() + stats := NewRequestStatistics() + manager := NewPersistenceManager(stats, tmpDir) + t.Cleanup(func() { manager.Stop(false) }) + + persistPath := filepath.Join(tmpDir, "usage.json") + payload := persistencePayload{ + Version: 0, + SavedAt: time.Now().UTC(), + Usage: StatisticsSnapshot{ + APIs: map[string]APISnapshot{ + "legacy-api": { + Models: map[string]ModelSnapshot{ + "legacy-model": { + Details: []RequestDetail{{ + Timestamp: time.Now().UTC(), + Tokens: TokenStats{TotalTokens: 2}, + }}, + }, + }, + }, + }, + }, + } + data, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal payload: %v", err) + } + if err := os.WriteFile(persistPath, data, 0o600); err != nil { + t.Fatalf("write payload: %v", err) + } + + manager.ApplyConfig(config.UsagePersistenceConfig{ + Enabled: true, + FilePath: "usage.json", + IntervalSeconds: 60, + }) + + snapshot := stats.Snapshot() + if _, ok := snapshot.APIs["legacy-api"]; !ok { + t.Fatalf("expected v0 snapshot to be loaded") + } +} + +func TestPersistenceManager_LoadNow_CorruptedFileReturnsErrorAndRecoversAfterRewrite(t *testing.T) { + tmpDir := t.TempDir() + stats := NewRequestStatistics() + manager := NewPersistenceManager(stats, tmpDir) + t.Cleanup(func() { manager.Stop(false) }) + + persistPath := filepath.Join(tmpDir, "usage.json") + if err := os.WriteFile(persistPath, []byte("{invalid-json"), 0o600); err != nil { + t.Fatalf("write corrupted payload: %v", err) + } + + manager.ApplyConfig(config.UsagePersistenceConfig{ + Enabled: true, + FilePath: "usage.json", + IntervalSeconds: 60, + }) + + status := manager.Status() + if status.LastError == "" { + t.Fatalf("expected corrupted file load error to be recorded") + } + + if _, err := manager.SaveNow(); err != nil { + t.Fatalf("save after corrupted load failed: %v", err) + } + if _, err := manager.LoadNow(); err != nil { + t.Fatalf("load after rewrite should succeed: %v", err) + } + + status = manager.Status() + if status.LastError != "" { + t.Fatalf("expected last error to be cleared after successful load, got: %q", status.LastError) + } +} + +func TestPersistenceManager_ConcurrentApplyConfigAndStop_NoPanicOrDeadlock(t *testing.T) { + manager := NewPersistenceManager(NewRequestStatistics(), t.TempDir()) + t.Cleanup(func() { manager.Stop(false) }) + + manager.ApplyConfig(config.UsagePersistenceConfig{Enabled: true, FilePath: "usage.json", IntervalSeconds: 1}) + + const goroutines = 24 + const iterations = 40 + + var wg sync.WaitGroup + errCh := make(chan error, goroutines) + + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + errCh <- fmt.Errorf("panic in goroutine %d: %v", idx, r) + } + }() + + for j := 0; j < iterations; j++ { + enabled := (j+idx)%2 == 0 + manager.ApplyConfig(config.UsagePersistenceConfig{ + Enabled: enabled, + FilePath: "usage.json", + IntervalSeconds: 1 + ((j + idx) % 2), + }) + if (j+idx)%3 == 0 { + manager.Stop(false) + } + } + }(i) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatalf("concurrent apply/stop timed out, possible deadlock") + } + + close(errCh) + for err := range errCh { + if err != nil { + t.Fatalf("concurrent apply/stop failed: %v", err) + } + } + + status := manager.Status() + if strings.TrimSpace(status.Path) == "" { + t.Fatalf("expected manager to keep a resolved path after concurrent updates") + } +} From 0c9493e64d577c1994d9e3338368d9014090fa94 Mon Sep 17 00:00:00 2001 From: YSY Date: Mon, 16 Mar 2026 00:35:35 +0800 Subject: [PATCH 3/3] feat(usage): add v2 persistence with model time buckets and bounded details --- config.example.yaml | 10 ++ .../api/handlers/management/config_basic.go | 10 +- internal/api/handlers/management/usage.go | 4 +- internal/config/config.go | 7 + internal/usage/logger_plugin.go | 156 ++++++++++++++++-- internal/usage/persistence.go | 15 +- internal/watcher/diff/config_diff.go | 3 + 7 files changed, 186 insertions(+), 19 deletions(-) diff --git a/config.example.yaml b/config.example.yaml index fb29477d9..c2e39f877 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -62,6 +62,16 @@ error-logs-max-files: 10 # When false, disable in-memory usage statistics aggregation usage-statistics-enabled: false +# Usage statistics persistence (optional) +# When enabled, periodically saves aggregated usage snapshots to local file, +# including per-model hourly/daily buckets and recent request details. +# This helps keep frontend trend charts available after restart. +# usage-persistence: +# enabled: false +# file-path: "./usage_stats.json" +# interval-seconds: 300 +# max-details-per-model: 300 + # Proxy URL. Supports socks5/http/https protocols. Example: socks5://user:pass@192.168.1.1:1080/ # Per-entry proxy-url also supports "direct" or "none" to bypass both the global proxy-url and environment proxies explicitly. proxy-url: "" diff --git a/internal/api/handlers/management/config_basic.go b/internal/api/handlers/management/config_basic.go index 140286a85..55517fa01 100644 --- a/internal/api/handlers/management/config_basic.go +++ b/internal/api/handlers/management/config_basic.go @@ -216,9 +216,10 @@ func (h *Handler) PutUsagePersistence(c *gin.Context) { return } var body struct { - Enabled *bool `json:"enabled"` - FilePath *string `json:"file-path"` - IntervalSeconds *int `json:"interval-seconds"` + Enabled *bool `json:"enabled"` + FilePath *string `json:"file-path"` + IntervalSeconds *int `json:"interval-seconds"` + MaxDetailsPerModel *int `json:"max-details-per-model"` } if errBindJSON := c.ShouldBindJSON(&body); errBindJSON != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid body"}) @@ -240,6 +241,9 @@ func (h *Handler) PutUsagePersistence(c *gin.Context) { } h.cfg.UsagePersistence.IntervalSeconds = *body.IntervalSeconds } + if body.MaxDetailsPerModel != nil { + h.cfg.UsagePersistence.MaxDetailsPerModel = *body.MaxDetailsPerModel + } if h.usagePersistence != nil { h.usagePersistence.ApplyConfig(h.cfg.UsagePersistence) } diff --git a/internal/api/handlers/management/usage.go b/internal/api/handlers/management/usage.go index e9ebb0873..880724a12 100644 --- a/internal/api/handlers/management/usage.go +++ b/internal/api/handlers/management/usage.go @@ -39,7 +39,7 @@ func (h *Handler) ExportUsageStatistics(c *gin.Context) { snapshot = h.usageStats.Snapshot() } c.JSON(http.StatusOK, usageExportPayload{ - Version: 1, + Version: 2, ExportedAt: time.Now().UTC(), Usage: snapshot, }) @@ -63,7 +63,7 @@ func (h *Handler) ImportUsageStatistics(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid json"}) return } - if payload.Version != 0 && payload.Version != 1 { + if payload.Version != 0 && payload.Version != 1 && payload.Version != 2 { c.JSON(http.StatusBadRequest, gin.H{"error": "unsupported version"}) return } diff --git a/internal/config/config.go b/internal/config/config.go index 7556be085..a7185f32c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -139,6 +139,9 @@ type UsagePersistenceConfig struct { FilePath string `yaml:"file-path" json:"file-path"` // IntervalSeconds controls periodic flush interval in seconds. IntervalSeconds int `yaml:"interval-seconds" json:"interval-seconds"` + // MaxDetailsPerModel caps persisted per-model request details to recent N entries. + // Set to 0 or a negative value to disable truncation. + MaxDetailsPerModel int `yaml:"max-details-per-model" json:"max-details-per-model"` } // ClaudeHeaderDefaults configures default header values injected into Claude API requests @@ -569,6 +572,7 @@ func LoadConfigOptional(configFile string, optional bool) (*Config, error) { cfg.UsagePersistence.Enabled = false cfg.UsagePersistence.FilePath = "usage-statistics.json" cfg.UsagePersistence.IntervalSeconds = 30 + cfg.UsagePersistence.MaxDetailsPerModel = 300 cfg.DisableCooling = false cfg.Pprof.Enable = false cfg.Pprof.Addr = DefaultPprofAddr @@ -641,6 +645,9 @@ func LoadConfigOptional(configFile string, optional bool) (*Config, error) { if cfg.UsagePersistence.IntervalSeconds <= 0 { cfg.UsagePersistence.IntervalSeconds = 30 } + if cfg.UsagePersistence.MaxDetailsPerModel == 0 { + cfg.UsagePersistence.MaxDetailsPerModel = 300 + } // Sanitize Gemini API key configuration and migrate legacy entries. cfg.SanitizeGeminiKeys() diff --git a/internal/usage/logger_plugin.go b/internal/usage/logger_plugin.go index e4371e8d3..d2c26e3dc 100644 --- a/internal/usage/logger_plugin.go +++ b/internal/usage/logger_plugin.go @@ -60,6 +60,8 @@ func StatisticsEnabled() bool { return statisticsEnabled.Load() } type RequestStatistics struct { mu sync.RWMutex + maxDetailsPerModel int + totalRequests int64 successCount int64 failureCount int64 @@ -85,6 +87,11 @@ type modelStats struct { TotalRequests int64 TotalTokens int64 Details []RequestDetail + + RequestsByDay map[string]int64 + RequestsByHour map[string]int64 + TokensByDay map[string]int64 + TokensByHour map[string]int64 } // RequestDetail stores the timestamp and token usage for a single request. @@ -131,7 +138,12 @@ type APISnapshot struct { type ModelSnapshot struct { TotalRequests int64 `json:"total_requests"` TotalTokens int64 `json:"total_tokens"` - Details []RequestDetail `json:"details"` + Details []RequestDetail `json:"details,omitempty"` + + RequestsByDay map[string]int64 `json:"requests_by_day,omitempty"` + RequestsByHour map[string]int64 `json:"requests_by_hour,omitempty"` + TokensByDay map[string]int64 `json:"tokens_by_day,omitempty"` + TokensByHour map[string]int64 `json:"tokens_by_hour,omitempty"` } var defaultRequestStatistics = NewRequestStatistics() @@ -142,11 +154,26 @@ func GetRequestStatistics() *RequestStatistics { return defaultRequestStatistics // NewRequestStatistics constructs an empty statistics store. func NewRequestStatistics() *RequestStatistics { return &RequestStatistics{ - apis: make(map[string]*apiStats), - requestsByDay: make(map[string]int64), - requestsByHour: make(map[int]int64), - tokensByDay: make(map[string]int64), - tokensByHour: make(map[int]int64), + maxDetailsPerModel: 300, + apis: make(map[string]*apiStats), + requestsByDay: make(map[string]int64), + requestsByHour: make(map[int]int64), + tokensByDay: make(map[string]int64), + tokensByHour: make(map[int]int64), + } +} + +func (s *RequestStatistics) SetMaxDetailsPerModel(limit int) { + if s == nil { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.maxDetailsPerModel = limit + if limit > 0 { + s.truncateAllModelDetailsLocked(limit) } } @@ -215,12 +242,41 @@ func (s *RequestStatistics) updateAPIStats(stats *apiStats, model string, detail stats.TotalTokens += detail.Tokens.TotalTokens modelStatsValue, ok := stats.Models[model] if !ok { - modelStatsValue = &modelStats{} + modelStatsValue = &modelStats{ + RequestsByDay: make(map[string]int64), + RequestsByHour: make(map[string]int64), + TokensByDay: make(map[string]int64), + TokensByHour: make(map[string]int64), + } stats.Models[model] = modelStatsValue } + if modelStatsValue.RequestsByDay == nil { + modelStatsValue.RequestsByDay = make(map[string]int64) + } + if modelStatsValue.RequestsByHour == nil { + modelStatsValue.RequestsByHour = make(map[string]int64) + } + if modelStatsValue.TokensByDay == nil { + modelStatsValue.TokensByDay = make(map[string]int64) + } + if modelStatsValue.TokensByHour == nil { + modelStatsValue.TokensByHour = make(map[string]int64) + } modelStatsValue.TotalRequests++ modelStatsValue.TotalTokens += detail.Tokens.TotalTokens + + dayKey := formatDayBucket(detail.Timestamp) + hourKey := formatHourBucket(detail.Timestamp) + modelStatsValue.RequestsByDay[dayKey]++ + modelStatsValue.RequestsByHour[hourKey]++ + modelStatsValue.TokensByDay[dayKey] += detail.Tokens.TotalTokens + modelStatsValue.TokensByHour[hourKey] += detail.Tokens.TotalTokens + modelStatsValue.Details = append(modelStatsValue.Details, detail) + if s.maxDetailsPerModel > 0 && len(modelStatsValue.Details) > s.maxDetailsPerModel { + start := len(modelStatsValue.Details) - s.maxDetailsPerModel + modelStatsValue.Details = append([]RequestDetail(nil), modelStatsValue.Details[start:]...) + } } // Snapshot returns a copy of the aggregated metrics for external consumption. @@ -248,10 +304,35 @@ func (s *RequestStatistics) Snapshot() StatisticsSnapshot { for modelName, modelStatsValue := range stats.Models { requestDetails := make([]RequestDetail, len(modelStatsValue.Details)) copy(requestDetails, modelStatsValue.Details) + + requestsByDay := make(map[string]int64, len(modelStatsValue.RequestsByDay)) + for k, v := range modelStatsValue.RequestsByDay { + requestsByDay[k] = v + } + + requestsByHour := make(map[string]int64, len(modelStatsValue.RequestsByHour)) + for k, v := range modelStatsValue.RequestsByHour { + requestsByHour[k] = v + } + + tokensByDay := make(map[string]int64, len(modelStatsValue.TokensByDay)) + for k, v := range modelStatsValue.TokensByDay { + tokensByDay[k] = v + } + + tokensByHour := make(map[string]int64, len(modelStatsValue.TokensByHour)) + for k, v := range modelStatsValue.TokensByHour { + tokensByHour[k] = v + } + apiSnapshot.Models[modelName] = ModelSnapshot{ - TotalRequests: modelStatsValue.TotalRequests, - TotalTokens: modelStatsValue.TotalTokens, - Details: requestDetails, + TotalRequests: modelStatsValue.TotalRequests, + TotalTokens: modelStatsValue.TotalTokens, + Details: requestDetails, + RequestsByDay: requestsByDay, + RequestsByHour: requestsByHour, + TokensByDay: tokensByDay, + TokensByHour: tokensByHour, } } result.APIs[apiName] = apiSnapshot @@ -330,6 +411,30 @@ func (s *RequestStatistics) MergeSnapshot(snapshot StatisticsSnapshot) MergeResu if modelName == "" { modelName = "unknown" } + + modelStatsValue, ok := stats.Models[modelName] + if !ok || modelStatsValue == nil { + modelStatsValue = &modelStats{ + RequestsByDay: make(map[string]int64), + RequestsByHour: make(map[string]int64), + TokensByDay: make(map[string]int64), + TokensByHour: make(map[string]int64), + } + stats.Models[modelName] = modelStatsValue + } + for k, v := range modelSnapshot.RequestsByDay { + modelStatsValue.RequestsByDay[k] += v + } + for k, v := range modelSnapshot.RequestsByHour { + modelStatsValue.RequestsByHour[k] += v + } + for k, v := range modelSnapshot.TokensByDay { + modelStatsValue.TokensByDay[k] += v + } + for k, v := range modelSnapshot.TokensByHour { + modelStatsValue.TokensByHour[k] += v + } + for _, detail := range modelSnapshot.Details { detail.Tokens = normaliseTokenStats(detail.Tokens) if detail.Timestamp.IsZero() { @@ -470,3 +575,34 @@ func formatHour(hour int) string { hour = hour % 24 return fmt.Sprintf("%02d", hour) } + +func formatDayBucket(timestamp time.Time) string { + if timestamp.IsZero() { + timestamp = time.Now().UTC() + } + return timestamp.UTC().Format("2006-01-02") +} + +func formatHourBucket(timestamp time.Time) string { + if timestamp.IsZero() { + timestamp = time.Now().UTC() + } + return timestamp.UTC().Truncate(time.Hour).Format(time.RFC3339) +} + +func (s *RequestStatistics) truncateAllModelDetailsLocked(limit int) { + for _, stats := range s.apis { + if stats == nil { + continue + } + for _, modelStatsValue := range stats.Models { + if modelStatsValue == nil { + continue + } + if len(modelStatsValue.Details) > limit { + start := len(modelStatsValue.Details) - limit + modelStatsValue.Details = append([]RequestDetail(nil), modelStatsValue.Details[start:]...) + } + } + } +} diff --git a/internal/usage/persistence.go b/internal/usage/persistence.go index 0988b760b..15b7f3d31 100644 --- a/internal/usage/persistence.go +++ b/internal/usage/persistence.go @@ -70,6 +70,13 @@ func (m *PersistenceManager) ApplyConfig(cfg config.UsagePersistenceConfig) { intervalSeconds = 30 } interval := time.Duration(intervalSeconds) * time.Second + maxDetailsPerModel := cfg.MaxDetailsPerModel + if maxDetailsPerModel == 0 { + maxDetailsPerModel = 300 + } + if m.stats != nil { + m.stats.SetMaxDetailsPerModel(maxDetailsPerModel) + } m.mu.Lock() shouldRestart := m.running && (m.path != path || m.interval != interval) @@ -143,8 +150,8 @@ func (m *PersistenceManager) SaveNow() (PersistenceStatus, error) { m.mu.Unlock() snapshot := m.stats.Snapshot() - payload := persistencePayload{Version: 1, SavedAt: time.Now().UTC(), Usage: snapshot} - data, err := json.MarshalIndent(payload, "", " ") + payload := persistencePayload{Version: 2, SavedAt: time.Now().UTC(), Usage: snapshot} + data, err := json.Marshal(payload) if err != nil { m.recordError(err) return m.Status(), err @@ -220,9 +227,9 @@ func (m *PersistenceManager) LoadNow() (PersistenceLoadResult, error) { m.recordError(err) return PersistenceLoadResult{}, err } - // Accept legacy payloads without explicit version (treated as v0) and current v1 payloads. + // Accept legacy payloads without explicit version (treated as v0) and current v1/v2 payloads. // v0 compatibility keeps previously exported snapshots loadable after upgrades. - if payload.Version != 0 && payload.Version != 1 { + if payload.Version != 0 && payload.Version != 1 && payload.Version != 2 { err = fmt.Errorf("unsupported usage persistence version: %d", payload.Version) m.recordError(err) return PersistenceLoadResult{}, err diff --git a/internal/watcher/diff/config_diff.go b/internal/watcher/diff/config_diff.go index 8c19b83d2..d5ca5de55 100644 --- a/internal/watcher/diff/config_diff.go +++ b/internal/watcher/diff/config_diff.go @@ -48,6 +48,9 @@ func BuildConfigChangeDetails(oldCfg, newCfg *config.Config) []string { if oldCfg.UsagePersistence.IntervalSeconds != newCfg.UsagePersistence.IntervalSeconds { changes = append(changes, fmt.Sprintf("usage-persistence.interval-seconds: %d -> %d", oldCfg.UsagePersistence.IntervalSeconds, newCfg.UsagePersistence.IntervalSeconds)) } + if oldCfg.UsagePersistence.MaxDetailsPerModel != newCfg.UsagePersistence.MaxDetailsPerModel { + changes = append(changes, fmt.Sprintf("usage-persistence.max-details-per-model: %d -> %d", oldCfg.UsagePersistence.MaxDetailsPerModel, newCfg.UsagePersistence.MaxDetailsPerModel)) + } if oldCfg.DisableCooling != newCfg.DisableCooling { changes = append(changes, fmt.Sprintf("disable-cooling: %t -> %t", oldCfg.DisableCooling, newCfg.DisableCooling)) }