Skip to content

Commit

Permalink
client: simplfy idPool to prevent memory leak
Browse files Browse the repository at this point in the history
* Replaced go-routine impl with lock to simplify
  the code and solve the leaking.
  • Loading branch information
mpdroog authored and yookoala committed Feb 19, 2021
1 parent 847a66b commit 4663d83
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 83 deletions.
78 changes: 36 additions & 42 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"io"
"net"
"net/http"
"runtime"
"strconv"
"strings"
"sync"
Expand All @@ -31,6 +32,8 @@ const (
RoleResponder Role = iota + 1
RoleAuthorizer
RoleFilter

MaxRequestID = ^uint16(0)
)

// NewRequest returns a standard FastCGI request
Expand Down Expand Up @@ -64,57 +67,53 @@ type Request struct {
}

type idPool struct {
IDs chan uint16
IDs uint16

Used *sync.Map
Lock *sync.Mutex
}

// AllocID implements Client.AllocID
func (p *idPool) Alloc() uint16 {
return <-p.IDs
p.Lock.Lock()
next:
idx := p.IDs
if idx == MaxRequestID {
// reset
p.IDs = 0
}
p.IDs++

if _, inuse := p.Used.Load(idx); inuse {
// Allow other go-routine to take priority
// to prevent spinlock here
runtime.Gosched()
goto next
}

p.Used.Store(idx, struct{}{})
p.Lock.Unlock()

return idx
}

// ReleaseID implements Client.ReleaseID
func (p *idPool) Release(id uint16) {
go func() {
// release the ID back to channel for reuse
// use goroutine to prev0, ent blocking ReleaseID
p.IDs <- id
}()
p.Used.Delete(id)
}

