diff --git a/docs/basics.md b/docs/basics.md index 67da3acbf..0f35b0635 100644 --- a/docs/basics.md +++ b/docs/basics.md @@ -292,6 +292,35 @@ In this example, traffic routed through the first frontend will have the `X-Fram The detailed documentation for those security headers can be found in [unrolled/secure](https://github.com/unrolled/secure#available-options). +#### Rate limiting + +Rate limiting can be configured per frontend. +Multiple sets of rates can be added to each frontend, but the time periods must be unique. + +```toml +[frontends] + [frontends.frontend1] + passHostHeader = true + entrypoints = ["http"] + backend = "backend1" + [frontends.frontend1.routes.test_1] + rule = "Path:/" + [frontends.frontend1.ratelimit] + extractorfunc = "client.ip" + [frontends.frontend1.ratelimit.rateset.rateset1] + period = "10s" + average = 100 + burst = 200 + [frontends.frontend1.ratelimit.rateset.rateset2] + period = "3s" + average = 5 + burst = 10 +``` + +In the above example, frontend1 is configured to limit requests by the client's ip address. +An average of 5 requests every 3 seconds is allowed and an average of 100 requests every 10 seconds. +These can "burst" up to 10 and 200 in each period respectively. + ### Backends A backend is responsible to load-balance the traffic coming from one or more frontends to a set of http servers. diff --git a/glide.lock b/glide.lock index 565142835..db552fdae 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: c5a62552734b2e283ebedf94e6b711e7efa47d2b1b80f3dda6ca923317fef3c2 -updated: 2017-08-25T11:52:16.848940186+02:00 +hash: 857714f15cc10657ae2f15f3ab6592734a1ebc524d32b5353948258ce3043b6a +updated: 2017-09-09T11:52:16.848940186+02:00 imports: - name: cloud.google.com/go version: 2e6a95edb1071d750f6d7db777bf66cd2997af6c @@ -320,8 +320,12 @@ imports: version: 77ed1c8a01217656d2080ad51981f6e99adaa177 - name: github.com/kr/logfmt version: b84e30acd515aadc4b783ad4ff83aff3299bdfe0 +- name: github.com/mailgun/minheap + version: 7c28d80e2ada649fc8ab1a37b86d30a2633bd47c - name: github.com/mailgun/timetools version: 7e6055773c5137efbeb3bd2410d705fe10ab6bfd +- name: github.com/mailgun/ttlmap + version: c1c17f74874f2a5ea48bfb06b5459d4ef2689749 - name: github.com/mailru/easyjson version: d5b7844b561a7bc640052f1b935f7b800330d7e0 subpackages: @@ -487,6 +491,7 @@ imports: - connlimit - forward - memmetrics + - ratelimit - roundrobin - stream - utils diff --git a/glide.yaml b/glide.yaml index cdfa3c63f..09275755a 100644 --- a/glide.yaml +++ b/glide.yaml @@ -22,6 +22,7 @@ import: - roundrobin - stream - utils + - ratelimit - package: github.com/urfave/negroni version: 490e6a555d47ca891a89a150d0c1ef3922dfffe9 - package: github.com/containous/staert @@ -79,9 +80,9 @@ import: vcs: git - package: github.com/abbot/go-http-auth - package: github.com/NYTimes/gziphandler + version: ^v1002.0.0 repo: https://github.com/containous/gziphandler.git vcs: git - version: ^v1002.0.0 - package: github.com/docker/leadership - package: github.com/satori/go.uuid version: ^1.1.0 diff --git a/integration/fixtures/ratelimit/simple.toml b/integration/fixtures/ratelimit/simple.toml new file mode 100644 index 000000000..0a87d1f38 --- /dev/null +++ b/integration/fixtures/ratelimit/simple.toml @@ -0,0 +1,30 @@ +defaultEntryPoints = ["http"] + +logLevel = "DEBUG" + +[entryPoints] + [entryPoints.http] + address = ":80" + +[file] + +[backends] + [backends.backend1] + [backends.backend1.servers.server1] + url = "http://{{.Server1}}:80" +[frontends] + [frontends.frontend1] + passHostHeader = true + backend = "backend1" + [frontends.frontend1.routes.test_1] + rule = "Path:/" + [frontends.frontend1.ratelimit] + extractorfunc = "client.ip" + [frontends.frontend1.ratelimit.rateset.rateset1] + period = "60s" + average = 4 + burst = 5 + [frontends.frontend1.ratelimit.rateset.rateset2] + period = "3s" + average = 1 + burst = 2 diff --git a/integration/integration_test.go b/integration/integration_test.go index 618b76df3..1c0850ff8 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -39,6 +39,7 @@ func init() { check.Suite(&LogRotationSuite{}) check.Suite(&MarathonSuite{}) check.Suite(&MesosSuite{}) + check.Suite(&RateLimitSuite{}) check.Suite(&SimpleSuite{}) check.Suite(&TimeoutSuite{}) check.Suite(&WebsocketSuite{}) diff --git a/integration/ratelimit_test.go b/integration/ratelimit_test.go new file mode 100644 index 000000000..c307d6748 --- /dev/null +++ b/integration/ratelimit_test.go @@ -0,0 +1,61 @@ +package integration + +import ( + "net/http" + "os" + "time" + + "github.com/containous/traefik/integration/try" + "github.com/go-check/check" + checker "github.com/vdemeester/shakers" +) + +type RateLimitSuite struct { + BaseSuite + ServerIP string +} + +func (s *RateLimitSuite) SetUpSuite(c *check.C) { + s.createComposeProject(c, "ratelimit") + s.composeProject.Start(c) + + s.ServerIP = s.composeProject.Container(c, "nginx1").NetworkSettings.IPAddress +} + +func (s *RateLimitSuite) TestSimpleConfiguration(c *check.C) { + file := s.adaptFile(c, "fixtures/ratelimit/simple.toml", struct { + Server1 string + }{s.ServerIP}) + defer os.Remove(file) + + cmd, _ := s.cmdTraefik(withConfigFile(file)) + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer cmd.Process.Kill() + + err = try.GetRequest("http://127.0.0.1:80/", 500*time.Millisecond, try.StatusCodeIs(http.StatusOK)) + c.Assert(err, checker.IsNil) + err = try.GetRequest("http://127.0.0.1:80/", 500*time.Millisecond, try.StatusCodeIs(http.StatusOK)) + c.Assert(err, checker.IsNil) + err = try.GetRequest("http://127.0.0.1:80/", 500*time.Millisecond, try.StatusCodeIs(http.StatusTooManyRequests)) + c.Assert(err, checker.IsNil) + + // sleep for 4 seconds to be certain the configured time period has elapsed + // then test another request and verify a 200 status code + time.Sleep(4 * time.Second) + err = try.GetRequest("http://127.0.0.1:80/", 500*time.Millisecond, try.StatusCodeIs(http.StatusOK)) + c.Assert(err, checker.IsNil) + + // continue requests at 3 second intervals to test the other rate limit time period + time.Sleep(3 * time.Second) + err = try.GetRequest("http://127.0.0.1:80/", 500*time.Millisecond, try.StatusCodeIs(http.StatusOK)) + c.Assert(err, checker.IsNil) + + time.Sleep(3 * time.Second) + err = try.GetRequest("http://127.0.0.1:80/", 500*time.Millisecond, try.StatusCodeIs(http.StatusOK)) + c.Assert(err, checker.IsNil) + + time.Sleep(3 * time.Second) + err = try.GetRequest("http://127.0.0.1:80/", 500*time.Millisecond, try.StatusCodeIs(http.StatusTooManyRequests)) + c.Assert(err, checker.IsNil) +} diff --git a/integration/resources/compose/ratelimit.yml b/integration/resources/compose/ratelimit.yml new file mode 100644 index 000000000..d4699ed71 --- /dev/null +++ b/integration/resources/compose/ratelimit.yml @@ -0,0 +1,2 @@ +nginx1: + image: nginx:alpine diff --git a/server/server.go b/server/server.go index 0f5379d34..e573de671 100644 --- a/server/server.go +++ b/server/server.go @@ -36,6 +36,7 @@ import ( "github.com/vulcand/oxy/cbreaker" "github.com/vulcand/oxy/connlimit" "github.com/vulcand/oxy/forward" + "github.com/vulcand/oxy/ratelimit" "github.com/vulcand/oxy/roundrobin" "github.com/vulcand/oxy/utils" "golang.org/x/net/http2" @@ -891,6 +892,15 @@ func (server *Server) loadConfig(configurations types.Configurations, globalConf } } + if frontend.RateLimit != nil && len(frontend.RateLimit.RateSet) > 0 { + lb, err = server.buildRateLimiter(lb, frontend.RateLimit) + if err != nil { + log.Errorf("Error creating rate limiter: %v", err) + log.Errorf("Skipping frontend %s...", frontendName) + continue frontend + } + } + maxConns := config.Backends[frontend.Backend].MaxConn if maxConns != nil && maxConns.Amount != 0 { extractFunc, err := utils.NewExtractor(maxConns.ExtractorFunc) @@ -1189,6 +1199,21 @@ func stopMetricsClients() { metrics.StopStatsd() } +func (server *Server) buildRateLimiter(handler http.Handler, rlConfig *types.RateLimit) (http.Handler, error) { + extractFunc, err := utils.NewExtractor(rlConfig.ExtractorFunc) + if err != nil { + return nil, err + } + log.Debugf("Creating load-balancer rate limiter") + rateSet := ratelimit.NewRateSet() + for _, rate := range rlConfig.RateSet { + if err := rateSet.Add(time.Duration(rate.Period), rate.Average, rate.Burst); err != nil { + return nil, err + } + } + return ratelimit.New(handler, extractFunc, rateSet, ratelimit.Logger(oxyLogger)) +} + func (server *Server) buildRetryMiddleware(handler http.Handler, globalConfig configuration.GlobalConfiguration, countServers int, backendName string) http.Handler { retryListeners := middlewares.RetryListeners{} if server.metricsRegistry.IsEnabled() { diff --git a/types/types.go b/types/types.go index c22b8044b..b9f0cfd1c 100644 --- a/types/types.go +++ b/types/types.go @@ -12,6 +12,7 @@ import ( "io/ioutil" "os" + "github.com/containous/flaeg" "github.com/containous/traefik/log" "github.com/docker/libkv/store" "github.com/ryanuber/go-glob" @@ -67,6 +68,19 @@ type ErrorPage struct { Query string `json:"query,omitempty"` } +// Rate holds a rate limiting configuration for a specific time period +type Rate struct { + Period flaeg.Duration `json:"period,omitempty"` + Average int64 `json:"average,omitempty"` + Burst int64 `json:"burst,omitempty"` +} + +// RateLimit holds a rate limiting configuration for a given frontend +type RateLimit struct { + RateSet map[string]*Rate `json:"rateset,omitempty"` + ExtractorFunc string `json:"extractorFunc,omitempty"` +} + // Headers holds the custom header configuration type Headers struct { CustomRequestHeaders map[string]string `json:"customRequestHeaders,omitempty"` @@ -131,6 +145,7 @@ type Frontend struct { WhitelistSourceRange []string `json:"whitelistSourceRange,omitempty"` Headers Headers `json:"headers,omitempty"` Errors map[string]ErrorPage `json:"errors,omitempty"` + RateLimit *RateLimit `json:"ratelimit,omitempty"` } // LoadBalancerMethod holds the method of load balancing to use. diff --git a/vendor/github.com/mailgun/minheap/LICENSE b/vendor/github.com/mailgun/minheap/LICENSE new file mode 100644 index 000000000..e06d20818 --- /dev/null +++ b/vendor/github.com/mailgun/minheap/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/vendor/github.com/mailgun/minheap/minheap.go b/vendor/github.com/mailgun/minheap/minheap.go new file mode 100644 index 000000000..ea906d669 --- /dev/null +++ b/vendor/github.com/mailgun/minheap/minheap.go @@ -0,0 +1,75 @@ +package minheap + +import ( + "container/heap" +) + +// An Element is something we manage in a priority queue. +type Element struct { + Value interface{} + Priority int // The priority of the item in the queue. + // The index is needed by update and is maintained by the heap.Interface methods. + index int // The index of the item in the heap. +} + +// A PriorityQueue implements heap.Interface and holds Items. +type MinHeap []*Element + +func NewMinHeap() *MinHeap { + mh := &MinHeap{} + heap.Init(mh) + return mh +} + +func (mh MinHeap) Len() int { return len(mh) } + +func (mh MinHeap) Less(i, j int) bool { + return mh[i].Priority < mh[j].Priority +} + +func (mh MinHeap) Swap(i, j int) { + mh[i], mh[j] = mh[j], mh[i] + mh[i].index = i + mh[j].index = j +} + +func (mh *MinHeap) Push(x interface{}) { + n := len(*mh) + item := x.(*Element) + item.index = n + *mh = append(*mh, item) +} + +func (mh *MinHeap) Pop() interface{} { + old := *mh + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *mh = old[0 : n-1] + return item +} + +func (mh *MinHeap) PushEl(el *Element) { + heap.Push(mh, el) +} + +func (mh *MinHeap) PopEl() *Element { + el := heap.Pop(mh) + return el.(*Element) +} + +func (mh *MinHeap) PeekEl() *Element { + items := *mh + return items[0] +} + +// update modifies the priority and value of an Item in the queue. +func (mh *MinHeap) UpdateEl(el *Element, priority int) { + heap.Remove(mh, el.index) + el.Priority = priority + heap.Push(mh, el) +} + +func (mh *MinHeap) RemoveEl(el *Element) { + heap.Remove(mh, el.index) +} diff --git a/vendor/github.com/mailgun/ttlmap/LICENSE b/vendor/github.com/mailgun/ttlmap/LICENSE new file mode 100644 index 000000000..e06d20818 --- /dev/null +++ b/vendor/github.com/mailgun/ttlmap/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/vendor/github.com/mailgun/ttlmap/ttlmap.go b/vendor/github.com/mailgun/ttlmap/ttlmap.go new file mode 100644 index 000000000..3f5313bbf --- /dev/null +++ b/vendor/github.com/mailgun/ttlmap/ttlmap.go @@ -0,0 +1,278 @@ +package ttlmap + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/mailgun/minheap" + "github.com/mailgun/timetools" +) + +type TtlMapOption func(m *TtlMap) error + +// Clock sets the time provider clock, handy for testing +func Clock(c timetools.TimeProvider) TtlMapOption { + return func(m *TtlMap) error { + m.clock = c + return nil + } +} + +type Callback func(key string, el interface{}) + +// CallOnExpire will call this callback on expiration of elements +func CallOnExpire(cb Callback) TtlMapOption { + return func(m *TtlMap) error { + m.onExpire = cb + return nil + } +} + +type TtlMap struct { + capacity int + elements map[string]*mapElement + expiryTimes *minheap.MinHeap + clock timetools.TimeProvider + mutex *sync.RWMutex + // onExpire callback will be called when element is expired + onExpire Callback +} + +type mapElement struct { + key string + value interface{} + heapEl *minheap.Element +} + +func NewMap(capacity int, opts ...TtlMapOption) (*TtlMap, error) { + if capacity <= 0 { + return nil, errors.New("Capacity should be > 0") + } + + m := &TtlMap{ + capacity: capacity, + elements: make(map[string]*mapElement), + expiryTimes: minheap.NewMinHeap(), + } + + for _, o := range opts { + if err := o(m); err != nil { + return nil, err + } + } + + if m.clock == nil { + m.clock = &timetools.RealTime{} + } + + return m, nil +} + +func NewMapWithProvider(capacity int, timeProvider timetools.TimeProvider) (*TtlMap, error) { + if timeProvider == nil { + return nil, errors.New("Please pass timeProvider") + } + return NewMap(capacity, Clock(timeProvider)) +} + +func NewConcurrent(capacity int, opts ...TtlMapOption) (*TtlMap, error) { + m, err := NewMap(capacity, opts...) + if err == nil { + m.mutex = new(sync.RWMutex) + } + return m, err +} + +func (m *TtlMap) Set(key string, value interface{}, ttlSeconds int) error { + expiryTime, err := m.toEpochSeconds(ttlSeconds) + if err != nil { + return err + } + if m.mutex != nil { + m.mutex.Lock() + defer m.mutex.Unlock() + } + return m.set(key, value, expiryTime) +} + +func (m *TtlMap) Len() int { + if m.mutex != nil { + m.mutex.RLock() + defer m.mutex.RUnlock() + } + return len(m.elements) +} + +func (m *TtlMap) Get(key string) (interface{}, bool) { + value, mapEl, expired := m.lockNGet(key) + if mapEl == nil { + return nil, false + } + if expired { + m.lockNDel(mapEl) + return nil, false + } + return value, true +} + +func (m *TtlMap) Increment(key string, value int, ttlSeconds int) (int, error) { + expiryTime, err := m.toEpochSeconds(ttlSeconds) + if err != nil { + return 0, err + } + + if m.mutex != nil { + m.mutex.Lock() + defer m.mutex.Unlock() + } + + mapEl, expired := m.get(key) + if mapEl == nil || expired { + m.set(key, value, expiryTime) + return value, nil + } + + currentValue, ok := mapEl.value.(int) + if !ok { + return 0, fmt.Errorf("Expected existing value to be integer, got %T", mapEl.value) + } + + currentValue += value + m.set(key, currentValue, expiryTime) + return currentValue, nil +} + +func (m *TtlMap) GetInt(key string) (int, bool, error) { + valueI, exists := m.Get(key) + if !exists { + return 0, false, nil + } + value, ok := valueI.(int) + if !ok { + return 0, false, fmt.Errorf("Expected existing value to be integer, got %T", valueI) + } + return value, true, nil +} + +func (m *TtlMap) set(key string, value interface{}, expiryTime int) error { + if mapEl, ok := m.elements[key]; ok { + mapEl.value = value + m.expiryTimes.UpdateEl(mapEl.heapEl, expiryTime) + return nil + } + + if len(m.elements) >= m.capacity { + m.freeSpace(1) + } + heapEl := &minheap.Element{ + Priority: expiryTime, + } + mapEl := &mapElement{ + key: key, + value: value, + heapEl: heapEl, + } + heapEl.Value = mapEl + m.elements[key] = mapEl + m.expiryTimes.PushEl(heapEl) + return nil +} + +func (m *TtlMap) lockNGet(key string) (value interface{}, mapEl *mapElement, expired bool) { + if m.mutex != nil { + m.mutex.RLock() + defer m.mutex.RUnlock() + } + + mapEl, expired = m.get(key) + value = nil + if mapEl != nil { + value = mapEl.value + } + return value, mapEl, expired +} + +func (m *TtlMap) get(key string) (*mapElement, bool) { + mapEl, ok := m.elements[key] + if !ok { + return nil, false + } + now := int(m.clock.UtcNow().Unix()) + expired := mapEl.heapEl.Priority <= now + return mapEl, expired +} + +func (m *TtlMap) lockNDel(mapEl *mapElement) { + if m.mutex != nil { + m.mutex.Lock() + defer m.mutex.Unlock() + + // Map element could have been updated. Now that we have a lock + // retrieve it again and check if it is still expired. + var ok bool + if mapEl, ok = m.elements[mapEl.key]; !ok { + return + } + now := int(m.clock.UtcNow().Unix()) + if mapEl.heapEl.Priority > now { + return + } + } + m.del(mapEl) +} + +func (m *TtlMap) del(mapEl *mapElement) { + if m.onExpire != nil { + m.onExpire(mapEl.key, mapEl.value) + } + + delete(m.elements, mapEl.key) + m.expiryTimes.RemoveEl(mapEl.heapEl) +} + +func (m *TtlMap) freeSpace(count int) { + removed := m.removeExpired(count) + if removed >= count { + return + } + m.removeLastUsed(count - removed) +} + +func (m *TtlMap) removeExpired(iterations int) int { + removed := 0 + now := int(m.clock.UtcNow().Unix()) + for i := 0; i < iterations; i += 1 { + if len(m.elements) == 0 { + break + } + heapEl := m.expiryTimes.PeekEl() + if heapEl.Priority > now { + break + } + m.expiryTimes.PopEl() + mapEl := heapEl.Value.(*mapElement) + delete(m.elements, mapEl.key) + removed += 1 + } + return removed +} + +func (m *TtlMap) removeLastUsed(iterations int) { + for i := 0; i < iterations; i += 1 { + if len(m.elements) == 0 { + return + } + heapEl := m.expiryTimes.PopEl() + mapEl := heapEl.Value.(*mapElement) + delete(m.elements, mapEl.key) + } +} + +func (m *TtlMap) toEpochSeconds(ttlSeconds int) (int, error) { + if ttlSeconds <= 0 { + return 0, fmt.Errorf("ttlSeconds should be >= 0, got %d", ttlSeconds) + } + return int(m.clock.UtcNow().Add(time.Second * time.Duration(ttlSeconds)).Unix()), nil +} diff --git a/vendor/github.com/vulcand/oxy/ratelimit/bucket.go b/vendor/github.com/vulcand/oxy/ratelimit/bucket.go new file mode 100644 index 000000000..78507faf9 --- /dev/null +++ b/vendor/github.com/vulcand/oxy/ratelimit/bucket.go @@ -0,0 +1,125 @@ +package ratelimit + +import ( + "fmt" + "time" + + "github.com/mailgun/timetools" +) + +const UndefinedDelay = -1 + +// rate defines token bucket parameters. +type rate struct { + period time.Duration + average int64 + burst int64 +} + +func (r *rate) String() string { + return fmt.Sprintf("rate(%v/%v, burst=%v)", r.average, r.period, r.burst) +} + +// Implements token bucket algorithm (http://en.wikipedia.org/wiki/Token_bucket) +type tokenBucket struct { + // The time period controlled by the bucket in nanoseconds. + period time.Duration + // The number of nanoseconds that takes to add one more token to the total + // number of available tokens. It effectively caches the value that could + // have been otherwise deduced from refillRate. + timePerToken time.Duration + // The maximum number of tokens that can be accumulate in the bucket. + burst int64 + // The number of tokens available for consumption at the moment. It can + // nether be larger then capacity. + availableTokens int64 + // Interface that gives current time (so tests can override) + clock timetools.TimeProvider + // Tells when tokensAvailable was updated the last time. + lastRefresh time.Time + // The number of tokens consumed the last time. + lastConsumed int64 +} + +// newTokenBucket crates a `tokenBucket` instance for the specified `Rate`. +func newTokenBucket(rate *rate, clock timetools.TimeProvider) *tokenBucket { + return &tokenBucket{ + period: rate.period, + timePerToken: time.Duration(int64(rate.period) / rate.average), + burst: rate.burst, + clock: clock, + lastRefresh: clock.UtcNow(), + availableTokens: rate.burst, + } +} + +// consume makes an attempt to consume the specified number of tokens from the +// bucket. If there are enough tokens available then `0, nil` is returned; if +// tokens to consume is larger than the burst size, then an error is returned +// and the delay is not defined; otherwise returned a none zero delay that tells +// how much time the caller needs to wait until the desired number of tokens +// will become available for consumption. +func (tb *tokenBucket) consume(tokens int64) (time.Duration, error) { + tb.updateAvailableTokens() + tb.lastConsumed = 0 + if tokens > tb.burst { + return UndefinedDelay, fmt.Errorf("Requested tokens larger than max tokens") + } + if tb.availableTokens < tokens { + return tb.timeTillAvailable(tokens), nil + } + tb.availableTokens -= tokens + tb.lastConsumed = tokens + return 0, nil +} + +// rollback reverts effect of the most recent consumption. If the most recent +// `consume` resulted in an error or a burst overflow, and therefore did not +// modify the number of available tokens, then `rollback` won't do that either. +// It is safe to call this method multiple times, for the second and all +// following calls have no effect. +func (tb *tokenBucket) rollback() { + tb.availableTokens += tb.lastConsumed + tb.lastConsumed = 0 +} + +// Update modifies `average` and `burst` fields of the token bucket according +// to the provided `Rate` +func (tb *tokenBucket) update(rate *rate) error { + if rate.period != tb.period { + return fmt.Errorf("Period mismatch: %v != %v", tb.period, rate.period) + } + tb.timePerToken = time.Duration(int64(tb.period) / rate.average) + tb.burst = rate.burst + if tb.availableTokens > rate.burst { + tb.availableTokens = rate.burst + } + return nil +} + +// timeTillAvailable returns the number of nanoseconds that we need to +// wait until the specified number of tokens becomes available for consumption. +func (tb *tokenBucket) timeTillAvailable(tokens int64) time.Duration { + missingTokens := tokens - tb.availableTokens + return time.Duration(missingTokens) * tb.timePerToken +} + +// updateAvailableTokens updates the number of tokens available for consumption. +// It is calculated based on the refill rate, the time passed since last refresh, +// and is limited by the bucket capacity. +func (tb *tokenBucket) updateAvailableTokens() { + now := tb.clock.UtcNow() + timePassed := now.Sub(tb.lastRefresh) + + tokens := tb.availableTokens + int64(timePassed/tb.timePerToken) + // If we haven't added any tokens that means that not enough time has passed, + // in this case do not adjust last refill checkpoint, otherwise it will be + // always moving in time in case of frequent requests that exceed the rate + if tokens != tb.availableTokens { + tb.lastRefresh = now + tb.availableTokens = tokens + } + if tb.availableTokens > tb.burst { + tb.availableTokens = tb.burst + } +} diff --git a/vendor/github.com/vulcand/oxy/ratelimit/bucketset.go b/vendor/github.com/vulcand/oxy/ratelimit/bucketset.go new file mode 100644 index 000000000..f4a246568 --- /dev/null +++ b/vendor/github.com/vulcand/oxy/ratelimit/bucketset.go @@ -0,0 +1,108 @@ +package ratelimit + +import ( + "fmt" + "strings" + "time" + + "github.com/mailgun/timetools" + "sort" +) + +// TokenBucketSet represents a set of TokenBucket covering different time periods. +type TokenBucketSet struct { + buckets map[time.Duration]*tokenBucket + maxPeriod time.Duration + clock timetools.TimeProvider +} + +// newTokenBucketSet creates a `TokenBucketSet` from the specified `rates`. +func NewTokenBucketSet(rates *RateSet, clock timetools.TimeProvider) *TokenBucketSet { + tbs := new(TokenBucketSet) + tbs.clock = clock + // In the majority of cases we will have only one bucket. + tbs.buckets = make(map[time.Duration]*tokenBucket, len(rates.m)) + for _, rate := range rates.m { + newBucket := newTokenBucket(rate, clock) + tbs.buckets[rate.period] = newBucket + tbs.maxPeriod = maxDuration(tbs.maxPeriod, rate.period) + } + return tbs +} + +// Update brings the buckets in the set in accordance with the provided `rates`. +func (tbs *TokenBucketSet) Update(rates *RateSet) { + // Update existing buckets and delete those that have no corresponding spec. + for _, bucket := range tbs.buckets { + if rate, ok := rates.m[bucket.period]; ok { + bucket.update(rate) + } else { + delete(tbs.buckets, bucket.period) + } + } + // Add missing buckets. + for _, rate := range rates.m { + if _, ok := tbs.buckets[rate.period]; !ok { + newBucket := newTokenBucket(rate, tbs.clock) + tbs.buckets[rate.period] = newBucket + } + } + // Identify the maximum period in the set + tbs.maxPeriod = 0 + for _, bucket := range tbs.buckets { + tbs.maxPeriod = maxDuration(tbs.maxPeriod, bucket.period) + } +} + +func (tbs *TokenBucketSet) Consume(tokens int64) (time.Duration, error) { + var maxDelay time.Duration = UndefinedDelay + var firstErr error = nil + for _, tokenBucket := range tbs.buckets { + // We keep calling `Consume` even after a error is returned for one of + // buckets because that allows us to simplify the rollback procedure, + // that is to just call `Rollback` for all buckets. + delay, err := tokenBucket.consume(tokens) + if firstErr == nil { + if err != nil { + firstErr = err + } else { + maxDelay = maxDuration(maxDelay, delay) + } + } + } + // If we could not make ALL buckets consume tokens for whatever reason, + // then rollback consumption for all of them. + if firstErr != nil || maxDelay > 0 { + for _, tokenBucket := range tbs.buckets { + tokenBucket.rollback() + } + } + return maxDelay, firstErr +} + +func (tbs *TokenBucketSet) GetMaxPeriod() time.Duration { + return tbs.maxPeriod +} + +// debugState returns string that reflects the current state of all buckets in +// this set. It is intended to be used for debugging and testing only. +func (tbs *TokenBucketSet) debugState() string { + periods := sort.IntSlice(make([]int, 0, len(tbs.buckets))) + for period := range tbs.buckets { + periods = append(periods, int(period)) + } + sort.Sort(periods) + bucketRepr := make([]string, 0, len(tbs.buckets)) + for _, period := range periods { + bucket := tbs.buckets[time.Duration(period)] + bucketRepr = append(bucketRepr, fmt.Sprintf("{%v: %v}", bucket.period, bucket.availableTokens)) + } + return strings.Join(bucketRepr, ", ") +} + +func maxDuration(x time.Duration, y time.Duration) time.Duration { + if x > y { + return x + } + return y +} diff --git a/vendor/github.com/vulcand/oxy/ratelimit/tokenlimiter.go b/vendor/github.com/vulcand/oxy/ratelimit/tokenlimiter.go new file mode 100644 index 000000000..c621a6070 --- /dev/null +++ b/vendor/github.com/vulcand/oxy/ratelimit/tokenlimiter.go @@ -0,0 +1,248 @@ +// Tokenbucket based request rate limiter +package ratelimit + +import ( + "fmt" + "net/http" + "sync" + "time" + + "github.com/mailgun/timetools" + "github.com/mailgun/ttlmap" + "github.com/vulcand/oxy/utils" +) + +const DefaultCapacity = 65536 + +// RateSet maintains a set of rates. It can contain only one rate per period at a time. +type RateSet struct { + m map[time.Duration]*rate +} + +// NewRateSet crates an empty `RateSet` instance. +func NewRateSet() *RateSet { + rs := new(RateSet) + rs.m = make(map[time.Duration]*rate) + return rs +} + +// Add adds a rate to the set. If there is a rate with the same period in the +// set then the new rate overrides the old one. +func (rs *RateSet) Add(period time.Duration, average int64, burst int64) error { + if period <= 0 { + return fmt.Errorf("Invalid period: %v", period) + } + if average <= 0 { + return fmt.Errorf("Invalid average: %v", average) + } + if burst <= 0 { + return fmt.Errorf("Invalid burst: %v", burst) + } + rs.m[period] = &rate{period, average, burst} + return nil +} + +func (rs *RateSet) String() string { + return fmt.Sprint(rs.m) +} + +type RateExtractor interface { + Extract(r *http.Request) (*RateSet, error) +} + +type RateExtractorFunc func(r *http.Request) (*RateSet, error) + +func (e RateExtractorFunc) Extract(r *http.Request) (*RateSet, error) { + return e(r) +} + +// TokenLimiter implements rate limiting middleware. +type TokenLimiter struct { + defaultRates *RateSet + extract utils.SourceExtractor + extractRates RateExtractor + clock timetools.TimeProvider + mutex sync.Mutex + bucketSets *ttlmap.TtlMap + errHandler utils.ErrorHandler + log utils.Logger + capacity int + next http.Handler +} + +// New constructs a `TokenLimiter` middleware instance. +func New(next http.Handler, extract utils.SourceExtractor, defaultRates *RateSet, opts ...TokenLimiterOption) (*TokenLimiter, error) { + if defaultRates == nil || len(defaultRates.m) == 0 { + return nil, fmt.Errorf("Provide default rates") + } + if extract == nil { + return nil, fmt.Errorf("Provide extract function") + } + tl := &TokenLimiter{ + next: next, + defaultRates: defaultRates, + extract: extract, + } + + for _, o := range opts { + if err := o(tl); err != nil { + return nil, err + } + } + setDefaults(tl) + bucketSets, err := ttlmap.NewMapWithProvider(tl.capacity, tl.clock) + if err != nil { + return nil, err + } + tl.bucketSets = bucketSets + return tl, nil +} + +func (tl *TokenLimiter) Wrap(next http.Handler) { + tl.next = next +} + +func (tl *TokenLimiter) ServeHTTP(w http.ResponseWriter, req *http.Request) { + source, amount, err := tl.extract.Extract(req) + if err != nil { + tl.errHandler.ServeHTTP(w, req, err) + return + } + + if err := tl.consumeRates(req, source, amount); err != nil { + tl.log.Infof("limiting request %v %v, limit: %v", req.Method, req.URL, err) + tl.errHandler.ServeHTTP(w, req, err) + return + } + + tl.next.ServeHTTP(w, req) +} + +func (tl *TokenLimiter) consumeRates(req *http.Request, source string, amount int64) error { + tl.mutex.Lock() + defer tl.mutex.Unlock() + + effectiveRates := tl.resolveRates(req) + bucketSetI, exists := tl.bucketSets.Get(source) + var bucketSet *TokenBucketSet + + if exists { + bucketSet = bucketSetI.(*TokenBucketSet) + bucketSet.Update(effectiveRates) + } else { + bucketSet = NewTokenBucketSet(effectiveRates, tl.clock) + // We set ttl as 10 times rate period. E.g. if rate is 100 requests/second per client ip + // the counters for this ip will expire after 10 seconds of inactivity + tl.bucketSets.Set(source, bucketSet, int(bucketSet.maxPeriod/time.Second)*10+1) + } + delay, err := bucketSet.Consume(amount) + if err != nil { + return err + } + if delay > 0 { + return &MaxRateError{delay: delay} + } + return nil +} + +// effectiveRates retrieves rates to be applied to the request. +func (tl *TokenLimiter) resolveRates(req *http.Request) *RateSet { + // If configuration mapper is not specified for this instance, then return + // the default bucket specs. + if tl.extractRates == nil { + return tl.defaultRates + } + + rates, err := tl.extractRates.Extract(req) + if err != nil { + tl.log.Errorf("Failed to retrieve rates: %v", err) + return tl.defaultRates + } + + // If the returned rate set is empty then used the default one. + if len(rates.m) == 0 { + return tl.defaultRates + } + + return rates +} + +type MaxRateError struct { + delay time.Duration +} + +func (m *MaxRateError) Error() string { + return fmt.Sprintf("max rate reached: retry-in %v", m.delay) +} + +type RateErrHandler struct { +} + +func (e *RateErrHandler) ServeHTTP(w http.ResponseWriter, req *http.Request, err error) { + if rerr, ok := err.(*MaxRateError); ok { + w.Header().Set("X-Retry-In", rerr.delay.String()) + w.WriteHeader(429) + w.Write([]byte(err.Error())) + return + } + utils.DefaultHandler.ServeHTTP(w, req, err) +} + +type TokenLimiterOption func(l *TokenLimiter) error + +// Logger sets the logger that will be used by this middleware. +func Logger(l utils.Logger) TokenLimiterOption { + return func(cl *TokenLimiter) error { + cl.log = l + return nil + } +} + +// ErrorHandler sets error handler of the server +func ErrorHandler(h utils.ErrorHandler) TokenLimiterOption { + return func(cl *TokenLimiter) error { + cl.errHandler = h + return nil + } +} + +func ExtractRates(e RateExtractor) TokenLimiterOption { + return func(cl *TokenLimiter) error { + cl.extractRates = e + return nil + } +} + +func Clock(clock timetools.TimeProvider) TokenLimiterOption { + return func(cl *TokenLimiter) error { + cl.clock = clock + return nil + } +} + +func Capacity(cap int) TokenLimiterOption { + return func(cl *TokenLimiter) error { + if cap <= 0 { + return fmt.Errorf("bad capacity: %v", cap) + } + cl.capacity = cap + return nil + } +} + +var defaultErrHandler = &RateErrHandler{} + +func setDefaults(tl *TokenLimiter) { + if tl.log == nil { + tl.log = utils.NullLogger + } + if tl.capacity <= 0 { + tl.capacity = DefaultCapacity + } + if tl.clock == nil { + tl.clock = &timetools.RealTime{} + } + if tl.errHandler == nil { + tl.errHandler = defaultErrHandler + } +}