Skip to content

Commit eb30dec

Browse files
author
Akshay Shah
committed
Integrate timer wheel into TChannel package
1 parent 72a464d commit eb30dec

7 files changed

+80
-21
lines changed

all_channels_test.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,41 @@
1818
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
1919
// THE SOFTWARE.
2020

21-
package tchannel
21+
package tchannel_test
2222

2323
import (
2424
"testing"
25+
"time"
2526

2627
"github.com/stretchr/testify/assert"
2728
"github.com/stretchr/testify/require"
29+
30+
"github.com/uber/tchannel-go"
31+
"github.com/uber/tchannel-go/testutils"
2832
)
2933

3034
func TestAllChannelsRegistered(t *testing.T) {
31-
introspectOpts := &IntrospectionOptions{IncludeOtherChannels: true}
35+
introspectOpts := &tchannel.IntrospectionOptions{IncludeOtherChannels: true}
3236

33-
ch1_1, err := NewChannel("ch1", nil)
37+
ch1_1, err := tchannel.NewChannel("ch1", nil)
3438
require.NoError(t, err, "Channel create failed")
35-
ch1_2, err := NewChannel("ch1", nil)
39+
ch1_2, err := tchannel.NewChannel("ch1", nil)
3640
require.NoError(t, err, "Channel create failed")
37-
ch2_1, err := NewChannel("ch2", nil)
41+
ch2_1, err := tchannel.NewChannel("ch2", nil)
3842
require.NoError(t, err, "Channel create failed")
3943

4044
state := ch1_1.IntrospectState(introspectOpts)
4145
assert.Equal(t, 1, len(state.OtherChannels["ch1"]))
4246
assert.Equal(t, 1, len(state.OtherChannels["ch2"]))
4347

4448
ch1_2.Close()
49+
time.Sleep(testutils.Timeout(10 * time.Millisecond))
4550

4651
state = ch1_1.IntrospectState(introspectOpts)
4752
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))
4853
assert.Equal(t, 1, len(state.OtherChannels["ch2"]))
4954

50-
ch2_2, err := NewChannel("ch2", nil)
55+
ch2_2, err := tchannel.NewChannel("ch2", nil)
5156

5257
state = ch1_1.IntrospectState(introspectOpts)
5358
require.NoError(t, err, "Channel create failed")
@@ -57,6 +62,7 @@ func TestAllChannelsRegistered(t *testing.T) {
5762
ch1_1.Close()
5863
ch2_1.Close()
5964
ch2_2.Close()
65+
time.Sleep(testutils.Timeout(10 * time.Millisecond))
6066

6167
state = ch1_1.IntrospectState(introspectOpts)
6268
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))

channel.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"sync"
3030
"time"
3131

32+
"github.com/uber/tchannel-go/timers"
3233
"github.com/uber/tchannel-go/tnet"
3334

3435
"github.com/opentracing/opentracing-go"
@@ -70,6 +71,10 @@ type ChannelOptions struct {
7071
// clamped to this value). Passing zero uses the default of 2m.
7172
RelayMaxTimeout time.Duration
7273

74+
// RelayTimeoutTick is the granularity of the timer wheel used to manage
75+
// relay timeouts. Passing zero uses the default of 5ms.
76+
RelayTimeoutTick time.Duration
77+
7378
// The reporter to use for reporting stats for this channel.
7479
StatsReporter StatsReporter
7580

@@ -121,6 +126,8 @@ type Channel struct {
121126
peers *PeerList
122127
relayHost RelayHost
123128
relayMaxTimeout time.Duration
129+
relayTimeoutTick time.Duration
130+
relayTimeoutWheel *timers.Wheel
124131

125132
// mutable contains all the members of Channel which are mutable.
126133
mutable struct {
@@ -209,6 +216,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
209216
connectionOptions: opts.DefaultConnectionOptions,
210217
relayHost: opts.RelayHost,
211218
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
219+
relayTimeoutTick: validateRelayTimeoutTick(opts.RelayTimeoutTick, logger),
212220
}
213221
ch.peers = newRootPeerList(ch).newChild()
214222

@@ -230,6 +238,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
230238

231239
if opts.RelayHost != nil {
232240
opts.RelayHost.SetChannel(ch)
241+
ch.startWheel()
233242
}
234243
return ch, nil
235244
}
@@ -665,13 +674,16 @@ func (ch *Channel) connectionCloseStateChange(c *Connection) {
665674
chState, minState)
666675

667676
if updatedToState == ChannelClosed {
668-
ch.onClosed()
677+
go ch.onClosed()
669678
}
670679
}
671680