func newIDs(limit uint32) (p idPool) {

// sanatize limit
if limit == 0 || limit > 65535 {
// Note: limit is the size of the pool
// Since 0 cannot be requestId, the effective
// pool is from 1 to 65535, hence size is 65535.
limit = 65535
func newIDs() *idPool {
return &idPool{
Used: new(sync.Map),
Lock: new(sync.Mutex),
IDs: uint16(1),
}

// pool requestID for the client
//
// requestID: Identifies the FastCGI request to which the record belongs.
// The Web server re-uses FastCGI request IDs; the application
// keeps track of the current state of each request ID on a given
// transport connection.
//
// Ref: https://fast-cgi.github.io/spec#33-records
ids := make(chan uint16)
go func(maxID uint16) {
for i := uint16(1); i < maxID; i++ {
ids <- i
}
ids <- uint16(maxID)
}(uint16(limit))

p.IDs = ids
return
}

// client is the default implementation of Client
type client struct {
conn *conn
ids idPool
ids *idPool
}

// writeRequest writes params and stdin to the FastCGI application
Expand Down Expand Up @@ -388,12 +387,7 @@ type ClientFactory func() (Client, error)
// SimpleClientFactory returns a ClientFactory implementation
// with the given ConnFactory.
//
// limit is the maximum number of request that the
// applcation support. 0 means the maximum number
// available for 16bit request id (because 0 is not
// a valid reqeust id, 65535).
//
// Default 0.
// limit is UNUSED.
//
func SimpleClientFactory(connFactory ConnFactory, limit uint32) ClientFactory {
return func() (c Client, err error) {
Expand All @@ -406,7 +400,7 @@ func SimpleClientFactory(connFactory ConnFactory, limit uint32) ClientFactory {
// create client
c = &client{
conn: newConn(conn),
ids: newIDs(limit),
ids: newIDs(),
}
return
}
Expand Down
104 changes: 63 additions & 41 deletions client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,77 +6,99 @@ import (
"time"
)

// requestId is supposed to be unique among all active requests in a connection. So a requestId
// should not be reused until the previous request of the same id is inactive (releasing the id).
func TestIDPool_Alloc(t *testing.T) {
t.Logf("default limit: %d", 65535)
ids := newIDs(0)
for i := uint32(1); i <= 65535; i++ {
ids := newIDs()
idToReserve := uint16(rand.Int31n(int32(MaxRequestID)))

// Loop over all ids to make sure it is sequencely returning
// 1 to 65535.
//
// Note: Use uint as loop counter so it can loop past 65535
// to end the loop (also keep the code readable)
for i := uint(1); i <= uint(MaxRequestID); i++ {
if want, have := uint16(i), ids.Alloc(); want != have {
t.Errorf("expected %d, got %d", want, have)
t.Fatalf("expected %v, got %v", want, have)
}
if i != uint(idToReserve) {
ids.Release(uint16(i))
}
}

// test if new id can be allocated
// when all ids are already allocated
newAlloc := make(chan uint16)
go func(ids idPool, newAlloc chan<- uint16) {
newAlloc <- ids.Alloc()
}(ids, newAlloc)

select {
case reqID := <-newAlloc:
t.Errorf("unexpected new allocation: %d", reqID)
case <-time.After(time.Millisecond * 100):
t.Log("blocks as expected")
// Loop over all requestids 5 times
for i := 0; i < 5; i++ {
for j := uint(1); j <= uint(MaxRequestID-1); j++ {
id := ids.Alloc()
if id == 0 {
t.Fatal("ids.Alloc() is never allowed to return 0")
} else if id == idToReserve {
t.Fatalf("The requestId %v was not reserved as expect", id)
} else if j < uint(idToReserve) {
if want, have := uint(id), j; want != have {
t.Fatalf("expected %v, got %v", want, have)
}
} else if j >= uint(idToReserve) {
if want, have := uint(id), j+1; want != have {
t.Fatalf("expected %v, got %v", want, have)
}
}
ids.Release(id) // always release the allocated id
}
}

// now, release a random ID
released := uint16(rand.Int31n(65535))
go func(ids idPool, released uint16) {
ids.Release(released)
}(ids, released)
// release the reserved id
ids.Release(idToReserve)

select {
case reqID := <-newAlloc:
if want, have := released, reqID; want != have {
t.Errorf("expected %d, got %d", want, have)
// make sure all ids are available again
for i := uint(1); i <= uint(MaxRequestID); i++ {
if want, have := uint16(i), ids.Alloc(); want != have {
t.Fatalf("expected %v, got %v", want, have)
}
case <-time.After(time.Millisecond * 100):
t.Errorf("unexpected blocking")
}
}

func TestIDPool_Alloc_withLimit(t *testing.T) {
// If all IDs are used up, pool is supposed to block on alloc after exhaustion.
func TestIDPool_block(t *testing.T) {

limit := uint32(rand.Int31n(100) + 10)
t.Logf("random limit: %d", limit)
ids := newIDs()

ids := newIDs(limit)
for i := uint32(1); i <= limit; i++ {
if want, have := uint16(i), ids.Alloc(); want != have {
t.Errorf("expected %d, got %d", want, have)
// Test allocating all ids once.
for i := uint(1); i <= uint(MaxRequestID); i++ {
id := ids.Alloc()
if want, have := i, uint(id); want != have {
t.Errorf("expected to allocate %v, got %v", want, have)
t.FailNow()
}
}

// test if new id can be allocated
// when all ids are already allocated
newAlloc := make(chan uint16)
go func(ids idPool, newAlloc chan<- uint16) {
waitAlloc := func(ids *idPool, newAlloc chan<- uint16) {
newAlloc <- ids.Alloc()
}(ids, newAlloc)
}
go waitAlloc(ids, newAlloc)
go waitAlloc(ids, newAlloc)
go waitAlloc(ids, newAlloc)
go waitAlloc(ids, newAlloc)
go waitAlloc(ids, newAlloc)

// wait some time to see if we can allocate id again
select {
case reqID := <-newAlloc:
t.Errorf("unexpected new allocation: %d", reqID)
t.Fatalf("unexpected new allocation: %d", reqID)
case <-time.After(time.Millisecond * 100):
t.Log("blocks as expected")
}

// now, release a random ID
released := uint16(rand.Int31n(int32(limit)))
go func(ids idPool, released uint16) {
released := uint16(rand.Int31n(int32(MaxRequestID)))
go func(ids *idPool, released uint16) {
// release an id
ids.Release(released)
t.Logf("id released: %v", released)
}(ids, released)

// wait some time to see if we can allocate id again
select {
case reqID := <-newAlloc:
if want, have := released, reqID; want != have {
Expand Down

0 comments on commit 4663d83

Please sign in to comment.