diff --git a/internal/vkgo/rpc/handlers.go b/internal/vkgo/rpc/handlers.go index cac0cc1d5..0a9614162 100644 --- a/internal/vkgo/rpc/handlers.go +++ b/internal/vkgo/rpc/handlers.go @@ -43,7 +43,7 @@ func (s *Server) collectStats(localAddr net.Addr) map[string]string { requestMem, _ := s.reqMemSem.Observe() responseMem, _ := s.respMemSem.Observe() - workersTotal := s.workerPool.Created() + workersTotal, _ := s.workerPool.Created() gc := srvfunc.GetGCStats() gcPausesMs, _ := json.Marshal(gc.LastPausesMs) diff --git a/internal/vkgo/rpc/rpc.go b/internal/vkgo/rpc/rpc.go index b06dca85b..278334955 100644 --- a/internal/vkgo/rpc/rpc.go +++ b/internal/vkgo/rpc/rpc.go @@ -102,7 +102,13 @@ func (err *tagError) Error() string { if err == nil { return "" } - return err.msg + if len(err.msg) != 0 { + return err.msg + } + if err.err != nil { + return err.err.Error() + } + return "" } func (err *tagError) Unwrap() error { diff --git a/internal/vkgo/rpc/server.go b/internal/vkgo/rpc/server.go index d806bff05..63c58514a 100644 --- a/internal/vkgo/rpc/server.go +++ b/internal/vkgo/rpc/server.go @@ -174,6 +174,7 @@ type Server struct { statRequestsCurrent atomic.Int64 statRPS atomic.Int64 statHostname string + statLongPollsWaiting atomic.Int64 opts ServerOptions @@ -1100,6 +1101,22 @@ func (s *Server) RequestsCurrent() int64 { return s.statRequestsCurrent.Load() } +func (s *Server) WorkersPoolSize() (current int, total int) { + return s.workerPool.Created() +} + +func (s *Server) LongPollsWaiting() int64 { + return s.statLongPollsWaiting.Load() +} + +func (s *Server) RequestsMemory() (current int64, total int64) { + return s.reqMemSem.Observe() +} + +func (s *Server) ResponsesMemory() (current int64, total int64) { + return s.respMemSem.Observe() +} + func (s *Server) RPS() int64 { return s.statRPS.Load() } diff --git a/internal/vkgo/rpc/server_conn.go b/internal/vkgo/rpc/server_conn.go index d57b068c7..4b0edd9aa 100644 --- a/internal/vkgo/rpc/server_conn.go +++ b/internal/vkgo/rpc/server_conn.go @@ -95,6 +95,7 @@ func (sc *serverConn) push(hctx *HandlerContext, isLongpoll bool) { sc.mu.Unlock() return // already released } + sc.server.statLongPollsWaiting.Dec() delete(sc.longpollResponses, resp.hctx.queryID) } if sc.closedFlag || hctx.noResult { @@ -309,6 +310,7 @@ func (sc *serverConn) makeLongpollResponse(hctx *HandlerContext, canceller Hijac sc.server.rareLog(&sc.server.lastHijackWarningLog, "[rpc.Server] - invariant violation, hijack response queryID collision") return } + sc.server.statLongPollsWaiting.Inc() sc.longpollResponses[queryID] = hijackedResponse{canceller: canceller, hctx: hctx} if debugTrace { sc.server.addTrace(fmt.Sprintf("makeLongpollResponse %p %d", hctx, hctx.queryID)) @@ -331,6 +333,7 @@ func (sc *serverConn) cancelLongpollResponse(queryID int64) { if debugPrint { fmt.Printf("longpollResponses cancel %d\n", queryID) } + sc.server.statLongPollsWaiting.Dec() delete(sc.longpollResponses, queryID) if debugTrace { sc.server.addTrace(fmt.Sprintf("cancelLongpollResponse %p %d", resp.hctx, queryID)) @@ -344,6 +347,7 @@ func (sc *serverConn) cancelAllLongpollResponses() { sc.mu.Lock() longpollResponses := sc.longpollResponses sc.longpollResponses = nil // If makeLongpollResponse is called, we'll panic. But this must be impossible if SyncHandler follows protocol + sc.server.statLongPollsWaiting.Sub(int64(len(longpollResponses))) if debugTrace { sc.server.addTrace("cancelAllLongpollResponses") } diff --git a/internal/vkgo/rpc/server_options.go b/internal/vkgo/rpc/server_options.go index 272341308..7db12cd45 100644 --- a/internal/vkgo/rpc/server_options.go +++ b/internal/vkgo/rpc/server_options.go @@ -287,8 +287,7 @@ func ServerWithRequestHandler(fn RequestHandlerFunc) ServerOptionsFunc { } } -// TODO: rename ServerWithResponseHandler -func ServerWithRequestEndHandler(fn ResponseHandlerFunc) ServerOptionsFunc { +func ServerWithResponseHandler(fn ResponseHandlerFunc) ServerOptionsFunc { return func(opts *ServerOptions) { opts.ResponseHandler = fn } diff --git a/internal/vkgo/rpc/server_workerpool.go b/internal/vkgo/rpc/server_workerpool.go index f462ac40d..3aa3c86ae 100644 --- a/internal/vkgo/rpc/server_workerpool.go +++ b/internal/vkgo/rpc/server_workerpool.go @@ -70,10 +70,10 @@ func (t *workerPool) Close() { t.cond.Broadcast() } -func (t *workerPool) Created() int { +func (t *workerPool) Created() (current int, total int) { t.mu.Lock() defer t.mu.Unlock() - return t.created + return t.created, t.create } func (t *workerPool) Get(wg *WaitGroup) (*worker, bool) {