1
0
mirror of https://github.com/gusaul/grpcox.git synced 2025-01-24 05:04:39 +00:00
grpcox/core/grpcox.go
2019-03-13 10:38:30 +07:00

139 lines
3.2 KiB
Go

package core
import (
"context"
"time"
"github.com/fullstorydev/grpcurl"
"github.com/jhump/protoreflect/grpcreflect"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
)
// GrpCox holds the dial settings and the cache of open connections used to
// talk to gRPC servers.
type GrpCox struct {
	// KeepAlive is the keepalive interval in seconds. When it is greater
	// than zero, dial uses it for both the keepalive Time and Timeout.
	KeepAlive float64

	// activeConn caches the Resource opened for each target, keyed by the
	// target string passed to GetResource.
	activeConn map[string]*Resource

	// TODO: utilize the args below. They are read by GetResource and dial,
	// but no code visible in this file assigns them.

	// headers and reflectHeaders are concatenated, in that order, into the
	// header list a new Resource is created with.
	headers, reflectHeaders []string

	// authority, when non-empty, is passed to grpc.WithAuthority.
	authority string

	// insecure, cacert, cert and key are forwarded to
	// grpcurl.ClientTransportCredentials for non-plaintext dials.
	insecure bool

	cacert, cert, key string

	// serverName, when non-empty, overrides the server name on the
	// transport credentials of non-plaintext dials.
	serverName string

	// isUnixSocket, when set and returning true, makes dial use the "unix"
	// network instead of "tcp".
	isUnixSocket func() bool
}
// InitGrpCox returns a new GrpCox with an empty, ready-to-use connection
// cache. Every other field is left at its zero value.
func InitGrpCox() *GrpCox {
	g := new(GrpCox)
	g.activeConn = map[string]*Resource{}
	return g
}
// GetResource returns a Resource for the gRPC server at target, reusing the
// cached one when possible.
//
// If a Resource is already cached for target, isRestartConn is false, and the
// cached Resource still has both its reflection client and client connection
// set, it is returned as-is. Otherwise any cached entry for target is closed
// and removed, a new connection is dialed (plainText is passed through to
// dial), and the resulting Resource is cached under target.
//
// The reflection client is created with an outgoing-metadata context derived
// from ctx, carrying g.headers followed by g.reflectHeaders.
//
// On dial failure the dial error is returned unchanged and nothing is cached.
func (g *GrpCox) GetResource(ctx context.Context, target string, plainText, isRestartConn bool) (*Resource, error) {
	if cached, ok := g.activeConn[target]; ok {
		if !isRestartConn && cached.refClient != nil && cached.clientConn != nil {
			return cached, nil
		}
		// CloseActiveConns always returns nil, so there is nothing to handle.
		_ = g.CloseActiveConns(target)
	}

	conn, err := g.dial(ctx, target, plainText)
	if err != nil {
		return nil, err
	}

	// Build the header list in a fresh slice. Appending directly to g.headers
	// could write into spare capacity of its backing array, so the list
	// stored on the Resource would alias (and could corrupt) shared state.
	// Appending to a nil slice keeps the result nil when both inputs are empty.
	var h []string
	h = append(h, g.headers...)
	h = append(h, g.reflectHeaders...)

	refCtx := metadata.NewOutgoingContext(ctx, grpcurl.MetadataFromHeaders(h))

	r := new(Resource)
	r.clientConn = conn
	r.refClient = grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(r.clientConn))
	r.descSource = grpcurl.DescriptorSourceFromServer(ctx, r.refClient)
	r.headers = h

	g.activeConn[target] = r
	return r, nil
}
// GetActiveConns returns the targets of all cached connections. The order
// follows map iteration and is therefore unspecified. The result is never
// nil, even when no connection is cached. ctx is currently unused.
func (g *GrpCox) GetActiveConns(ctx context.Context) []string {
	targets := make([]string, 0, len(g.activeConn))
	for target := range g.activeConn {
		targets = append(targets, target)
	}
	return targets
}
// CloseActiveConns closes and forgets cached connections. When host is the
// literal string "all", every cached connection is closed; otherwise only the
// one cached under host is closed, if there is one. It always returns nil.
func (g *GrpCox) CloseActiveConns(host string) error {
	if host != "all" {
		res, found := g.activeConn[host]
		if found {
			res.Close()
			delete(g.activeConn, host)
		}
		return nil
	}

	// Deleting entries while ranging over the map is safe in Go.
	for target, res := range g.activeConn {
		res.Close()
		delete(g.activeConn, target)
	}
	return nil
}
// dial opens a client connection to target via grpcurl.BlockingDial, using a
// context derived from ctx that expires after 10 seconds.
//
// Dial options come from the receiver: keepalive parameters when KeepAlive is
// positive, and an authority override when authority is non-empty. When
// plainText is false, transport credentials are built from insecure, cacert,
// cert and key, with serverName applied as an override if set. When plainText
// is true no credentials are passed. The network is "unix" when isUnixSocket
// is set and reports true, and "tcp" otherwise.
//
// Errors from building credentials, overriding the server name, or dialing
// are returned unchanged.
func (g *GrpCox) dial(ctx context.Context, target string, plainText bool) (*grpc.ClientConn, error) {
	const dialTimeout = 10 * time.Second

	dialCtx, cancel := context.WithTimeout(ctx, dialTimeout)
	defer cancel()

	var dialOpts []grpc.DialOption

	// KeepAlive is expressed in seconds; the same duration is used for both
	// the keepalive interval and the keepalive timeout.
	if g.KeepAlive > 0 {
		interval := time.Duration(g.KeepAlive * float64(time.Second))
		params := keepalive.ClientParameters{
			Time:    interval,
			Timeout: interval,
		}
		dialOpts = append(dialOpts, grpc.WithKeepaliveParams(params))
	}

	if g.authority != "" {
		dialOpts = append(dialOpts, grpc.WithAuthority(g.authority))
	}

	// transportCreds stays nil for plaintext connections.
	var transportCreds credentials.TransportCredentials
	if !plainText {
		tlsCreds, err := grpcurl.ClientTransportCredentials(g.insecure, g.cacert, g.cert, g.key)
		if err != nil {
			return nil, err
		}
		transportCreds = tlsCreds

		if g.serverName != "" {
			if err := transportCreds.OverrideServerName(g.serverName); err != nil {
				return nil, err
			}
		}
	}

	network := "tcp"
	if g.isUnixSocket != nil && g.isUnixSocket() {
		network = "unix"
	}

	conn, err := grpcurl.BlockingDial(dialCtx, network, target, transportCreds, dialOpts...)
	if err != nil {
		return nil, err
	}
	return conn, nil
}