Skip to content

Commit

Permalink
add settings to ensure higher system resources if possible (#48)
Browse files Browse the repository at this point in the history
also avoid racy usage of rand.New(), rand object is not
concurrency friendly with default NewSource() - to avoid this
seed the rand once and use global rand.Intn()
  • Loading branch information
harshavardhana authored Nov 17, 2020
1 parent e686251 commit ce2ac61
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 52 deletions.
8 changes: 4 additions & 4 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func parseCacheControlHeaders(header http.Header) *cacheControl {
}
}
cc, ok := header[CacheControl]
if !ok && c.expires.Equal(timeZero) {
if !ok && c.expires.IsZero() {
return nil
}
v := strings.Join(cc, "")
Expand Down Expand Up @@ -262,7 +262,7 @@ func (c *cacheControl) isStale(modTime time.Time) bool {
return true
}

if !c.expires.Equal(timeZero) && c.expires.Before(time.Now().Add(time.Duration(c.maxStale))) {
if !c.expires.IsZero() && c.expires.Before(time.Now().Add(time.Duration(c.maxStale))) {
return true
}

Expand Down Expand Up @@ -338,7 +338,7 @@ func (c cacheHeader) Expires() time.Time {
return t.UTC()
}
}
return timeZero
return time.Time{}
}

//ETag returns ETag from cached response
Expand All @@ -353,7 +353,7 @@ func (c cacheHeader) LastModified() time.Time {
return t.UTC()
}
}
return timeZero
return time.Time{}
}

