Skip to content

Commit ac2a832

Browse files
authored
Merge pull request #7 from treasure-data/force-single-execution
Implement a global lock on requests
2 parents 671dc0e + 01034a7 commit ac2a832

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

main.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,12 @@ func main() {
4040
os.Exit(1)
4141
}
4242

43+
requestFree := make(chan bool, 1)
44+
requestFree <- true
45+
4346
pb := NewALBPayloadBuilder(*albMultiValue)
4447
client := MakeLambdaClient(*endpoint)
45-
handler := MakeInvokeLambdaHandler(client, *functionName, pb)
48+
handler := MakeInvokeLambdaHandler(client, *functionName, pb, requestFree)
4649

4750
http.HandleFunc("/", handler)
4851

@@ -63,8 +66,17 @@ func MakeLambdaClient(endpoint string) *lambda.Lambda {
6366
return lambda.New(sess, &config)
6467
}
6568

66-
func MakeInvokeLambdaHandler(client *lambda.Lambda, functionName string, pb PayloadBuilder) func(http.ResponseWriter, *http.Request) {
69+
func MakeInvokeLambdaHandler(client *lambda.Lambda, functionName string, pb PayloadBuilder, requestFree chan bool) func(http.ResponseWriter, *http.Request) {
6770
return func(w http.ResponseWriter, r *http.Request) {
71+
// Use the requestFree channel as a lock to prevent more than one inflight request to the lambda function
72+
// since it has a concurrency of one.
73+
_, ok := <-requestFree
74+
if !ok {
75+
return // Indicates channel closure
76+
}
77+
78+
defer func () {requestFree <- true}()
79+
6880
// Add proxy headers
6981
r.Header.Add("X-Forwarded-For", r.RemoteAddr[0:strings.LastIndex(r.RemoteAddr, ":")])
7082
r.Header.Add("X-Forwarded-Proto", "http")

0 commit comments

Comments
 (0)