@@ -22,35 +22,26 @@ func newDialSync(worker dialWorkerFunc) *DialSync {
22
22
// DialSync is a dial synchronization helper that ensures that at most one dial
23
23
// to any given peer is active at any given time.
24
24
type DialSync struct {
25
+ mutex sync.Mutex
25
26
dials map [peer.ID ]* activeDial
26
- dialsLk sync.Mutex
27
27
dialWorker dialWorkerFunc
28
28
}
29
29
30
30
type activeDial struct {
31
- id peer.ID
32
31
refCnt int
33
32
34
33
ctx context.Context
35
34
cancel func ()
36
35
37
36
reqch chan dialRequest
38
-
39
- ds * DialSync
40
37
}
41
38
42
- func (ad * activeDial ) decref () {
43
- ad .ds .dialsLk .Lock ()
44
- ad .refCnt --
45
- if ad .refCnt == 0 {
46
- ad .cancel ()
47
- close (ad .reqch )
48
- delete (ad .ds .dials , ad .id )
49
- }
50
- ad .ds .dialsLk .Unlock ()
39
+ func (ad * activeDial ) close () {
40
+ ad .cancel ()
41
+ close (ad .reqch )
51
42
}
52
43
53
- func (ad * activeDial ) dial (ctx context.Context , p peer. ID ) (* Conn , error ) {
44
+ func (ad * activeDial ) dial (ctx context.Context ) (* Conn , error ) {
54
45
dialCtx := ad .ctx
55
46
56
47
if forceDirect , reason := network .GetForceDirectDial (ctx ); forceDirect {
@@ -76,30 +67,26 @@ func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) {
76
67
}
77
68
78
69
func (ds * DialSync ) getActiveDial (p peer.ID ) (* activeDial , error ) {
79
- ds .dialsLk .Lock ()
80
- defer ds .dialsLk .Unlock ()
70
+ ds .mutex .Lock ()
71
+ defer ds .mutex .Unlock ()
81
72
82
73
actd , ok := ds .dials [p ]
83
74
if ! ok {
84
75
// This code intentionally uses the background context. Otherwise, if the first call
85
76
// to Dial is canceled, subsequent dial calls will also be canceled.
86
77
// XXX: this also breaks direct connection logic. We will need to pipe the
87
78
// information through some other way.
88
- adctx , cancel := context .WithCancel (context .Background ())
79
+ ctx , cancel := context .WithCancel (context .Background ())
89
80
actd = & activeDial {
90
- id : p ,
91
- ctx : adctx ,
81
+ ctx : ctx ,
92
82
cancel : cancel ,
93
83
reqch : make (chan dialRequest ),
94
- ds : ds ,
95
84
}
96
85
go ds .dialWorker (p , actd .reqch )
97
86
ds .dials [p ] = actd
98
87
}
99
-
100
- // increase ref count before dropping dialsLk
88
+ // increase ref count before dropping mutex
101
89
actd .refCnt ++
102
-
103
90
return actd , nil
104
91
}
105
92
@@ -111,6 +98,14 @@ func (ds *DialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
111
98
return nil , err
112
99
}
113
100
114
- defer ad .decref ()
115
- return ad .dial (ctx , p )
101
+ defer func () {
102
+ ds .mutex .Lock ()
103
+ defer ds .mutex .Unlock ()
104
+ ad .refCnt --
105
+ if ad .refCnt == 0 {
106
+ ad .close ()
107
+ delete (ds .dials , p )
108
+ }
109
+ }()
110
+ return ad .dial (ctx )
116
111
}
0 commit comments