diff --git a/core/grpcox.go b/core/grpcox.go index 624c410..7c18b95 100644 --- a/core/grpcox.go +++ b/core/grpcox.go @@ -7,12 +7,9 @@ import ( "time" "github.com/fullstorydev/grpcurl" - "github.com/jhump/protoreflect/grpcreflect" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/metadata" - reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" ) // GrpCox - main object @@ -71,15 +68,12 @@ func (g *GrpCox) GetResource(ctx context.Context, target string, plainText, isRe var err error r := new(Resource) h := append(g.headers, g.reflectHeaders...) - md := grpcurl.MetadataFromHeaders(h) - refCtx := metadata.NewOutgoingContext(ctx, md) + r.md = grpcurl.MetadataFromHeaders(h) r.clientConn, err = g.dial(ctx, target, plainText) if err != nil { return nil, err } - r.refClient = grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(r.clientConn)) - r.descSource = grpcurl.DescriptorSourceFromServer(ctx, r.refClient) r.headers = h g.activeConn.addConnection(target, r, g.maxLifeConn) diff --git a/core/resource.go b/core/resource.go index 4b0161b..cc8be52 100644 --- a/core/resource.go +++ b/core/resource.go @@ -14,6 +14,8 @@ import ( "github.com/jhump/protoreflect/grpcreflect" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" ) // Resource - hold 3 main function (List, Describe, and Invoke) @@ -23,12 +25,42 @@ type Resource struct { refClient *grpcreflect.Client headers []string + md metadata.MD +} + +//openDescriptor - use it to reflect server descriptor +func (r *Resource) openDescriptor() { + ctx := context.Background() + refCtx := metadata.NewOutgoingContext(ctx, r.md) + r.refClient = grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(r.clientConn)) + r.descSource = grpcurl.DescriptorSourceFromServer(ctx, r.refClient) +} + +//closeDescriptor - please ensure to always close after open in the same flow +func (r *Resource) closeDescriptor() { + done := make(chan int) + go func() { + if r.refClient != nil { + r.refClient.Reset() + } + done <- 1 + }() + + select { + case <-done: + return + case <-time.After(3 * time.Second): + log.Printf("Reflection %s falied to close\n", r.clientConn.Target()) + return + } } // List - To list all services exposed by a server // symbol can be "" to list all available services // symbol also can be service name to list all available method func (r *Resource) List(symbol string) ([]string, error) { + r.openDescriptor() + defer r.closeDescriptor() var result []string if symbol == "" { @@ -65,6 +97,8 @@ func (r *Resource) List(symbol string) ([]string, error) { // It also prints a description of that symbol, in the form of snippets of proto source. // It won't necessarily be the original source that defined the element, but it will be equivalent. func (r *Resource) Describe(symbol string) (string, string, error) { + r.openDescriptor() + defer r.closeDescriptor() var result, template string @@ -119,6 +153,9 @@ func (r *Resource) Describe(symbol string) (string, string, error) { // Invoke - invoking gRPC function func (r *Resource) Invoke(ctx context.Context, symbol string, in io.Reader) (string, time.Duration, error) { + r.openDescriptor() + defer r.closeDescriptor() + // because of grpcurl directly fmt.Printf on their invoke function // so we stub the Stdout using os.Pipe backUpStdout := os.Stdout @@ -167,10 +204,6 @@ func (r *Resource) Invoke(ctx context.Context, symbol string, in io.Reader) (str func (r *Resource) Close() { done := make(chan int) go func() { - if r.refClient != nil { - r.refClient.Reset() - r.refClient = nil - } if r.clientConn != nil { r.clientConn.Close() r.clientConn = nil