1
0
mirror of https://github.com/gusaul/grpcox.git synced 2024-09-29 04:10:39 +00:00

add expiry connection and automatic close

This commit is contained in:
gusaul 2019-04-05 18:07:19 +07:00
parent 4a4ceb65c2
commit 5026c0a359
8 changed files with 208 additions and 18 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
grpcox
log

2
config.env Normal file
View File

@ -0,0 +1,2 @@
MAX_LIFE_CONN=10
TICK_CLOSE_CONN=3

143
core/connection.go Normal file
View File

@ -0,0 +1,143 @@
package core
import (
"log"
"sync"
"time"
)
// ConnStore - connection store instance
type ConnStore struct {
sync.RWMutex
conn map[string]*connection
// flag for auto garbage collector(close and cleanup expired connection)
activeGC bool
// controls gc intervals
gcTicker *time.Ticker
// gc stop signal
done chan struct{}
}
type connection struct {
// hold connection object
resource *Resource
// keep connect
keepAlive bool
// will automatically close connection
expired time.Time
}
// NewConnectionStore - constructor connection store
func NewConnectionStore() *ConnStore {
return &ConnStore{
conn: make(map[string]*connection),
}
}
// StartGC - start gc ticker
func (c *ConnStore) StartGC(interval time.Duration) {
if interval <= 0 {
return
}
c.activeGC = true
ticker := time.NewTicker(interval)
done := make(chan struct{})
c.Lock()
c.gcTicker = ticker
c.done = done
c.Unlock()
go func() {
for {
select {
case <-ticker.C:
c.Lock()
for key := range c.conn {
if c.isExpired(key) {
c.delete(key)
log.Printf("Connection %s expired and closed\n", key)
}
}
c.Unlock()
case <-done:
return
}
}
}()
}
// StopGC stops sweeping goroutine.
func (c *ConnStore) StopGC() {
if c.activeGC {
c.Lock()
c.gcTicker.Stop()
c.gcTicker = nil
close(c.done)
c.done = nil
c.Unlock()
}
}
func (c *ConnStore) extend(key string, ttl time.Duration) {
conn := c.conn[key]
if conn != nil {
conn.extendConnection(ttl)
}
}
func (c *ConnStore) isExpired(key string) bool {
conn := c.conn[key]
if conn == nil {
return false
}
return !conn.keepAlive && conn.expired.Before(time.Now())
}
func (c *ConnStore) getAllConn() map[string]*connection {
return c.conn
}
func (c *ConnStore) delete(key string) {
conn := c.conn[key]
if conn != nil {
conn.close()
delete(c.conn, key)
}
}
func (c *ConnStore) addConnection(host string, res *Resource, ttl ...time.Duration) {
conn := &connection{
resource: res,
keepAlive: true,
}
if len(ttl) > 0 {
conn.keepAlive = false
conn.expired = time.Now().Add(ttl[0])
}
c.conn[host] = conn
}
func (c *ConnStore) getConnection(host string) (res *Resource, found bool) {
conn, ok := c.conn[host]
if ok && conn.resource != nil {
found = true
res = conn.resource
}
return
}
func (conn *connection) extendConnection(ttl time.Duration) {
conn.keepAlive = false
conn.expired = time.Now().Add(ttl)
}
func (conn *connection) close() {
conn.resource.Close()
}

View File