const (
Expand Down
97 changes: 49 additions & 48 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,7 @@ var (
globalConsoleDisplay bool
globalConnStats []*ConnStats
globalDNSCache *xhttp.DNSCache
rng *rand.Rand

timeZero = time.Time{}

// Create a new instance of the logger. You can have any number of instances.
log = logrus.New()
log *logrus.Logger
)

const (
Expand All @@ -72,7 +67,9 @@ const (

func init() {
globalDNSCache = xhttp.NewDNSCache(3*time.Second, 10*time.Second)
rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))

// Create a new instance of the logger. You can have any number of instances.
log = logrus.New()
}

func logMsg(msg logMessage) error {
Expand Down Expand Up @@ -144,29 +141,32 @@ type Backend struct {
healthCheckPort int
healthCheckDuration int
Stats *BackendStats
DowntimeStart time.Time
cacheClient *S3CacheClient
}

const (
offline = iota
online
)

func (b *Backend) setOffline() {
atomic.StoreInt32(&b.up, 0)
atomic.StoreInt32(&b.up, offline)
}

func (b *Backend) setOnline() {
atomic.StoreInt32(&b.up, 1)
atomic.StoreInt32(&b.up, online)
}

// IsUp returns true if backend is up
func (b *Backend) IsUp() bool {
return atomic.LoadInt32(&b.up) == 1
// Online returns true if backend is up
func (b *Backend) Online() bool {
return atomic.LoadInt32(&b.up) == online
}

func (b *Backend) getServerStatus() string {
status := "DOWN"
if b.IsUp() {
status = "UP"
if b.Online() {
return "UP"
}
return status
return "DOWN"
}

// BackendStats holds server stats for backend
Expand Down Expand Up @@ -259,33 +259,30 @@ func (b *Backend) healthCheck() {
resp.Body.Close()
}
if err != nil || (err == nil && resp.StatusCode != http.StatusOK) {
b.httpClient.CloseIdleConnections()
if globalLoggingEnabled && (!b.IsUp() || (b.Stats.UpSince.Equal(timeZero))) {
if globalLoggingEnabled && (!b.Online() || b.Stats.UpSince.IsZero()) {
logMsg(logMessage{Endpoint: b.endpoint, Status: "down", Error: err})
}
if b.IsUp() {
// observed an error, take the backend down.
b.setOffline()
}
if b.Stats.DowntimeStart.Equal(timeZero) {
// observed an error, take the backend down.
b.setOffline()
if b.Stats.DowntimeStart.IsZero() {
b.Stats.DowntimeStart = time.Now().UTC()
}
} else {
var downtimeEnd time.Time
if !b.Stats.DowntimeStart.Equal(timeZero) {
if !b.Stats.DowntimeStart.IsZero() {
now := time.Now().UTC()
b.updateDowntime(now.Sub(b.Stats.DowntimeStart))
downtimeEnd = now
}
if globalLoggingEnabled && !b.IsUp() && !b.Stats.UpSince.Equal(timeZero) {
if globalLoggingEnabled && !b.Online() && !b.Stats.UpSince.IsZero() {
logMsg(logMessage{
Endpoint: b.endpoint,
Status: "up",
DowntimeDuration: downtimeEnd.Sub(b.Stats.DowntimeStart),
})
}
b.Stats.UpSince = time.Now()
b.Stats.DowntimeStart = timeZero
b.Stats.UpSince = time.Now().UTC()
b.Stats.DowntimeStart = time.Time{}
b.setOnline()
}
if globalTraceEnabled {
Expand Down Expand Up @@ -339,9 +336,9 @@ type multisite struct {
}

func (m *multisite) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", os.Args[0]+pkg.ReleaseTag) // indicate sidekick is serving the request
w.Header().Set("Server", "SideKick/"+pkg.ReleaseTag) // indicate sidekick is serving the request
for _, s := range m.sites {
if s.IsUp() {
if s.Online() {
s.ServeHTTP(w, r)
return
}
Expand All @@ -353,9 +350,9 @@ type site struct {
backends []*Backend
}

func (s *site) IsUp() bool {
func (s *site) Online() bool {
for _, backend := range s.backends {
if backend.IsUp() {
if backend.Online() {
return true
}
}
Expand All @@ -365,7 +362,7 @@ func (s *site) IsUp() bool {
func (s *site) upBackends() []*Backend {
var backends []*Backend
for _, backend := range s.backends {
if backend.IsUp() {
if backend.Online() {
backends = append(backends, backend)
}
}
Expand All @@ -379,29 +376,27 @@ func (s *site) nextProxy() *Backend {
return nil
}

idx := rng.Intn(len(backends))
idx := rand.Intn(len(backends))
// random backend from a list of available backends.
return backends[idx]
}

// ServeHTTP - LoadBalancer implements http.Handler
func (s *site) ServeHTTP(w http.ResponseWriter, r *http.Request) {
backend := s.nextProxy()
if backend == nil {
w.WriteHeader(http.StatusBadGateway)
return
}

cacheHandlerFn := func(w http.ResponseWriter, r *http.Request) {
if backend.cacheClient != nil {
cacheHandler(w, r, backend)(w, r)
} else {
backend.proxy.ServeHTTP(w, r)
if backend != nil {
cacheHandlerFn := func(w http.ResponseWriter, r *http.Request) {
if backend.cacheClient != nil {
cacheHandler(w, r, backend)(w, r)
} else {
backend.proxy.ServeHTTP(w, r)
}
}
}

httpTraceHdrs(cacheHandlerFn, w, r, backend)

httpTraceHdrs(cacheHandlerFn, w, r, backend)
return
}
w.WriteHeader(http.StatusBadGateway)
}

// mustGetSystemCertPool - return system CAs or empty pool in case of error (or windows)
Expand Down Expand Up @@ -571,7 +566,7 @@ func configureSite(ctx *cli.Context, siteNum int, siteStrs []string, healthCheck
stats := BackendStats{MinLatency: time.Duration(24 * time.Hour), MaxLatency: time.Duration(0)}
backend := &Backend{siteNum, endpoint, proxy, &http.Client{
Transport: proxy.Transport,
}, 0, healthCheckPath, healthCheckPort, healthCheckDuration, &stats, timeZero, newCacheClient(ctx, cacheCfg)}
}, 0, healthCheckPath, healthCheckPort, healthCheckDuration, &stats, newCacheClient(ctx, cacheCfg)}
go backend.healthCheck()
proxy.ErrorHandler = backend.ErrorHandler
backends = append(backends, backend)
Expand Down Expand Up @@ -639,6 +634,12 @@ func sidekickMain(ctx *cli.Context) {
}

func main() {
// Set-up rand seed and use global rand to avoid concurrency issues.
rand.Seed(time.Now().UTC().UnixNano())

// Set system resources to maximum.
setMaxResources()

app := cli.NewApp()
app.Name = os.Args[0]
app.Author = "MinIO, Inc."
Expand Down
53 changes: 53 additions & 0 deletions rlimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2020 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.package main

package main

import (
"runtime/debug"

"github.com/minio/minio/pkg/sys"
)

func setMaxResources() (err error) {
// Set the Go runtime max threads threshold to 90% of kernel setting.
sysMaxThreads, mErr := sys.GetMaxThreads()
if mErr == nil {
minioMaxThreads := (sysMaxThreads * 90) / 100
// Only set max threads if it is greater than the default one
if minioMaxThreads > 10000 {
debug.SetMaxThreads(minioMaxThreads)
}
}

var maxLimit uint64

// Set open files limit to maximum.
if _, maxLimit, err = sys.GetMaxOpenFileLimit(); err != nil {
return err
}

if err = sys.SetMaxOpenFileLimit(maxLimit, maxLimit); err != nil {
return err
}

// Set max memory limit as current memory limit.
if _, maxLimit, err = sys.GetMaxMemoryLimit(); err != nil {
return err
}

err = sys.SetMaxMemoryLimit(maxLimit, maxLimit)
return err
}

0 comments on commit ce2ac61

Please sign in to comment.