From b0306830ac4fef223f179b70eea8fd3245b89926 Mon Sep 17 00:00:00 2001 From: frankfzw Date: Fri, 1 Jul 2016 17:56:31 +0800 Subject: [PATCH 01/17] add a file to load user's utilization of capacity --- usage.go | 113 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 usage.go diff --git a/usage.go b/usage.go new file mode 100644 index 00000000..3d446e93 --- /dev/null +++ b/usage.go @@ -0,0 +1,113 @@ +package main + +import ( + "os" + "strings" + "github.com/cyfdecyf/bufio" + "errors" + "time" + "bytes" + "fmt" +) + +var userUsage struct { + usage map[string]int + capacity map[string]int + lastSavedts time.Time +} + +func parseCapacity(line string) (user string, capacity int, err error) { + arr := strings.Split(line, ":") + n := len(arr) + if n != 2 { + err = errors.New("User capacity limitation: " + line + + " syntax wrong, should be username:capacity") + return + } + u, c := arr[0], uint32(arr[1]) + return u, c, nil +} + +func parseUsage(line string) (user string, usage int, err error) { + arr := strings.Split(line, ":") + n := len(arr) + if n != 2 { + err = errors.New("Record file format error: " + line + + " syntax wrong, should be username:usage") + return + } + u, c := arr[0], uint32(arr[1]) + return u, c, nil +} + +func loadCapcity(file string) { + // load capcity first + if file == "" { + return + } + f, err := os.Open(file) + if err != nil { + Fatal("error opening user usage file:", err) + } + + r := bufio.NewReader(f) + s := bufio.NewScanner(r) + for s.Scan() { + line := s.Text() + if line == "" { + continue + } + u, c, err := parseCapacity(s.Text()) + if err != nil { + Fatal(err) + } + if _, ok := userUsage.capacity[u]; ok { + Fatal("duplicate user:", u) + } + userUsage.capacity[u] = c + userUsage.usage[u] = 0 + + } + f.Close() +} + +func loadUsage() { + dir, err := os.Getwd() + if err != nil { + Fatal("error opening current directory:", err) + } + buf := new(bytes.Buffer) + fmt.Fprint(buf, dir, "/_records.log") + f, err := os.OpenFile(buf.String(), os.O_CREATE, 0666) + if err != nil { + Fatal("error opening/creating user record file:", err) + } + r := bufio.NewReader(f) + s := bufio.NewScanner(r) + for s.Scan() { + line := s.Text() + if line == "" { + continue + } + u, c, err := parseUsage(s.Text()) + if err != nil { + Fatal(err) + } + if _, ok := userUsage.usage[u]; ok { + Fatal("duplicate record:", line) + } + userUsage.usage[u] = c + + } + f.Close() +} + +func loadUserUsageFile(file string) { + //load capacity at first + loadCapcity(file) + + // load usage + loadUsage() +} + + From c2ed36dc508fc27b64329137ce27a81ebe5082ff Mon Sep 17 00:00:00 2001 From: frankfzw Date: Wed, 6 Jul 2016 15:35:28 +0800 Subject: [PATCH 02/17] finish essential function of the recording of user usage --- config.go | 11 ++++++ main.go | 7 ++++ usage.go | 104 ++++++++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 107 insertions(+), 15 deletions(-) diff --git a/config.go b/config.go index c3bf294e..2e459046 100644 --- a/config.go +++ b/config.go @@ -76,6 +76,9 @@ type Config struct { // not config option saveReqLine bool // for http and cow parent, should save request line from client + + // capacity limitation file + UserCapacityFile string } var config Config @@ -559,6 +562,14 @@ func (p configParser) ParseUserPasswdFile(val string) { config.UserPasswdFile = val } +func (p configParser) ParseUserCapacityFile(val string) { + err := isFileExists(val) + if err != nil { + Fatal("userCapacityFile:", err) + } + config.UserCapacityFile = val +} + func (p configParser) ParseAllowedClient(val string) { config.AllowedClient = val } diff --git a/main.go b/main.go index 422ecbef..d651f3fa 100644 --- a/main.go +++ b/main.go @@ -47,6 +47,8 @@ func main() { initStat() + usageFlag := initUsage() + initParentPool() /* @@ -77,6 +79,11 @@ func main() { go proxy.Serve(&wg, quit) } + //add the usage recorder + if usageFlag { + wg.Add(1) + go startUsageRecorder(&wg, quit) + } wg.Wait() if relaunch { diff --git a/usage.go b/usage.go index 3d446e93..1437bf1a 100644 --- a/usage.go +++ b/usage.go @@ -3,13 +3,17 @@ package main import ( "os" "strings" + "strconv" "github.com/cyfdecyf/bufio" "errors" "time" "bytes" "fmt" + "sync" ) +var recordPath string + var userUsage struct { usage map[string]int capacity map[string]int @@ -22,10 +26,16 @@ func parseCapacity(line string) (user string, capacity int, err error) { if n != 2 { err = errors.New("User capacity limitation: " + line + " syntax wrong, should be username:capacity") - return + return "", 0, err } - u, c := arr[0], uint32(arr[1]) - return u, c, nil + c, err := strconv.Atoi(arr[1]) + if err != nil { + err = errors.New("Record file format error: " + arr[1] + + " syntax wrong, should be int") + return "", 0, err + } + debug.Printf("user: %s, capacity: %d", arr[0], c) + return arr[0], c, nil } func parseUsage(line string) (user string, usage int, err error) { @@ -34,10 +44,16 @@ func parseUsage(line string) (user string, usage int, err error) { if n != 2 { err = errors.New("Record file format error: " + line + " syntax wrong, should be username:usage") - return + return "", 0, err } - u, c := arr[0], uint32(arr[1]) - return u, c, nil + c, err := strconv.Atoi(arr[1]) + if err != nil { + err = errors.New("Record file format error: " + arr[1] + + " syntax wrong, should be int") + return "", 0, err + } + debug.Printf("user: %s, usage: %d", arr[0], c) + return arr[0], c, nil } func loadCapcity(file string) { @@ -72,13 +88,7 @@ func loadCapcity(file string) { } func loadUsage() { - dir, err := os.Getwd() - if err != nil { - Fatal("error opening current directory:", err) - } - buf := new(bytes.Buffer) - fmt.Fprint(buf, dir, "/_records.log") - f, err := os.OpenFile(buf.String(), os.O_CREATE, 0666) + f, err := os.OpenFile(recordPath, os.O_CREATE, 0600) if err != nil { Fatal("error opening/creating user record file:", err) } @@ -102,12 +112,76 @@ func loadUsage() { f.Close() } -func loadUserUsageFile(file string) { +func flushLog() { + bakPath := recordPath + ".bak" + f, err := os.OpenFile(bakPath, os.O_WRONLY | os.O_CREATE, 0600) + if err != nil { + Fatal("error opening/creating user record file:", err) + } + w := bufio.NewWriter(f) + for k, v := range userUsage.usage { + r := fmt.Sprintf("%s:%d\n", k, v) + w.WriteString(r) + } + w.Flush() + f.Close() + + os.Remove(recordPath) + os.Rename(bakPath, recordPath) + + +} + +func startUsageRecorder(wg *sync.WaitGroup, quit <-chan struct{}) { + defer func() { + flushLog() + debug.Println("exit the usage recorder") + wg.Done() + }() + var exit bool + go func() { + <-quit + exit=true + }() + + debug.Println("start usage recording!") + interval := 0 + for { + time.Sleep(1000 * time.Millisecond) + interval += 1 + if exit { + break + } + if interval > 7200 { + flushLog() + interval = 0 + } + } +} + +func initUsage() bool{ + if config.UserPasswdFile == "" || + config.UserCapacityFile == "" { + return false + } + // get current running path + dir, err := os.Getwd() + if err != nil { + Fatal("error opening current directory:", err) + } + buf := new(bytes.Buffer) + fmt.Fprint(buf, dir, "/_records.log") + recordPath = buf.String() + + userUsage.capacity = make(map[string]int) + userUsage.usage = make(map[string]int) + userUsage.lastSavedts = time.Now() //load capacity at first - loadCapcity(file) + loadCapcity(config.UserCapacityFile) // load usage loadUsage() + return true } From d0f5b99522be44246bd93c88dfbedfedf1980e7c Mon Sep 17 00:00:00 2001 From: frankfzw Date: Wed, 6 Jul 2016 21:23:42 +0800 Subject: [PATCH 03/17] check usage when a connection is established --- usage.go | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/usage.go b/usage.go index 1437bf1a..672e1c7f 100644 --- a/usage.go +++ b/usage.go @@ -184,4 +184,44 @@ func initUsage() bool{ return true } +func checkUsage(r *Request) bool { + arr := strings.SplitN(r.ProxyAuthorization, " ", 2) + if len(arr) != 2 { + err := errors.New("auth: malformed ProxyAuthorization header: " + r.ProxyAuthorization) + Fatal(err) + } + userPasswd := strings.Split(arr[1], ":") + if len(userPasswd) != 2 { + err := errors.New("auth: malformed basic auth user:passwd") + Fatal(err) + } + user := arr[0] + var capacity int + var usage int + if val, ok := userUsage.capacity[user]; ok { + capacity = val + } + // don't have to check here + usage = userUsage.usage[user] + return (usage > capacity) +} + +func accumulateUsage(r *Request, rp *Response) { + arr := strings.SplitN(r.ProxyAuthorization, " ", 2) + if len(arr) != 2 { + err := errors.New("auth: malformed ProxyAuthorization header: " + r.ProxyAuthorization) + Fatal(err) + } + userPasswd := strings.Split(arr[1], ":") + if len(userPasswd) != 2 { + return errors.New("auth: malformed basic auth user:passwd") + } + user := arr[0] + if _, ok := userUsage.usage[user]; ok { + userUsage.usage[user] += len(rp.rawByte) / 1024 / 1024 + } + + +} + From 3a53a4857c0075a05f1da9e0244f5d5a8dd00dde Mon Sep 17 00:00:00 2001 From: frankfzw Date: Wed, 6 Jul 2016 21:25:43 +0800 Subject: [PATCH 04/17] update usage when a request is successfully reponsed --- proxy.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/proxy.go b/proxy.go index 601ac65e..d8943dd0 100644 --- a/proxy.go +++ b/proxy.go @@ -501,6 +501,14 @@ func (c *clientConn) serve() { return } + if config.UserCapacityFile != "" { + if checkUsage(&r) != true { + sendErrorPage(c, statusForbidden, "Run out of capacity", + genErrMsg(&r, nil, "Please contact proxy admin.")) + return + } + } + if r.ExpectContinue { sendErrorPage(c, statusExpectFailed, "Expect header not supported", "Please contact COW's developer if you see this.") @@ -1285,6 +1293,8 @@ func (sv *serverConn) doRequest(c *clientConn, r *Request, rp *Response) (err er r.state = rsSent if err = c.readResponse(sv, r, rp); err == nil { sv.updateVisit() + // response received successfully + accumulateUsage(&r, &rp) } return err } From 53b2d249c7a72b7f2400792f07d91628a91a6715 Mon Sep 17 00:00:00 2001 From: frankfzw Date: Wed, 6 Jul 2016 21:30:00 +0800 Subject: [PATCH 05/17] fix compile bug --- proxy.go | 2 +- usage.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/proxy.go b/proxy.go index d8943dd0..f35e2a10 100644 --- a/proxy.go +++ b/proxy.go @@ -1294,7 +1294,7 @@ func (sv *serverConn) doRequest(c *clientConn, r *Request, rp *Response) (err er if err = c.readResponse(sv, r, rp); err == nil { sv.updateVisit() // response received successfully - accumulateUsage(&r, &rp) + accumulateUsage(r, rp) } return err } diff --git a/usage.go b/usage.go index 672e1c7f..14dcae20 100644 --- a/usage.go +++ b/usage.go @@ -214,7 +214,8 @@ func accumulateUsage(r *Request, rp *Response) { } userPasswd := strings.Split(arr[1], ":") if len(userPasswd) != 2 { - return errors.New("auth: malformed basic auth user:passwd") + err := errors.New("auth: malformed basic auth user:passwd") + Fatal(err) } user := arr[0] if _, ok := userUsage.usage[user]; ok { From 307ea254ca307bfc7c32a7f748f15e479fe2b384 Mon Sep 17 00:00:00 2001 From: frankfzw Date: Thu, 7 Jul 2016 09:13:46 +0800 Subject: [PATCH 06/17] remove redundant check --- usage.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/usage.go b/usage.go index 14dcae20..e0227bd9 100644 --- a/usage.go +++ b/usage.go @@ -103,9 +103,6 @@ func loadUsage() { if err != nil { Fatal(err) } - if _, ok := userUsage.usage[u]; ok { - Fatal("duplicate record:", line) - } userUsage.usage[u] = c } @@ -220,6 +217,7 @@ func accumulateUsage(r *Request, rp *Response) { user := arr[0] if _, ok := userUsage.usage[user]; ok { userUsage.usage[user] += len(rp.rawByte) / 1024 / 1024 + debug.Printf("user: %s add %d MB, total %d", user, len(rp.rawByte) / 1024 / 1024, userUsage.usage[user]) } From acea6ecbaa9740b0eaafa1a3929ce966af76cbdc Mon Sep 17 00:00:00 2001 From: frankfzw Date: Fri, 8 Jul 2016 10:05:21 +0800 Subject: [PATCH 07/17] finish usage monitor demo --- auth.go | 71 +++++++++++++++++++++++++-------------------- config.go | 5 ++++ proxy.go | 4 +-- usage.go | 86 +++++++++++++++++++++++++++++++++++++------------------ 4 files changed, 105 insertions(+), 61 deletions(-) diff --git a/auth.go b/auth.go index 270e00a8..76df0339 100644 --- a/auth.go +++ b/auth.go @@ -47,7 +47,10 @@ var auth struct { allowedClient []netAddr - authed *TimeoutSet // cache authenticated users based on ip + // cache authenticated users based on ip and port + // add port to identify the users behind one ip address + // this may cause a user auth more than once + authed *TimeoutSet template *template.Template } @@ -181,16 +184,19 @@ func initAuth() { // authentication is needed, and should be passed back on subsequent call. func Authenticate(conn *clientConn, r *Request) (err error) { clientIP, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) - if auth.authed.has(clientIP) { - debug.Printf("%s has already authed\n", clientIP) + if auth.authed.has(conn.RemoteAddr().String()) { + debug.Printf("%s has already authed\n", conn.RemoteAddr().String()) return } if authIP(clientIP) { // IP is allowed return } - err = authUserPasswd(conn, r) - if err == nil { - auth.authed.add(clientIP) + err, user := authUserPasswd(conn, r) + if err == nil && user != ""{ + auth.authed.add(conn.RemoteAddr().String()) + // update the map of address to userid in usage + updateAddrToUser(conn.RemoteAddr().String(), user) + } return } @@ -231,14 +237,14 @@ func calcRequestDigest(kv map[string]string, ha1, method string) string { return md5sum(strings.Join(arr, ":")) } -func checkProxyAuthorization(conn *clientConn, r *Request) error { +func checkProxyAuthorization(conn *clientConn, r *Request) (error, string) { if debug { debug.Printf("cli(%s) authorization: %s\n", conn.RemoteAddr(), r.ProxyAuthorization) } arr := strings.SplitN(r.ProxyAuthorization, " ", 2) if len(arr) != 2 { - return errors.New("auth: malformed ProxyAuthorization header: " + r.ProxyAuthorization) + return errors.New("auth: malformed ProxyAuthorization header: " + r.ProxyAuthorization), "" } authMethod := strings.ToLower(strings.TrimSpace(arr[0])) if authMethod == "digest" { @@ -246,7 +252,7 @@ func checkProxyAuthorization(conn *clientConn, r *Request) error { } else if authMethod == "basic" { return authBasic(conn, arr[1]) } - return errors.New("auth: method " + arr[0] + " unsupported, must use digest") + return errors.New("auth: method " + arr[0] + " unsupported, must use digest"), "" } func authPort(conn *clientConn, user string, au *authUser) error { @@ -262,73 +268,76 @@ func authPort(conn *clientConn, user string, au *authUser) error { return nil } -func authBasic(conn *clientConn, userPasswd string) error { +func authBasic(conn *clientConn, userPasswd string) (error, string) { b64, err := base64.StdEncoding.DecodeString(userPasswd) if err != nil { - return errors.New("auth:" + err.Error()) + return errors.New("auth:" + err.Error()), "" } arr := strings.Split(string(b64), ":") if len(arr) != 2 { - return errors.New("auth: malformed basic auth user:passwd") + return errors.New("auth: malformed basic auth user:passwd"), "" } user := arr[0] passwd := arr[1] au, ok := auth.user[user] if !ok || au.passwd != passwd { - return errAuthRequired + return errAuthRequired, user } - return authPort(conn, user, au) + if ret := authPort(conn, user, au); ret != nil { + return ret, user + } + return nil, user } -func authDigest(conn *clientConn, r *Request, keyVal string) error { +func authDigest(conn *clientConn, r *Request, keyVal string) (error, string) { authHeader := parseKeyValueList(keyVal) if len(authHeader) == 0 { - return errors.New("auth: empty authorization list") + return errors.New("auth: empty authorization list"), "" } nonceTime, err := strconv.ParseInt(authHeader["nonce"], 16, 64) if err != nil { - return fmt.Errorf("auth: nonce %v", err) + return fmt.Errorf("auth: nonce %v", err), "" } // If nonce time too early, reject. iOS will create a new connection to do // authentication. if time.Now().Sub(time.Unix(nonceTime, 0)) > time.Minute { - return errAuthRequired + return errAuthRequired, "" } user := authHeader["username"] au, ok := auth.user[user] if !ok { errl.Printf("cli(%s) auth: no such user: %s\n", conn.RemoteAddr(), authHeader["username"]) - return errAuthRequired + return errAuthRequired, "user" } if err = authPort(conn, user, au); err != nil { - return err + return err, user } if authHeader["qop"] != "auth" { - return errors.New("auth: qop wrong: " + authHeader["qop"]) + return errors.New("auth: qop wrong: " + authHeader["qop"]), user } response, ok := authHeader["response"] if !ok { - return errors.New("auth: no request-digest response") + return errors.New("auth: no request-digest response"), user } au.initHA1(user) digest := calcRequestDigest(authHeader, au.ha1, r.Method) if response != digest { errl.Printf("cli(%s) auth: digest not match, maybe password wrong", conn.RemoteAddr()) - return errAuthRequired + return errAuthRequired, user } - return nil + return nil, user } -func authUserPasswd(conn *clientConn, r *Request) (err error) { +func authUserPasswd(conn *clientConn, r *Request) (err error, user string) { if r.ProxyAuthorization != "" { // client has sent authorization header - err = checkProxyAuthorization(conn, r) - if err == nil { - return + err, user = checkProxyAuthorization(conn, r) + if err == nil && user != ""{ + return nil, user } else if err != errAuthRequired { sendErrorPage(conn, statusBadReq, "Bad authorization request", err.Error()) return @@ -344,13 +353,13 @@ func authUserPasswd(conn *clientConn, r *Request) (err error) { } buf := new(bytes.Buffer) if err := auth.template.Execute(buf, data); err != nil { - return fmt.Errorf("error generating auth response: %v", err) + return fmt.Errorf("error generating auth response: %v", err), "" } if bool(debug) && verbose { debug.Printf("authorization response:\n%s", buf.String()) } if _, err := conn.Write(buf.Bytes()); err != nil { - return fmt.Errorf("send auth response error: %v", err) + return fmt.Errorf("send auth response error: %v", err), "" } - return errAuthRequired + return errAuthRequired, "" } diff --git a/config.go b/config.go index 2e459046..694febd2 100644 --- a/config.go +++ b/config.go @@ -79,6 +79,7 @@ type Config struct { // capacity limitation file UserCapacityFile string + UsageResetDate int } var config Config @@ -570,6 +571,10 @@ func (p configParser) ParseUserCapacityFile(val string) { config.UserCapacityFile = val } +func (p configParser) ParseUsageResetDate(val string) { + config.UsageResetDate = parseInt(val, "usageResetDate") +} + func (p configParser) ParseAllowedClient(val string) { config.AllowedClient = val } diff --git a/proxy.go b/proxy.go index f35e2a10..8cde694e 100644 --- a/proxy.go +++ b/proxy.go @@ -502,7 +502,7 @@ func (c *clientConn) serve() { } if config.UserCapacityFile != "" { - if checkUsage(&r) != true { + if checkUsage(c.RemoteAddr().String()) != true { sendErrorPage(c, statusForbidden, "Run out of capacity", genErrMsg(&r, nil, "Please contact proxy admin.")) return @@ -1294,7 +1294,7 @@ func (sv *serverConn) doRequest(c *clientConn, r *Request, rp *Response) (err er if err = c.readResponse(sv, r, rp); err == nil { sv.updateVisit() // response received successfully - accumulateUsage(r, rp) + accumulateUsage(c.RemoteAddr().String(), rp) } return err } diff --git a/usage.go b/usage.go index e0227bd9..900acf9b 100644 --- a/usage.go +++ b/usage.go @@ -17,6 +17,7 @@ var recordPath string var userUsage struct { usage map[string]int capacity map[string]int + addrToUser map[string]string lastSavedts time.Time } @@ -94,6 +95,19 @@ func loadUsage() { } r := bufio.NewReader(f) s := bufio.NewScanner(r) + for s.Scan() { + ts := s.Text() + if ts == "" { + continue + } + if t, e := time.Parse(time.ANSIC, ts); e == nil { + userUsage.lastSavedts = t + break + } else { + Fatal("incomplete user record, please delete ", recordPath, " and restart: ", e) + return + } + } for s.Scan() { line := s.Text() if line == "" { @@ -110,12 +124,24 @@ func loadUsage() { } func flushLog() { + if time.Now().Day() == config.UsageResetDate && + userUsage.lastSavedts.Day() != config.UsageResetDate { + //it's time to clear the record of last month + for k, _ := range userUsage.usage { + userUsage.usage[k] = 0 + } + + } bakPath := recordPath + ".bak" f, err := os.OpenFile(bakPath, os.O_WRONLY | os.O_CREATE, 0600) if err != nil { Fatal("error opening/creating user record file:", err) } w := bufio.NewWriter(f) + t := time.Now() + w.WriteString(t.Format(time.ANSIC)) + w.WriteString("\n") + w.Flush() for k, v := range userUsage.usage { r := fmt.Sprintf("%s:%d\n", k, v) w.WriteString(r) @@ -125,6 +151,7 @@ func flushLog() { os.Remove(recordPath) os.Rename(bakPath, recordPath) + userUsage.lastSavedts = t } @@ -158,7 +185,7 @@ func startUsageRecorder(wg *sync.WaitGroup, quit <-chan struct{}) { func initUsage() bool{ if config.UserPasswdFile == "" || - config.UserCapacityFile == "" { + config.UserCapacityFile == ""{ return false } // get current running path @@ -172,7 +199,7 @@ func initUsage() bool{ userUsage.capacity = make(map[string]int) userUsage.usage = make(map[string]int) - userUsage.lastSavedts = time.Now() + userUsage.addrToUser = make(map[string]string) //load capacity at first loadCapcity(config.UserCapacityFile) @@ -181,46 +208,49 @@ func initUsage() bool{ return true } -func checkUsage(r *Request) bool { - arr := strings.SplitN(r.ProxyAuthorization, " ", 2) - if len(arr) != 2 { - err := errors.New("auth: malformed ProxyAuthorization header: " + r.ProxyAuthorization) - Fatal(err) - } - userPasswd := strings.Split(arr[1], ":") - if len(userPasswd) != 2 { - err := errors.New("auth: malformed basic auth user:passwd") - Fatal(err) - } - user := arr[0] +func checkUsage(addr string) bool { + var user string var capacity int var usage int + if val, ok := userUsage.addrToUser[addr]; ok { + user = val + } else { + debug.Println("unkonw address: ", addr) + return false + } if val, ok := userUsage.capacity[user]; ok { capacity = val + } else { + debug.Println("unkonw user: ", user) + return false } // don't have to check here usage = userUsage.usage[user] - return (usage > capacity) + usageInMB := usage / 1024 / 1024 + return (usageInMB < capacity) } -func accumulateUsage(r *Request, rp *Response) { - arr := strings.SplitN(r.ProxyAuthorization, " ", 2) - if len(arr) != 2 { - err := errors.New("auth: malformed ProxyAuthorization header: " + r.ProxyAuthorization) - Fatal(err) - } - userPasswd := strings.Split(arr[1], ":") - if len(userPasswd) != 2 { - err := errors.New("auth: malformed basic auth user:passwd") - Fatal(err) +func accumulateUsage(addr string, rp *Response) { + var user string + if val, ok := userUsage.addrToUser[addr]; ok { + user = val + } else { + debug.Println("un recorded addr: ", addr) + Fatal("un recorded addr: ", addr) } - user := arr[0] if _, ok := userUsage.usage[user]; ok { - userUsage.usage[user] += len(rp.rawByte) / 1024 / 1024 - debug.Printf("user: %s add %d MB, total %d", user, len(rp.rawByte) / 1024 / 1024, userUsage.usage[user]) + if rp.ContLen > 0 { + userUsage.usage[user] += int(rp.ContLen) + debug.Printf("user: %s add %d BYTE, total %d", user, int(rp.ContLen), userUsage.usage[user]) + } } } +func updateAddrToUser(addr string, user string) { + userUsage.addrToUser[addr] = user + // add record + debug.Println("add addr: ", addr, "to user: ", user) +} From cf7e4025898e1c9b5d64824af47c6c641f1e47c2 Mon Sep 17 00:00:00 2001 From: frankfzw Date: Mon, 11 Jul 2016 15:15:42 +0800 Subject: [PATCH 08/17] capture the video usage --- auth.go | 6 +++--- doc/sample-config/rc | 10 ++++++++++ proxy.go | 4 +++- usage.go | 24 ++++++++++++++++-------- 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/auth.go b/auth.go index 76df0339..b8023d75 100644 --- a/auth.go +++ b/auth.go @@ -184,7 +184,7 @@ func initAuth() { // authentication is needed, and should be passed back on subsequent call. func Authenticate(conn *clientConn, r *Request) (err error) { clientIP, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) - if auth.authed.has(conn.RemoteAddr().String()) { + if auth.authed.has(clientIP) { debug.Printf("%s has already authed\n", conn.RemoteAddr().String()) return } @@ -193,9 +193,9 @@ func Authenticate(conn *clientConn, r *Request) (err error) { } err, user := authUserPasswd(conn, r) if err == nil && user != ""{ - auth.authed.add(conn.RemoteAddr().String()) + auth.authed.add(clientIP) // update the map of address to userid in usage - updateAddrToUser(conn.RemoteAddr().String(), user) + updateAddrToUser(clientIP, user) } return diff --git a/doc/sample-config/rc b/doc/sample-config/rc index 362f16fc..de3c3b91 100644 --- a/doc/sample-config/rc +++ b/doc/sample-config/rc @@ -115,6 +115,16 @@ listen = http://127.0.0.1:7777 # 注意:如有重复用户,COW 会报错退出 #userPasswdFile = /path/to/file +# 如需开启用户流量监控,必须使用文件来管理用户名密码, +# 同时在另一个文件中设定每个用户的流量,流量为整数,以MB为单位, 每行内容如下 +# username:capacity +# 这里的username必须与密码文件中的username对应 +# 同时设定该文件的路径 +# 以及自动重置用量的日期 +# 若不需要重置,请将日期设置为-1 +#userCapacityFile = /path/to/file +#usageResetDate = 12 + # 认证失效时间 # 语法:2h3m4s 表示 2 小时 3 分钟 4 秒 #authTimeout = 2h diff --git a/proxy.go b/proxy.go index 8cde694e..893fb0e2 100644 --- a/proxy.go +++ b/proxy.go @@ -1038,6 +1038,8 @@ func copyServer2Client(sv *serverConn, c *clientConn, r *Request) (err error) { // set state to rsRecvBody to indicate the request has partial response sent to client r.state = rsRecvBody sv.state = svSendRecvResponse + // update usage for user + accumulateUsage(c.RemoteAddr().String(), n) if total > directThreshold { sv.updateVisit() } @@ -1294,7 +1296,7 @@ func (sv *serverConn) doRequest(c *clientConn, r *Request, rp *Response) (err er if err = c.readResponse(sv, r, rp); err == nil { sv.updateVisit() // response received successfully - accumulateUsage(c.RemoteAddr().String(), rp) + accumulateUsage(c.RemoteAddr().String(), int(rp.ContLen)) } return err } diff --git a/usage.go b/usage.go index 900acf9b..e2033f44 100644 --- a/usage.go +++ b/usage.go @@ -10,6 +10,7 @@ import ( "bytes" "fmt" "sync" + "net" ) var recordPath string @@ -125,6 +126,7 @@ func loadUsage() { func flushLog() { if time.Now().Day() == config.UsageResetDate && + config.UsageResetDate != -1 && userUsage.lastSavedts.Day() != config.UsageResetDate { //it's time to clear the record of last month for k, _ := range userUsage.usage { @@ -188,6 +190,10 @@ func initUsage() bool{ config.UserCapacityFile == ""{ return false } + + if config.UsageResetDate == 0 || config.UsageResetDate > 30 { + Fatal("wrong UsageResetDate: ", config.UsageResetDate) + } // get current running path dir, err := os.Getwd() if err != nil { @@ -209,19 +215,20 @@ func initUsage() bool{ } func checkUsage(addr string) bool { + clientIP, _, _ := net.SplitHostPort(addr) var user string var capacity int var usage int - if val, ok := userUsage.addrToUser[addr]; ok { + if val, ok := userUsage.addrToUser[clientIP]; ok { user = val } else { - debug.Println("unkonw address: ", addr) + errl.Println("unkonw address: ", addr) return false } if val, ok := userUsage.capacity[user]; ok { capacity = val } else { - debug.Println("unkonw user: ", user) + errl.Println("unkonw user: ", user) return false } // don't have to check here @@ -230,18 +237,19 @@ func checkUsage(addr string) bool { return (usageInMB < capacity) } -func accumulateUsage(addr string, rp *Response) { +func accumulateUsage(addr string, size int) { + clientIP, _, _ := net.SplitHostPort(addr) var user string - if val, ok := userUsage.addrToUser[addr]; ok { + if val, ok := userUsage.addrToUser[clientIP]; ok { user = val } else { debug.Println("un recorded addr: ", addr) Fatal("un recorded addr: ", addr) } if _, ok := userUsage.usage[user]; ok { - if rp.ContLen > 0 { - userUsage.usage[user] += int(rp.ContLen) - debug.Printf("user: %s add %d BYTE, total %d", user, int(rp.ContLen), userUsage.usage[user]) + if size > 0 { + userUsage.usage[user] += size + debug.Printf("user: %s add %d BYTE, total %d", user, size, userUsage.usage[user]) } } From a53e794d5def11479d4264daf39ed4fd0912e03f Mon Sep 17 00:00:00 2001 From: Frank Fu Date: Mon, 11 Jul 2016 20:35:51 +0800 Subject: [PATCH 09/17] add allowed client in usage recording --- auth.go | 3 +++ usage.go | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/auth.go b/auth.go index b8023d75..def61460 100644 --- a/auth.go +++ b/auth.go @@ -117,6 +117,9 @@ func parseAllowedClient(val string) { mask = NewNbitIPv4Mask(32) } auth.allowedClient[i] = netAddr{ip.Mask(mask), mask} + + // TODO: add mask here, add record in usage + addAllowedClient(ipAndMask[0]) } } diff --git a/usage.go b/usage.go index e2033f44..7572598f 100644 --- a/usage.go +++ b/usage.go @@ -262,3 +262,11 @@ func updateAddrToUser(addr string, user string) { debug.Println("add addr: ", addr, "to user: ", user) } +func addAllowedClient(addr string) { + if _, ok := userUsage.addrToUser[addr]; ok { + debug.Println("duplicated allowed client ip: ", addr) + return + } + + userUsage[addr] = addr +} From 4599a533ee5aeb881b12fee93d623c1c0ff3d8b5 Mon Sep 17 00:00:00 2001 From: frankfzw Date: Mon, 11 Jul 2016 20:46:55 +0800 Subject: [PATCH 10/17] fix compile bug --- main.go | 4 +++- usage.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index d651f3fa..9b17a5c6 100644 --- a/main.go +++ b/main.go @@ -41,13 +41,15 @@ func main() { initSelfListenAddr() initLog() + + usageFlag := initUsage() + initAuth() initSiteStat() initPAC() // initPAC uses siteStat, so must init after site stat initStat() - usageFlag := initUsage() initParentPool() diff --git a/usage.go b/usage.go index 7572598f..1e8c9c63 100644 --- a/usage.go +++ b/usage.go @@ -268,5 +268,5 @@ func addAllowedClient(addr string) { return } - userUsage[addr] = addr + userUsage.addrToUser[addr] = addr } From d046d89a2f645921d9612c3236a5588f6bcac8ed Mon Sep 17 00:00:00 2001 From: frankfzw Date: Wed, 13 Jul 2016 13:23:52 +0800 Subject: [PATCH 11/17] change flush interval to half hour --- usage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/usage.go b/usage.go index 1e8c9c63..e159ced6 100644 --- a/usage.go +++ b/usage.go @@ -178,7 +178,7 @@ func startUsageRecorder(wg *sync.WaitGroup, quit <-chan struct{}) { if exit { break } - if interval > 7200 { + if interval > 1800 { flushLog() interval = 0 } From 875df61db80007c753ef659127dadf3daa149e11 Mon Sep 17 00:00:00 2001 From: frankfzw Date: Wed, 13 Jul 2016 13:24:23 +0800 Subject: [PATCH 12/17] add comment to support usage recording --- doc/sample-config/rc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/sample-config/rc b/doc/sample-config/rc index de3c3b91..1db15ac5 100644 --- a/doc/sample-config/rc +++ b/doc/sample-config/rc @@ -122,6 +122,8 @@ listen = http://127.0.0.1:7777 # 同时设定该文件的路径 # 以及自动重置用量的日期 # 若不需要重置,请将日期设置为-1 +# 如果添加了allowedClient,并且要对其限流,则需要将IP和对应的流量加入到userCapacityFile中 +# 暂时只支持单个IP的记录 #userCapacityFile = /path/to/file #usageResetDate = 12 From a1f9221fa1dcdaa6f862666b14a913cf9f79711a Mon Sep 17 00:00:00 2001 From: frankfzw Date: Wed, 13 Jul 2016 13:41:41 +0800 Subject: [PATCH 13/17] update sample rc doc --- doc/sample-config/rc | 2 +- doc/sample-config/rc-en | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/doc/sample-config/rc b/doc/sample-config/rc index 1db15ac5..47389206 100644 --- a/doc/sample-config/rc +++ b/doc/sample-config/rc @@ -122,7 +122,7 @@ listen = http://127.0.0.1:7777 # 同时设定该文件的路径 # 以及自动重置用量的日期 # 若不需要重置,请将日期设置为-1 -# 如果添加了allowedClient,并且要对其限流,则需要将IP和对应的流量加入到userCapacityFile中 +# 如果添加了allowedClient,并且要对其限流,则需要将IP作为username和对应的流量加入到userCapacityFile中 # 暂时只支持单个IP的记录 #userCapacityFile = /path/to/file #usageResetDate = 12 diff --git a/doc/sample-config/rc-en b/doc/sample-config/rc-en index 206bcf6c..1d1fb7ce 100644 --- a/doc/sample-config/rc-en +++ b/doc/sample-config/rc-en @@ -134,6 +134,18 @@ listen = http://127.0.0.1:7777 # COW will report error and exit if there's duplicated user. #userPasswdFile = /path/to/file +# To enable data transfer recording, the userPasswdFile must be enabled. +# List all those content in another file like this: +# username:capacity +# The username here should match those in userPasswd file. +# Set the path to the capacity file +# and usage reset date. +# Set the reset date to -1 if it is unnecessary. +# If the allowedClient is enable and their capcity is limited, the IP address(as username) and capacity must been added to userCapacityFile +# The sub-network is not supported yet. +#userCapacityFile = /path/to/file +#usageResetDate = 12 + # Time interval to keep authentication information. # Syntax: 2h3m4s means 2 hours 3 minutes 4 seconds #authTimeout = 2h From a66b6d4a0a459eb768683db60057e71a8d001689 Mon Sep 17 00:00:00 2001 From: frankfzw Date: Wed, 13 Jul 2016 14:57:48 +0800 Subject: [PATCH 14/17] fix a bug of missing if judgment of usage recording --- auth.go | 4 +++- main.go | 5 ++++- proxy.go | 10 +++++++--- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/auth.go b/auth.go index def61460..0ed6c7d9 100644 --- a/auth.go +++ b/auth.go @@ -119,7 +119,9 @@ func parseAllowedClient(val string) { auth.allowedClient[i] = netAddr{ip.Mask(mask), mask} // TODO: add mask here, add record in usage - addAllowedClient(ipAndMask[0]) + if (usageFlag) { + addAllowedClient(ipAndMask[0]) + } } } diff --git a/main.go b/main.go index 9b17a5c6..6e10ff9d 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,8 @@ var ( relaunch bool ) +var usageFlag bool + // This code is from goagain func lookPath() (argv0 string, err error) { argv0, err = exec.LookPath(os.Args[0]) @@ -29,6 +31,7 @@ func lookPath() (argv0 string, err error) { } func main() { + usageFlag = false quit = make(chan struct{}) // Parse flags after load config to allow override options in config cmdLineConfig := parseCmdLineConfig() @@ -42,7 +45,7 @@ func main() { initSelfListenAddr() initLog() - usageFlag := initUsage() + usageFlag = initUsage() initAuth() initSiteStat() diff --git a/proxy.go b/proxy.go index 893fb0e2..4491d2c7 100644 --- a/proxy.go +++ b/proxy.go @@ -501,7 +501,7 @@ func (c *clientConn) serve() { return } - if config.UserCapacityFile != "" { + if usageFlag { if checkUsage(c.RemoteAddr().String()) != true { sendErrorPage(c, statusForbidden, "Run out of capacity", genErrMsg(&r, nil, "Please contact proxy admin.")) @@ -1039,7 +1039,9 @@ func copyServer2Client(sv *serverConn, c *clientConn, r *Request) (err error) { r.state = rsRecvBody sv.state = svSendRecvResponse // update usage for user - accumulateUsage(c.RemoteAddr().String(), n) + if (usageFlag) { + accumulateUsage(c.RemoteAddr().String(), n) + } if total > directThreshold { sv.updateVisit() } @@ -1296,7 +1298,9 @@ func (sv *serverConn) doRequest(c *clientConn, r *Request, rp *Response) (err er if err = c.readResponse(sv, r, rp); err == nil { sv.updateVisit() // response received successfully - accumulateUsage(c.RemoteAddr().String(), int(rp.ContLen)) + if (usageFlag) { + accumulateUsage(c.RemoteAddr().String(), int(rp.ContLen)) + } } return err } From 42cdf2175475819407310ce9c5175f5d01a95a26 Mon Sep 17 00:00:00 2001 From: frankfzw Date: Tue, 19 Jul 2016 10:42:56 +0800 Subject: [PATCH 15/17] fix multi-thread data racing bug on map --- auth.go | 4 +++- proxy.go | 10 ++++----- usage.go | 66 +++++++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 59 insertions(+), 21 deletions(-) diff --git a/auth.go b/auth.go index 0ed6c7d9..16bc9c0d 100644 --- a/auth.go +++ b/auth.go @@ -200,7 +200,9 @@ func Authenticate(conn *clientConn, r *Request) (err error) { if err == nil && user != ""{ auth.authed.add(clientIP) // update the map of address to userid in usage - updateAddrToUser(clientIP, user) + if usageFlag { + updateAddrToUser(clientIP, user) + } } return diff --git a/proxy.go b/proxy.go index 4491d2c7..5f33f193 100644 --- a/proxy.go +++ b/proxy.go @@ -993,8 +993,13 @@ var connectBuf = leakybuf.NewLeakyBuf(512, connectBufSize) func copyServer2Client(sv *serverConn, c *clientConn, r *Request) (err error) { buf := connectBuf.Get() + total := 0 defer func() { connectBuf.Put(buf) + // update usage for user + if (usageFlag) { + accumulateUsage(c.RemoteAddr().String(), total) + } }() /* @@ -1005,7 +1010,6 @@ func copyServer2Client(sv *serverConn, c *clientConn, r *Request) (err error) { } */ - total := 0 const directThreshold = 8192 readTimeoutSet := false for { @@ -1038,10 +1042,6 @@ func copyServer2Client(sv *serverConn, c *clientConn, r *Request) (err error) { // set state to rsRecvBody to indicate the request has partial response sent to client r.state = rsRecvBody sv.state = svSendRecvResponse - // update usage for user - if (usageFlag) { - accumulateUsage(c.RemoteAddr().String(), n) - } if total > directThreshold { sv.updateVisit() } diff --git a/usage.go b/usage.go index e159ced6..45255190 100644 --- a/usage.go +++ b/usage.go @@ -20,8 +20,14 @@ var userUsage struct { capacity map[string]int addrToUser map[string]string lastSavedts time.Time + + // channel for update + updateMsg chan string + updateSig chan bool } +// var tempUsage map[string]int + func parseCapacity(line string) (user string, capacity int, err error) { arr := strings.Split(line, ":") n := len(arr) @@ -167,9 +173,13 @@ func startUsageRecorder(wg *sync.WaitGroup, quit <-chan struct{}) { var exit bool go func() { <-quit + userUsage.updateSig <- true exit=true }() + go updateUsage(userUsage.updateMsg, userUsage.updateSig) + + debug.Println("start usage recording!") interval := 0 for { @@ -206,6 +216,10 @@ func initUsage() bool{ userUsage.capacity = make(map[string]int) userUsage.usage = make(map[string]int) userUsage.addrToUser = make(map[string]string) + // tempUsage = make(map[string]int) + + userUsage.updateMsg = make(chan string, 1000) + userUsage.updateSig = make(chan bool) //load capacity at first loadCapcity(config.UserCapacityFile) @@ -238,27 +252,28 @@ func checkUsage(addr string) bool { } func accumulateUsage(addr string, size int) { - clientIP, _, _ := net.SplitHostPort(addr) - var user string - if val, ok := userUsage.addrToUser[clientIP]; ok { - user = val - } else { - debug.Println("un recorded addr: ", addr) - Fatal("un recorded addr: ", addr) - } - if _, ok := userUsage.usage[user]; ok { - if size > 0 { - userUsage.usage[user] += size - debug.Printf("user: %s add %d BYTE, total %d", user, size, userUsage.usage[user]) - } - } - + msg := addr + "-" + strconv.Itoa(size) + userUsage.updateMsg <- msg + return + // clientIP, _, _ := net.SplitHostPort(addr) + // if _, ok := userUsage.addrToUser[clientIP]; !ok { + // errl.Println("un recorded addr: ", addr) + // return + // } + // if _, ok := tempUsage[addr]; ok { + // tempUsage[addr] += size + // } else { + // tempUsage[addr] = size + // } } func updateAddrToUser(addr string, user string) { userUsage.addrToUser[addr] = user // add record + if _, ok := userUsage.capacity[user]; !ok { + errl.Println("un restricted user: ", user, " check user password file and user capcity file") + } debug.Println("add addr: ", addr, "to user: ", user) } @@ -270,3 +285,24 @@ func addAllowedClient(addr string) { userUsage.addrToUser[addr] = addr } + +func updateUsage(msgChan chan string, sigChan chan bool) { + for { + select { + case msg := <- msgChan: + arr := strings.Split(msg, "-") + addr := arr[0] + size, _ := strconv.Atoi(arr[1]) + clientIP, _, _ := net.SplitHostPort(addr) + if user, ok := userUsage.addrToUser[clientIP]; ok { + userUsage.usage[user] += size + } else { + errl.Println("un recorded addr: ", addr) + } + + case <- sigChan: + return + } + } + +} From 7673dffb80a2f0537e4b094363df60ba42867ecf Mon Sep 17 00:00:00 2001 From: frankfzw Date: Sat, 23 Jul 2016 10:12:11 +0800 Subject: [PATCH 16/17] add auto restart feature --- config.go | 6 ++++++ doc/sample-config/rc | 4 ++++ doc/sample-config/rc-en | 4 ++++ main.go | 8 ++++++++ main_unix.go | 30 ++++++++++++++++++++++++++++++ usage.go | 2 +- 6 files changed, 53 insertions(+), 1 deletion(-) diff --git a/config.go b/config.go index 694febd2..2749f573 100644 --- a/config.go +++ b/config.go @@ -80,6 +80,8 @@ type Config struct { // capacity limitation file UserCapacityFile string UsageResetDate int + + RestartInterval time.Duration } var config Config @@ -583,6 +585,10 @@ func (p configParser) ParseAuthTimeout(val string) { config.AuthTimeout = parseDuration(val, "authTimeout") } +func (p configParser) ParseRestartInterval(val string) { + config.RestartInterval = parseDuration(val, "restartInterval") +} + func (p configParser) ParseCore(val string) { config.Core = parseInt(val, "core") } diff --git a/doc/sample-config/rc b/doc/sample-config/rc index 47389206..b10071ca 100644 --- a/doc/sample-config/rc +++ b/doc/sample-config/rc @@ -131,6 +131,10 @@ listen = http://127.0.0.1:7777 # 语法:2h3m4s 表示 2 小时 3 分钟 4 秒 #authTimeout = 2h +# 代理自动重启时间 +# 语法:2h3m4s 表示 2 小时 3 分钟 4 秒 +#restartInterval = 2h + ############################# # 高级选项 ############################# diff --git a/doc/sample-config/rc-en b/doc/sample-config/rc-en index 1d1fb7ce..0b3b2a60 100644 --- a/doc/sample-config/rc-en +++ b/doc/sample-config/rc-en @@ -150,6 +150,10 @@ listen = http://127.0.0.1:7777 # Syntax: 2h3m4s means 2 hours 3 minutes 4 seconds #authTimeout = 2h +# Time interval to restart proxy. +# Syntax: 2h3m4s means 2 hours 3 minutes 4 seconds +#restartInterval = 2h + ############################# # Advanced options ############################# diff --git a/main.go b/main.go index 6e10ff9d..da68ba60 100644 --- a/main.go +++ b/main.go @@ -30,6 +30,7 @@ func lookPath() (argv0 string, err error) { return } + func main() { usageFlag = false quit = make(chan struct{}) @@ -89,6 +90,13 @@ func main() { wg.Add(1) go startUsageRecorder(&wg, quit) } + + // start restart deamon + if config.RestartInterval != 0 { + wg.Add(1) + pid := os.Getpid() + go restartDeamon(pid, &wg, quit) + } wg.Wait() if relaunch { diff --git a/main_unix.go b/main_unix.go index bd780bd9..0665a6ea 100644 --- a/main_unix.go +++ b/main_unix.go @@ -6,6 +6,8 @@ import ( "os" "os/signal" "syscall" + "time" + "sync" ) func sigHandler() { @@ -28,3 +30,31 @@ func sigHandler() { } */ } + +func restartDeamon(pid int, wg *sync.WaitGroup, quit <-chan struct{}) { + defer func() { + wg.Done() + }() + + duration := int(config.RestartInterval.Seconds()) + interval := 0 + debug.Println("Pid: ", pid, "restart interval: ", duration) + for { + select { + case <- quit: + debug.Println("exit the restart deamon") + return + default: + time.Sleep(time.Second) + interval += 1 + if (interval > duration) { + info.Println("Restart proxy now!") + // connPool.CloseAll() + syscall.Kill(pid, syscall.SIGUSR1) + return + } + } + } + + +} diff --git a/usage.go b/usage.go index 45255190..2a9d6005 100644 --- a/usage.go +++ b/usage.go @@ -218,7 +218,7 @@ func initUsage() bool{ userUsage.addrToUser = make(map[string]string) // tempUsage = make(map[string]int) - userUsage.updateMsg = make(chan string, 1000) + userUsage.updateMsg = make(chan string, 5000) userUsage.updateSig = make(chan bool) //load capacity at first loadCapcity(config.UserCapacityFile) From d9e2bd5ce034bd08a7cbba5b06df0b0a4bf234d5 Mon Sep 17 00:00:00 2001 From: frankfzw Date: Mon, 25 Jul 2016 15:07:45 +0800 Subject: [PATCH 17/17] start a new thread to accumulateUsage in proxy --- proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy.go b/proxy.go index 5f33f193..083eeaaf 100644 --- a/proxy.go +++ b/proxy.go @@ -998,7 +998,7 @@ func copyServer2Client(sv *serverConn, c *clientConn, r *Request) (err error) { connectBuf.Put(buf) // update usage for user if (usageFlag) { - accumulateUsage(c.RemoteAddr().String(), total) + go accumulateUsage(c.RemoteAddr().String(), total) } }()