-
Notifications
You must be signed in to change notification settings - Fork 3
Refine event listener #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,34 +3,35 @@ package listener | |
| import ( | ||
| "context" | ||
| "math/big" | ||
| "sync" | ||
|
|
||
| "github.com/ethereum/go-ethereum" | ||
| "github.com/ethereum/go-ethereum/common" | ||
| "github.com/ethereum/go-ethereum/core/types" | ||
| "github.com/ethereum/go-ethereum/log" | ||
| ) | ||
|
|
||
| const ( | ||
| bufferedLogSize = 1000 | ||
| ) | ||
|
|
||
| var logger = log.New() | ||
|
|
||
| type EventListener struct { | ||
| client EthClient | ||
| logCh chan types.Log | ||
| client EthClient | ||
| logCh chan types.Log | ||
| eventCh chan *ContractEvent | ||
|
|
||
| // Contract address <-> Contract mapping | ||
| addressMap map[common.Address]*Contract | ||
| } | ||
|
|
||
| func NewEventListener(client EthClient, | ||
| contracts []*Contract) *EventListener { | ||
| contracts []*Contract, | ||
| bufferedLogSize int, | ||
| bufferedEventSize int) *EventListener { | ||
|
|
||
| l := &EventListener{ | ||
| client: client, | ||
| addressMap: make(map[common.Address]*Contract), | ||
| logCh: make(chan types.Log, bufferedLogSize), | ||
| eventCh: make(chan *ContractEvent, bufferedEventSize), | ||
| } | ||
|
|
||
| for _, c := range contracts { | ||
|
|
@@ -40,7 +41,8 @@ func NewEventListener(client EthClient, | |
| return l | ||
| } | ||
|
|
||
| func (el *EventListener) Listen(fromBlock *big.Int, eventCh chan<- *ContractEvent, stop <-chan struct{}) error { | ||
| func (el *EventListener) Listen(fromBlock *big.Int, stop <-chan struct{}) error { | ||
| wg := sync.WaitGroup{} | ||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| defer cancel() | ||
|
|
||
|
|
@@ -56,18 +58,22 @@ func (el *EventListener) Listen(fromBlock *big.Int, eventCh chan<- *ContractEven | |
| if err != nil { | ||
| return err | ||
| } | ||
| defer sub.Unsubscribe() | ||
|
|
||
| // fetch the past logs | ||
| logs, err := el.client.FilterLogs(context.Background(), q) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer el.channelCleanUp() | ||
| defer sub.Unsubscribe() | ||
|
|
||
| wg.Add(1) | ||
| defer wg.Wait() | ||
| go func() { | ||
| for _, l := range logs { | ||
| el.logCh <- l | ||
| } | ||
| wg.Done() | ||
| }() | ||
|
|
||
| for { | ||
|
|
@@ -76,7 +82,7 @@ func (el *EventListener) Listen(fromBlock *big.Int, eventCh chan<- *ContractEven | |
| return err | ||
| case log := <-el.logCh: | ||
| if cEvent := el.Parse(log); cEvent != nil { | ||
| eventCh <- cEvent | ||
| el.eventCh <- cEvent | ||
| } | ||
| case <-stop: | ||
| return nil | ||
|
|
@@ -106,3 +112,22 @@ func (el *EventListener) Parse(l types.Log) *ContractEvent { | |
| Removed: l.Removed, | ||
| } | ||
| } | ||
|
|
||
| func (el *EventListener) channelCleanUp() { | ||
| // Unsubscribe should be called before this cleanUp stage, therefore geth | ||
| // would stop sending logs through the log channel (but it won't close it). | ||
| // The goal of this function is to drain the log channel, send events through | ||
| // event channel and close it (to notify receiver that there's no more data). | ||
| for len(el.logCh) > 0 { | ||
| log := <-el.logCh | ||
| if cEvent := el.Parse(log); cEvent != nil { | ||
| el.eventCh <- cEvent | ||
| } | ||
| } | ||
| close(el.eventCh) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we don't wait until the events are processed?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Receiver can still get data from channel even after sender closing it. The reason that sender HAS to close channel is to let receiver know there is nomore data. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| return | ||
| } | ||
|
|
||
| func (el *EventListener) GetEventCh() <-chan *ContractEvent { | ||
| return el.eventCh | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return eventCh <-chan *ContractEvent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But Listen() will be blocked until listening service terminated.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have the whole instance of EventListener returned after New(), and since both logCh and eventCh are included in EventListener struct, caller can access eventCh directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So how could caller access
eventCh? It's a private variable.