From 5026c0a359914617e685f7e33e13b3098b4019cc Mon Sep 17 00:00:00 2001 From: gusaul Date: Fri, 5 Apr 2019 18:07:19 +0700 Subject: [PATCH] add expiry connection and automatic close --- .gitignore | 3 +- config.env | 2 + core/connection.go | 143 +++++++++++++++++++++++++++++++++++++++++++++ core/grpcox.go | 54 ++++++++++++----- core/resource.go | 5 ++ docker-compose.yml | 5 +- grpcox.go | 11 ++++ handler/handler.go | 3 + 8 files changed, 208 insertions(+), 18 deletions(-) create mode 100644 config.env create mode 100644 core/connection.go diff --git a/.gitignore b/.gitignore index dd3f720..918573f 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -grpcox \ No newline at end of file +grpcox +log \ No newline at end of file diff --git a/config.env b/config.env new file mode 100644 index 0000000..e5e1ea0 --- /dev/null +++ b/config.env @@ -0,0 +1,2 @@ +MAX_LIFE_CONN=10 +TICK_CLOSE_CONN=3 \ No newline at end of file diff --git a/core/connection.go b/core/connection.go new file mode 100644 index 0000000..f12a9d7 --- /dev/null +++ b/core/connection.go @@ -0,0 +1,143 @@ +package core + +import ( + "log" + "sync" + "time" +) + +// ConnStore - connection store instance +type ConnStore struct { + sync.RWMutex + + conn map[string]*connection + + // flag for auto garbage collector(close and cleanup expired connection) + activeGC bool + // controls gc intervals + gcTicker *time.Ticker + // gc stop signal + done chan struct{} +} + +type connection struct { + // hold connection object + resource *Resource + // keep connect + keepAlive bool + // will automatically close connection + expired time.Time +} + +// NewConnectionStore - constructor connection store +func NewConnectionStore() *ConnStore { + return &ConnStore{ + conn: make(map[string]*connection), + } +} + +// StartGC - start gc ticker +func (c *ConnStore) StartGC(interval time.Duration) { + if interval <= 0 { + return + } + + c.activeGC = true + + ticker := time.NewTicker(interval) + done := make(chan struct{}) + + c.Lock() + c.gcTicker = ticker + c.done = done + c.Unlock() + + go func() { + for { + select { + case <-ticker.C: + c.Lock() + for key := range c.conn { + if c.isExpired(key) { + c.delete(key) + log.Printf("Connection %s expired and closed\n", key) + } + } + c.Unlock() + case <-done: + return + } + } + }() +} + +// StopGC stops sweeping goroutine. +func (c *ConnStore) StopGC() { + if c.activeGC { + c.Lock() + c.gcTicker.Stop() + c.gcTicker = nil + close(c.done) + c.done = nil + c.Unlock() + } +} + +func (c *ConnStore) extend(key string, ttl time.Duration) { + conn := c.conn[key] + if conn != nil { + conn.extendConnection(ttl) + } +} + +func (c *ConnStore) isExpired(key string) bool { + conn := c.conn[key] + if conn == nil { + return false + } + return !conn.keepAlive && conn.expired.Before(time.Now()) +} + +func (c *ConnStore) getAllConn() map[string]*connection { + return c.conn +} + +func (c *ConnStore) delete(key string) { + conn := c.conn[key] + if conn != nil { + conn.close() + delete(c.conn, key) + } +} + +func (c *ConnStore) addConnection(host string, res *Resource, ttl ...time.Duration) { + conn := &connection{ + resource: res, + keepAlive: true, + } + + if len(ttl) > 0 { + conn.keepAlive = false + conn.expired = time.Now().Add(ttl[0]) + } + + c.conn[host] = conn +} + +func (c *ConnStore) getConnection(host string) (res *Resource, found bool) { + conn, ok := c.conn[host] + if ok && conn.resource != nil { + found = true + res = conn.resource + } + return +} + +func (conn *connection) extendConnection(ttl time.Duration) { + conn.keepAlive = false + conn.expired = time.Now().Add(ttl) +} + +func (conn *connection) close() { + conn.resource.Close() +} diff --git a/core/grpcox.go b/core/grpcox.go index ee09591..624c410 100644 --- a/core/grpcox.go +++ b/core/grpcox.go @@ -2,6 +2,8 @@ package core import ( "context" + "os" + "strconv" "time" "github.com/fullstorydev/grpcurl" @@ -17,7 +19,8 @@ import ( type GrpCox struct { KeepAlive float64 - activeConn map[string]*Resource + activeConn *ConnStore + maxLifeConn time.Duration // TODO : utilize below args headers []string @@ -33,15 +36,33 @@ type GrpCox struct { // InitGrpCox constructor func InitGrpCox() *GrpCox { - return &GrpCox{ - activeConn: make(map[string]*Resource), + maxLife, tick := 10, 3 + + if val, err := strconv.Atoi(os.Getenv("MAX_LIFE_CONN")); err == nil { + maxLife = val } + + if val, err := strconv.Atoi(os.Getenv("TICK_CLOSE_CONN")); err == nil { + tick = val + } + + c := NewConnectionStore() + g := &GrpCox{ + activeConn: c, + } + + if maxLife > 0 && tick > 0 { + g.maxLifeConn = time.Duration(maxLife) * time.Minute + c.StartGC(time.Duration(tick) * time.Second) + } + + return g } // GetResource - open resource to targeted grpc server func (g *GrpCox) GetResource(ctx context.Context, target string, plainText, isRestartConn bool) (*Resource, error) { - if conn, ok := g.activeConn[target]; ok { - if !isRestartConn && conn.refClient != nil && conn.clientConn != nil { + if conn, ok := g.activeConn.getConnection(target); ok { + if !isRestartConn && conn.isValid() { return conn, nil } g.CloseActiveConns(target) @@ -61,15 +82,16 @@ func (g *GrpCox) GetResource(ctx context.Context, target string, plainText, isRe r.descSource = grpcurl.DescriptorSourceFromServer(ctx, r.refClient) r.headers = h - g.activeConn[target] = r + g.activeConn.addConnection(target, r, g.maxLifeConn) return r, nil } // GetActiveConns - get all saved active connection func (g *GrpCox) GetActiveConns(ctx context.Context) []string { - result := make([]string, len(g.activeConn)) + active := g.activeConn.getAllConn() + result := make([]string, len(active)) i := 0 - for k := range g.activeConn { + for k := range active { result[i] = k i++ } @@ -79,21 +101,21 @@ func (g *GrpCox) GetActiveConns(ctx context.Context) []string { // CloseActiveConns - close conn by host or all func (g *GrpCox) CloseActiveConns(host string) error { if host == "all" { - for k, v := range g.activeConn { - v.Close() - delete(g.activeConn, k) + for k := range g.activeConn.getAllConn() { + g.activeConn.delete(k) } return nil } - if v, ok := g.activeConn[host]; ok { - v.Close() - delete(g.activeConn, host) - } - + g.activeConn.delete(host) return nil } +// Extend extend connection based on setting max life +func (g *GrpCox) Extend(host string) { + g.activeConn.extend(host, g.maxLifeConn) +} + func (g *GrpCox) dial(ctx context.Context, target string, plainText bool) (*grpc.ClientConn, error) { dialTime := 10 * time.Second ctx, cancel := context.WithTimeout(ctx, dialTime) diff --git a/core/resource.go b/core/resource.go index c121b48..4b0161b 100644 --- a/core/resource.go +++ b/core/resource.go @@ -182,10 +182,15 @@ func (r *Resource) Close() { case <-done: return case <-time.After(3 * time.Second): + log.Printf("Connection %s falied to close\n", r.clientConn.Target()) return } } +func (r *Resource) isValid() bool { + return r.refClient != nil && r.clientConn != nil +} + func (r *Resource) exit(code int) { // to force reset before os exit r.Close() diff --git a/docker-compose.yml b/docker-compose.yml index 97f8fbe..52d6f03 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,4 +5,7 @@ services: ports: - "6969:6969" volumes: - - ./index:/index \ No newline at end of file + - ./index:/index + - ./log:/log + env_file: + - config.env \ No newline at end of file diff --git a/grpcox.go b/grpcox.go index 41aef99..f2fe76f 100644 --- a/grpcox.go +++ b/grpcox.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "net/http" + "os" "time" "github.com/gorilla/mux" @@ -11,6 +12,16 @@ import ( ) func main() { + // logging conf + f, err := os.OpenFile("log/grpcox.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + log.Fatalf("error opening file: %v", err) + } + defer f.Close() + log.SetOutput(f) + log.SetFlags(log.LstdFlags | log.Lshortfile) + + // start app port := ":6969" muxRouter := mux.NewRouter() handler.Init(muxRouter) diff --git a/handler/handler.go b/handler/handler.go index 4e281f5..9ec5994 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -81,6 +81,7 @@ func (h *Handler) getLists(w http.ResponseWriter, r *http.Request) { return } + h.g.Extend(host) response(w, result) } @@ -130,6 +131,7 @@ func (h *Handler) describeFunction(w http.ResponseWriter, r *http.Request) { Template string `json:"template"` } + h.g.Extend(host) response(w, desc{ Schema: result, Template: template, @@ -171,6 +173,7 @@ func (h *Handler) invokeFunction(w http.ResponseWriter, r *http.Request) { Result string `json:"result"` } + h.g.Extend(host) response(w, invRes{ Time: timer.String(), Result: result,