mirror of
https://github.com/gusaul/grpcox.git
synced 2025-01-24 05:04:39 +00:00
Merge pull request #12 from gusaul/separate_reflection_conn
separate reflection descriptor from established conn
This commit is contained in:
commit
6d9d03f52a
|
@ -7,12 +7,9 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/fullstorydev/grpcurl"
|
"github.com/fullstorydev/grpcurl"
|
||||||
"github.com/jhump/protoreflect/grpcreflect"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
"google.golang.org/grpc/metadata"
|
|
||||||
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// GrpCox - main object
|
// GrpCox - main object
|
||||||
|
@ -71,15 +68,12 @@ func (g *GrpCox) GetResource(ctx context.Context, target string, plainText, isRe
|
||||||
var err error
|
var err error
|
||||||
r := new(Resource)
|
r := new(Resource)
|
||||||
h := append(g.headers, g.reflectHeaders...)
|
h := append(g.headers, g.reflectHeaders...)
|
||||||
md := grpcurl.MetadataFromHeaders(h)
|
r.md = grpcurl.MetadataFromHeaders(h)
|
||||||
refCtx := metadata.NewOutgoingContext(ctx, md)
|
|
||||||
r.clientConn, err = g.dial(ctx, target, plainText)
|
r.clientConn, err = g.dial(ctx, target, plainText)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
r.refClient = grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(r.clientConn))
|
|
||||||
r.descSource = grpcurl.DescriptorSourceFromServer(ctx, r.refClient)
|
|
||||||
r.headers = h
|
r.headers = h
|
||||||
|
|
||||||
g.activeConn.addConnection(target, r, g.maxLifeConn)
|
g.activeConn.addConnection(target, r, g.maxLifeConn)
|
||||||
|
|
|
@ -14,6 +14,8 @@ import (
|
||||||
"github.com/jhump/protoreflect/grpcreflect"
|
"github.com/jhump/protoreflect/grpcreflect"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Resource - hold 3 main function (List, Describe, and Invoke)
|
// Resource - hold 3 main function (List, Describe, and Invoke)
|
||||||
|
@ -23,12 +25,42 @@ type Resource struct {
|
||||||
refClient *grpcreflect.Client
|
refClient *grpcreflect.Client
|
||||||
|
|
||||||
headers []string
|
headers []string
|
||||||
|
md metadata.MD
|
||||||
|
}
|
||||||
|
|
||||||
|
//openDescriptor - use it to reflect server descriptor
|
||||||
|
func (r *Resource) openDescriptor() {
|
||||||
|
ctx := context.Background()
|
||||||
|
refCtx := metadata.NewOutgoingContext(ctx, r.md)
|
||||||
|
r.refClient = grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(r.clientConn))
|
||||||
|
r.descSource = grpcurl.DescriptorSourceFromServer(ctx, r.refClient)
|
||||||
|
}
|
||||||
|
|
||||||
|
//closeDescriptor - please ensure to always close after open in the same flow
|
||||||
|
func (r *Resource) closeDescriptor() {
|
||||||
|
done := make(chan int)
|
||||||
|
go func() {
|
||||||
|
if r.refClient != nil {
|
||||||
|
r.refClient.Reset()
|
||||||
|
}
|
||||||
|
done <- 1
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
log.Printf("Reflection %s falied to close\n", r.clientConn.Target())
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// List - To list all services exposed by a server
|
// List - To list all services exposed by a server
|
||||||
// symbol can be "" to list all available services
|
// symbol can be "" to list all available services
|
||||||
// symbol also can be service name to list all available method
|
// symbol also can be service name to list all available method
|
||||||
func (r *Resource) List(symbol string) ([]string, error) {
|
func (r *Resource) List(symbol string) ([]string, error) {
|
||||||
|
r.openDescriptor()
|
||||||
|
defer r.closeDescriptor()
|
||||||
|
|
||||||
var result []string
|
var result []string
|
||||||
if symbol == "" {
|
if symbol == "" {
|
||||||
|
@ -65,6 +97,8 @@ func (r *Resource) List(symbol string) ([]string, error) {
|
||||||
// It also prints a description of that symbol, in the form of snippets of proto source.
|
// It also prints a description of that symbol, in the form of snippets of proto source.
|
||||||
// It won't necessarily be the original source that defined the element, but it will be equivalent.
|
// It won't necessarily be the original source that defined the element, but it will be equivalent.
|
||||||
func (r *Resource) Describe(symbol string) (string, string, error) {
|
func (r *Resource) Describe(symbol string) (string, string, error) {
|
||||||
|
r.openDescriptor()
|
||||||
|
defer r.closeDescriptor()
|
||||||
|
|
||||||
var result, template string
|
var result, template string
|
||||||
|
|
||||||
|
@ -119,6 +153,9 @@ func (r *Resource) Describe(symbol string) (string, string, error) {
|
||||||
|
|
||||||
// Invoke - invoking gRPC function
|
// Invoke - invoking gRPC function
|
||||||
func (r *Resource) Invoke(ctx context.Context, symbol string, in io.Reader) (string, time.Duration, error) {
|
func (r *Resource) Invoke(ctx context.Context, symbol string, in io.Reader) (string, time.Duration, error) {
|
||||||
|
r.openDescriptor()
|
||||||
|
defer r.closeDescriptor()
|
||||||
|
|
||||||
// because of grpcurl directly fmt.Printf on their invoke function
|
// because of grpcurl directly fmt.Printf on their invoke function
|
||||||
// so we stub the Stdout using os.Pipe
|
// so we stub the Stdout using os.Pipe
|
||||||
backUpStdout := os.Stdout
|
backUpStdout := os.Stdout
|
||||||
|
@ -167,10 +204,6 @@ func (r *Resource) Invoke(ctx context.Context, symbol string, in io.Reader) (str
|
||||||
func (r *Resource) Close() {
|
func (r *Resource) Close() {
|
||||||
done := make(chan int)
|
done := make(chan int)
|
||||||
go func() {
|
go func() {
|
||||||
if r.refClient != nil {
|
|
||||||
r.refClient.Reset()
|
|
||||||
r.refClient = nil
|
|
||||||
}
|
|
||||||
if r.clientConn != nil {
|
if r.clientConn != nil {
|
||||||
r.clientConn.Close()
|
r.clientConn.Close()
|
||||||
r.clientConn = nil
|
r.clientConn = nil
|
||||||
|
|
Loading…
Reference in New Issue
Block a user