672681
func (ch *Channel) onClosed() {
673682
removeClosedChannel(ch)
674-
ch.log.Infof("Channel closed.")
683+
if ch.relayTimeoutWheel != nil {
684+
ch.relayTimeoutWheel.Stop()
685+
}
686+
ch.log.Info("Channel closed.")
675687
}
676688

677689
// Closed returns whether this channel has been closed with .Close()
@@ -717,7 +729,7 @@ func (ch *Channel) Close() {
717729
}
718730

719731
if channelClosed {
720-
ch.onClosed()
732+
go ch.onClosed()
721733
}
722734
}
723735

@@ -726,6 +738,10 @@ func (ch *Channel) RelayHost() RelayHost {
726738
return ch.relayHost
727739
}
728740

741+
func (ch *Channel) startWheel() {
742+
ch.relayTimeoutWheel = timers.NewWheel(ch.relayTimeoutTick, ch.relayMaxTimeout)
743+
}
744+
729745
func toStringSet(ss []string) map[string]struct{} {
730746
set := make(map[string]struct{}, len(ss))
731747
for _, s := range ss {

connection_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ func TestTimeout(t *testing.T) {
398398
testHandler := onErrorTestHandler{newTestHandler(t), onError}
399399
ts.Register(raw.Wrap(testHandler), "block")
400400

401-
ctx, cancel := NewContext(testutils.Timeout(15 * time.Millisecond))
401+
ctx, cancel := NewContext(testutils.Timeout(25 * time.Millisecond))
402402
defer cancel()
403403

404404
_, _, _, err := raw.Call(ctx, ts.Server(), ts.HostPort(), ts.ServiceName(), "block", []byte("Arg2"), []byte("Arg3"))
@@ -662,7 +662,7 @@ func TestReadTimeout(t *testing.T) {
662662
func TestWriteTimeout(t *testing.T) {
663663
testutils.WithTestServer(t, nil, func(ts *testutils.TestServer) {
664664
ch := ts.Server()
665-
ctx, cancel := NewContext(testutils.Timeout(15 * time.Millisecond))
665+
ctx, cancel := NewContext(testutils.Timeout(25 * time.Millisecond))
666666
defer cancel()
667667

668668
call, err := ch.BeginCall(ctx, ts.HostPort(), ch.ServiceName(), "call", nil)

introspection.go

+2
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ type RelayerRuntimeState struct {
156156
InboundItems RelayItemSetState `json:"inboundItems"`
157157
OutboundItems RelayItemSetState `json:"outboundItems"`
158158
MaxTimeout time.Duration `json:"maxTimeout"`
159+
TimeoutTick time.Duration `json:"timeoutTick"`
159160
}
160161

161162
// ExchangeSetRuntimeState is the runtime state for a message exchange set.
@@ -360,6 +361,7 @@ func (r *Relayer) IntrospectState(opts *IntrospectionOptions) RelayerRuntimeStat
360361
InboundItems: r.inbound.IntrospectState(opts, "inbound"),
361362
OutboundItems: r.outbound.IntrospectState(opts, "outbound"),
362363
MaxTimeout: r.maxTimeout,
364+
TimeoutTick: r.timeoutTick,
363365
}
364366
}
365367

relay.go

+33-10
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"time"
2929

3030
"github.com/uber/tchannel-go/relay"
31+
"github.com/uber/tchannel-go/timers"
3132

3233
"github.com/uber-go/atomic"
3334
)
@@ -40,6 +41,9 @@ const (
4041
_relayTombTTL = 3 * time.Second
4142
// _defaultRelayMaxTimeout is the default max TTL for relayed calls.
4243
_defaultRelayMaxTimeout = 2 * time.Minute
44+
// _defaultRelayTimeoutTick is the default tick duration for processing
45+
// relay timeouts.
46+
_defaultRelayTimeoutTick = 5 * time.Millisecond
4347
)
4448

4549
var (
@@ -53,7 +57,7 @@ var (
5357
type relayConn Connection
5458

5559
type relayItem struct {
56-
*time.Timer
60+
*timers.Timer
5761

5862
remapID uint32
5963
tomb bool
@@ -67,14 +71,16 @@ type relayItems struct {
6771
sync.RWMutex
6872

6973
logger Logger
74+
wheel *timers.Wheel
7075
tombs uint64
7176
items map[uint32]relayItem
7277
}
7378

74-
func newRelayItems(logger Logger) *relayItems {
79+
func newRelayItems(wheel *timers.Wheel, logger Logger) *relayItems {
7580
return &relayItems{
7681
items: make(map[uint32]relayItem),
7782
logger: logger,
83+
wheel: wheel,
7884
}
7985
}
8086

@@ -150,9 +156,7 @@ func (r *relayItems) Entomb(id uint32, deleteAfter time.Duration) (relayItem, bo
150156
r.items[id] = item
151157
r.Unlock()
152158

153-
// TODO: We should be clearing these out in batches, rather than creating
154-
// individual timers for each item.
155-
time.AfterFunc(deleteAfter, func() { r.Delete(id) })
159+
r.wheel.AfterFunc(deleteAfter, func() { r.Delete(id) })
156160
return item, true
157161
}
158162

@@ -165,8 +169,10 @@ const (
165169

166170
// A Relayer forwards frames.
167171
type Relayer struct {
168-
relayHost RelayHost
169-
maxTimeout time.Duration
172+
relayHost RelayHost
173+
maxTimeout time.Duration
174+
timeoutTick time.Duration
175+
wheel *timers.Wheel
170176

171177
// localHandlers is the set of service names that are handled by the local
172178
// channel.
@@ -190,12 +196,15 @@ type Relayer struct {
190196

191197
// NewRelayer constructs a Relayer.
192198
func NewRelayer(ch *Channel, conn *Connection) *Relayer {
199+
wheel := ch.relayTimeoutWheel
193200
return &Relayer{
194201
relayHost: ch.RelayHost(),
195202
maxTimeout: ch.relayMaxTimeout,
203+
timeoutTick: ch.relayTimeoutTick,
204+
wheel: wheel,
196205
localHandler: ch.relayLocal,
197-
outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})),
198-
inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})),
206+
outbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "outbound"})),
207+
inbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "inbound"})),
199208
peers: ch.rootPeers(),
200209
conn: conn,
201210
logger: conn.log,
@@ -455,7 +464,7 @@ func (r *Relayer) addRelayItem(isOriginator bool, id, remapID uint32, destinatio
455464
if isOriginator {
456465
items = r.outbound
457466
}
458-
item.Timer = time.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
467+
item.Timer = r.wheel.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
459468
items.Add(id, item)
460469
return item
461470
}
@@ -600,3 +609,17 @@ func validateRelayMaxTimeout(d time.Duration, logger Logger) time.Duration {
600609
).Warn("Configured RelayMaxTimeout is invalid, using default instead.")
601610
return _defaultRelayMaxTimeout
602611
}
612+
613+
func validateRelayTimeoutTick(d time.Duration, logger Logger) time.Duration {
614+
if d > 0 {
615+
return d
616+
}
617+
if d == 0 {
618+
return _defaultRelayTimeoutTick
619+
}
620+
logger.WithFields(
621+
LogField{"configuredTimeoutTick", d},
622+
LogField{"defaultTimeoutTick", _defaultRelayTimeoutTick},
623+
).Warn("Configured RelayTimeoutTick is invalid, using default instead.")
624+
return _defaultRelayTimeoutTick
625+
}

relay_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,12 @@ func TestRelayRejectsDuringClose(t *testing.T) {
565565
require.Error(t, err, "Expect call to fail after relay is shutdown")
566566
assert.Contains(t, err.Error(), "incoming connection is not active")
567567
close(block)
568+
wg.Wait()
569+
// FIXME:
570+
// We fire all pending timers when we shut down the relay, which
571+
// includes the timeout for the blocked handler. That unblocks the
572+
// handler before we get to the call above, which obviates the point of
573+
// this test.
568574

569575
// We have a successful call that ran in the goroutine
570576
// and a failed call that we just checked the error on.

testutils/channel_opts.go

+6
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,12 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts {
199199
return o
200200
}
201201

202+
// SetRelayTimeoutTick sets the tick duration (granularity) of relay timeouts.
203+
func (o *ChannelOpts) SetRelayTimeoutTick(d time.Duration) *ChannelOpts {
204+
o.ChannelOptions.RelayTimeoutTick = d
205+
return o
206+
}
207+
202208
func defaultString(v string, defaultValue string) string {
203209
if v == "" {
204210
return defaultValue

0 commit comments

Comments
 (0)