Skip to content

Commit

Permalink
Error recovery in RestfulServerListFetchDaemon
Browse files Browse the repository at this point in the history
  • Loading branch information
Tristan-Wilson committed Aug 12, 2022
1 parent 9918c01 commit b6687ed
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 15 deletions.
24 changes: 16 additions & 8 deletions das/rest_server_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package das
import (
"bufio"
"context"
"fmt"
"net/http"
"strings"
"time"

"github.com/ethereum/go-ethereum/log"
)

const initialMaxRecurseDepth uint16 = 8
Expand Down Expand Up @@ -58,6 +61,9 @@ func restfulServerURLsFromList(
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("Recieved error response (%d) fetching online-url-list at %s", resp.StatusCode, listUrl)
}
scanner := bufio.NewScanner(resp.Body)
scanner.Split(bufio.ScanWords)
for scanner.Scan() {
Expand All @@ -84,28 +90,29 @@ const maxListFetchTime = time.Minute
func StartRestfulServerListFetchDaemon(ctx context.Context, listUrl string, updatePeriod time.Duration) <-chan []string {
updateChan := make(chan []string)

downloadAndSend := func() bool { // download and send once, return true iff success
downloadAndSend := func() error { // download and send once
subCtx, subCtxCancel := context.WithTimeout(ctx, maxListFetchTime)
defer subCtxCancel()

urls, err := RestfulServerURLsFromList(subCtx, listUrl)
if err != nil {
return false
return err
}
select {
case updateChan <- urls:
return true
return nil
case <-ctx.Done():
return false
return ctx.Err()
}
}

go func() {
defer close(updateChan)

// send the first result immediately
if !downloadAndSend() {
return
err := downloadAndSend()
if err != nil {
log.Warn("Couldn't download data availability online-url-list, will retry immediately", "err", err)
}

// now send periodically
Expand All @@ -116,8 +123,9 @@ func StartRestfulServerListFetchDaemon(ctx context.Context, listUrl string, upda
case <-ctx.Done():
return
case <-ticker.C:
if !downloadAndSend() {
return
err := downloadAndSend()
if err != nil {
log.Warn(fmt.Sprintf("Couldn't download data availability online-url-list, will retry in %s", updatePeriod), "err", err)
}
}
}
Expand Down
80 changes: 73 additions & 7 deletions das/restful_server_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestRestfulServerList(t *testing.T) {

urlsIn := []string{"https://supersecret.nowhere.com:9871", "http://www.google.com"}
listContents := urlsIn[0] + " \t" + urlsIn[1]
port, server := newListHttpServerForTest(t, listContents)
port, server := newListHttpServerForTest(t, &stringHandler{listContents})

listUrl := fmt.Sprintf("http://localhost:%d", port)
urls, err := RestfulServerURLsFromList(ctx, listUrl)
Expand All @@ -41,7 +41,7 @@ func TestRestfulServerListDaemon(t *testing.T) {

urlsIn := []string{"https://supersecret.nowhere.com:9871", "http://www.google.com"}
listContents := urlsIn[0] + " \t" + urlsIn[1]
port, server := newListHttpServerForTest(t, listContents)
port, server := newListHttpServerForTest(t, &stringHandler{listContents})

listUrl := fmt.Sprintf("http://localhost:%d", port)

Expand All @@ -57,6 +57,43 @@ func TestRestfulServerListDaemon(t *testing.T) {
Require(t, err)
}

func TestRestfulServerListDaemonWithErrors(t *testing.T) {
initTest(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

urlsIn := []string{"https://supersecret.nowhere.com:9871", "http://www.google.com"}
listContents := urlsIn[0] + " \t" + urlsIn[1]
port, server := newListHttpServerForTest(
t,
Handlers(
&connectionClosingHandler{},
&connectionClosingHandler{},
&stringHandler{listContents},
&erroringHandler{},
&erroringHandler{},
&stringHandler{listContents},
&erroringHandler{},
&connectionClosingHandler{},
&stringHandler{listContents},
),
)

listUrl := fmt.Sprintf("http://localhost:%d", port)

listChan := StartRestfulServerListFetchDaemon(ctx, listUrl, 200*time.Millisecond)
for i := 0; i < 3; i++ {
list := <-listChan
if !stringListIsPermutation(list, urlsIn) {
t.Fatal(i, "not a match")
}
}

err := server.Shutdown(ctx)
Require(t, err)
}

func stringListIsPermutation(lis1, lis2 []string) bool {
if len(lis1) != len(lis2) {
return false
Expand All @@ -73,9 +110,9 @@ func stringListIsPermutation(lis1, lis2 []string) bool {
return true
}

func newListHttpServerForTest(t *testing.T, contents string) (int, *http.Server) {
func newListHttpServerForTest(t *testing.T, handler http.Handler) (int, *http.Server) {
server := &http.Server{
Handler: &testHandler{contents},
Handler: handler,
ReadHeaderTimeout: 5 * time.Second,
}
listener, err := net.Listen("tcp", "localhost:0")
Expand All @@ -87,10 +124,39 @@ func newListHttpServerForTest(t *testing.T, contents string) (int, *http.Server)
return tcpAddr.Port, server
}

type testHandler struct {
type stringHandler struct {
contents string
}

func (th *testHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
_, _ = w.Write([]byte(th.contents))
func (h *stringHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
_, _ = w.Write([]byte(h.contents))
}

type erroringHandler struct {
}

func (h *erroringHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(404)
}

type connectionClosingHandler struct {
}

func (h *connectionClosingHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
panic("close connection")
}

type multiHandler struct {
current int
handlers []http.Handler
}

func Handlers(hs ...http.Handler) *multiHandler {
return &multiHandler{0, hs}
}

func (h *multiHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
i := h.current % len(h.handlers)
h.current++
h.handlers[i].ServeHTTP(w, req)
}

0 comments on commit b6687ed

Please sign in to comment.