Skip to content

Commit 13bc016

Browse files
author
Akshay Shah
committed
First cut at TChannel integration
1 parent b0844c0 commit 13bc016

File tree

4 files changed

+59
-8
lines changed

4 files changed

+59
-8
lines changed

channel.go

+18
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"sync"
3030
"time"
3131

32+
"github.com/uber/tchannel-go/timers"
3233
"github.com/uber/tchannel-go/tnet"
3334

3435
"github.com/opentracing/opentracing-go"
@@ -69,6 +70,10 @@ type ChannelOptions struct {
6970
// clamped to this value). Passing zero uses the default of 2m.
7071
RelayMaxTimeout time.Duration
7172

73+
// RelayTimeoutTick is the granularity of the timer wheel used to manage
74+
// timeouts.
75+
RelayTimeoutTick time.Duration
76+
7277
// The reporter to use for reporting stats for this channel.
7378
StatsReporter StatsReporter
7479

@@ -119,6 +124,8 @@ type Channel struct {
119124
peers *PeerList
120125
relayHost RelayHost
121126
relayMaxTimeout time.Duration
127+
relayTimeoutTick time.Duration
128+
relayTimeoutWheel *timers.Wheel
122129

123130
// mutable contains all the members of Channel which are mutable.
124131
mutable struct {
@@ -200,6 +207,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
200207
connectionOptions: opts.DefaultConnectionOptions,
201208
relayHost: opts.RelayHost,
202209
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
210+
relayTimeoutTick: validateRelayTimeoutTick(opts.RelayTimeoutTick, logger),
203211
}
204212
ch.peers = newRootPeerList(ch).newChild()
205213

@@ -221,6 +229,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
221229

222230
if opts.RelayHost != nil {
223231
opts.RelayHost.SetChannel(ch)
232+
ch.startWheel()
224233
}
225234
return ch, nil
226235
}
@@ -693,6 +702,11 @@ func (ch *Channel) Close() {
693702
for _, c := range connections {
694703
c.Close()
695704
}
705+
706+
if ch.relayTimeoutWheel != nil {
707+
ch.relayTimeoutWheel.Stop()
708+
}
709+
696710
removeClosedChannel(ch)
697711
}
698712

@@ -701,6 +715,10 @@ func (ch *Channel) RelayHost() RelayHost {
701715
return ch.relayHost
702716
}
703717

718+
func (ch *Channel) startWheel() {
719+
ch.relayTimeoutWheel = timers.NewWheel(ch.relayTimeoutTick, ch.relayMaxTimeout)
720+
}
721+
704722
func toStringSet(ss []string) map[string]struct{} {
705723
set := make(map[string]struct{}, len(ss))
706724
for _, s := range ss {

introspection.go

+2
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ type RelayerRuntimeState struct {
153153
InboundItems RelayItemSetState `json:"inboundItems"`
154154
OutboundItems RelayItemSetState `json:"outboundItems"`
155155
MaxTimeout time.Duration `json:"maxTimeout"`
156+
TimeoutTick time.Duration `json:"timeoutTick"`
156157
}
157158

158159
// ExchangeSetRuntimeState is the runtime state for a message exchange set.
@@ -355,6 +356,7 @@ func (r *Relayer) IntrospectState(opts *IntrospectionOptions) RelayerRuntimeStat
355356
InboundItems: r.inbound.IntrospectState(opts, "inbound"),
356357
OutboundItems: r.outbound.IntrospectState(opts, "outbound"),
357358
MaxTimeout: r.maxTimeout,
359+
TimeoutTick: r.timeoutTick,
358360
}
359361
}
360362

relay.go

+33-8
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"time"
2929

3030
"github.com/uber/tchannel-go/relay"
31+
"github.com/uber/tchannel-go/timers"
3132

3233
"github.com/uber-go/atomic"
3334
)
@@ -40,6 +41,9 @@ const (
4041
_relayTombTTL = 3 * time.Second
4142
// _defaultRelayMaxTimeout is the default max TTL for relayed calls.
4243
_defaultRelayMaxTimeout = 2 * time.Minute
44+
// _defaultRelayTimeoutTick is the default tick duration for processing
45+
// relay timeouts.
46+
_defaultRelayTimeoutTick = 5 * time.Millisecond
4347
)
4448