@ -2,6 +2,8 @@ package core
import (
"context"
"os"
"strconv"
"time"
"github.com/fullstorydev/grpcurl"
@ -17,7 +19,8 @@ import (
type GrpCox struct {
KeepAlive float64
activeConn map[string]*Resource
activeConn *ConnStore
maxLifeConn time.Duration
// TODO : utilize below args
headers []string
@ -33,15 +36,33 @@ type GrpCox struct {
// InitGrpCox constructor
func InitGrpCox() *GrpCox {
return &GrpCox{
activeConn: make(map[string]*Resource),
maxLife, tick := 10, 3
if val, err := strconv.Atoi(os.Getenv("MAX_LIFE_CONN")); err == nil {
maxLife = val
}
if val, err := strconv.Atoi(os.Getenv("TICK_CLOSE_CONN")); err == nil {
tick = val
}
c := NewConnectionStore()
g := &GrpCox{
activeConn: c,
}
if maxLife > 0 && tick > 0 {
g.maxLifeConn = time.Duration(maxLife) * time.Minute
c.StartGC(time.Duration(tick) * time.Second)
}
return g
}
// GetResource - open resource to targeted grpc server
func (g *GrpCox) GetResource(ctx context.Context, target string, plainText, isRestartConn bool) (*Resource, error) {
if conn, ok := g.activeConn[target]; ok {
if !isRestartConn && conn.refClient != nil && conn.clientConn != nil {
if conn, ok := g.activeConn.getConnection(target); ok {
if !isRestartConn && conn.isValid() {
return conn, nil
}
g.CloseActiveConns(target)
@ -61,15 +82,16 @@ func (g *GrpCox) GetResource(ctx context.Context, target string, plainText, isRe
r.descSource = grpcurl.DescriptorSourceFromServer(ctx, r.refClient)
r.headers = h
g.activeConn[target] = r
g.activeConn.addConnection(target, r, g.maxLifeConn)
return r, nil
}
// GetActiveConns - get all saved active connection
func (g *GrpCox) GetActiveConns(ctx context.Context) []string {
result := make([]string, len(g.activeConn))
active := g.activeConn.getAllConn()
result := make([]string, len(active))
i := 0
for k := range g.activeConn {
for k := range active {
result[i] = k
i++
}
@ -79,19 +101,19 @@ func (g *GrpCox) GetActiveConns(ctx context.Context) []string {
// CloseActiveConns - close conn by host or all
func (g *GrpCox) CloseActiveConns(host string) error {
if host == "all" {
for k, v := range g.activeConn {
v.Close()
delete(g.activeConn, k)
for k := range g.activeConn.getAllConn() {
g.activeConn.delete(k)
}
return nil
}
if v, ok := g.activeConn[host]; ok {
v.Close()
delete(g.activeConn, host)
g.activeConn.delete(host)
return nil
}
return nil
// Extend extend connection based on setting max life
func (g *GrpCox) Extend(host string) {
g.activeConn.extend(host, g.maxLifeConn)
}
func (g *GrpCox) dial(ctx context.Context, target string, plainText bool) (*grpc.ClientConn, error) {

View File

@ -182,10 +182,15 @@ func (r *Resource) Close() {
case <-done:
return
case <-time.After(3 * time.Second):
log.Printf("Connection %s falied to close\n", r.clientConn.Target())
return
}
}
func (r *Resource) isValid() bool {
return r.refClient != nil && r.clientConn != nil
}
func (r *Resource) exit(code int) {
// to force reset before os exit
r.Close()

View File

@ -6,3 +6,6 @@ services:
- "6969:6969"
volumes:
- ./index:/index
- ./log:/log
env_file:
- config.env

View File

@ -4,6 +4,7 @@ import (
"fmt"
"log"
"net/http"
"os"
"time"
"github.com/gorilla/mux"
@ -11,6 +12,16 @@ import (
)
func main() {
// logging conf
f, err := os.OpenFile("log/grpcox.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("error opening file: %v", err)
}
defer f.Close()
log.SetOutput(f)
log.SetFlags(log.LstdFlags | log.Lshortfile)
// start app
port := ":6969"
muxRouter := mux.NewRouter()
handler.Init(muxRouter)

View File

@ -81,6 +81,7 @@ func (h *Handler) getLists(w http.ResponseWriter, r *http.Request) {
return
}
h.g.Extend(host)
response(w, result)
}
@ -130,6 +131,7 @@ func (h *Handler) describeFunction(w http.ResponseWriter, r *http.Request) {
Template string `json:"template"`
}
h.g.Extend(host)
response(w, desc{
Schema: result,
Template: template,
@ -171,6 +173,7 @@ func (h *Handler) invokeFunction(w http.ResponseWriter, r *http.Request) {
Result string `json:"result"`
}
h.g.Extend(host)
response(w, invRes{
Time: timer.String(),
Result: result,