4549
var (
@@ -53,7 +57,7 @@ var (
5357
type relayConn Connection
5458

5559
type relayItem struct {
56-
*time.Timer
60+
*timers.Timer
5761

5862
remapID uint32
5963
tomb bool
@@ -67,14 +71,16 @@ type relayItems struct {
6771
sync.RWMutex
6872

6973
logger Logger
74+
wheel *timers.Wheel
7075
tombs uint64
7176
items map[uint32]relayItem
7277
}
7378

74-
func newRelayItems(logger Logger) *relayItems {
79+
func newRelayItems(wheel *timers.Wheel, logger Logger) *relayItems {
7580
return &relayItems{
7681
items: make(map[uint32]relayItem),
7782
logger: logger,
83+
wheel: wheel,
7884
}
7985
}
8086

@@ -152,7 +158,7 @@ func (r *relayItems) Entomb(id uint32, deleteAfter time.Duration) (relayItem, bo
152158

153159
// TODO: We should be clearing these out in batches, rather than creating
154160
// individual timers for each item.
155-
time.AfterFunc(deleteAfter, func() { r.Delete(id) })
161+
r.wheel.AfterFunc(deleteAfter, func() { r.Delete(id) })
156162
return item, true
157163
}
158164

@@ -165,8 +171,10 @@ const (
165171

166172
// A Relayer forwards frames.
167173
type Relayer struct {
168-
relayHost RelayHost
169-
maxTimeout time.Duration
174+
relayHost RelayHost
175+
maxTimeout time.Duration
176+
timeoutTick time.Duration
177+
wheel *timers.Wheel
170178

171179
// localHandlers is the set of service names that are handled by the local
172180
// channel.
@@ -190,12 +198,15 @@ type Relayer struct {
190198

191199
// NewRelayer constructs a Relayer.
192200
func NewRelayer(ch *Channel, conn *Connection) *Relayer {
201+
wheel := ch.relayTimeoutWheel
193202
return &Relayer{
194203
relayHost: ch.RelayHost(),
195204
maxTimeout: ch.relayMaxTimeout,
205+
timeoutTick: ch.relayTimeoutTick,
206+
wheel: wheel,
196207
localHandler: ch.relayLocal,
197-
outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})),
198-
inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})),
208+
outbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "outbound"})),
209+
inbound: newRelayItems(wheel, ch.Logger().WithFields(LogField{"relay", "inbound"})),
199210
peers: ch.rootPeers(),
200211
conn: conn,
201212
logger: conn.log,
@@ -455,7 +466,7 @@ func (r *Relayer) addRelayItem(isOriginator bool, id, remapID uint32, destinatio
455466
if isOriginator {
456467
items = r.outbound
457468
}
458-
item.Timer = time.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
469+
item.Timer = r.wheel.AfterFunc(ttl, func() { r.timeoutRelayItem(isOriginator, items, id) })
459470
items.Add(id, item)
460471
return item
461472
}
@@ -600,3 +611,17 @@ func validateRelayMaxTimeout(d time.Duration, logger Logger) time.Duration {
600611
).Warn("Configured RelayMaxTimeout is invalid, using default instead.")
601612
return _defaultRelayMaxTimeout
602613
}
614+
615+
func validateRelayTimeoutTick(d time.Duration, logger Logger) time.Duration {
616+
if d > 0 {
617+
return d
618+
}
619+
if d == 0 {
620+
return _defaultRelayTimeoutTick
621+
}
622+
logger.WithFields(
623+
LogField{"configuredTimeoutTick", d},
624+
LogField{"defaultTimeoutTick", _defaultRelayTimeoutTick},
625+
).Warn("Configured RelayTimeoutTick is invalid, using default instead.")
626+
return _defaultRelayTimeoutTick
627+
}

testutils/channel_opts.go

+6
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,12 @@ func (o *ChannelOpts) SetRelayMaxTimeout(d time.Duration) *ChannelOpts {
199199
return o
200200
}
201201

202+
// SetRelayTimeoutTick sets the coarseness of relay timeouts.
203+
func (o *ChannelOpts) SetRelayTimeoutTick(d time.Duration) *ChannelOpts {
204+
o.ChannelOptions.RelayTimeoutTick = d
205+
return o
206+
}
207+
202208
func defaultString(v string, defaultValue string) string {
203209
if v == "" {
204210
return defaultValue

0 commit comments

Comments
 (0)