Skip to content

Commit

Permalink
zapify
Browse files Browse the repository at this point in the history
  • Loading branch information
skandragon committed Sep 9, 2022
1 parent f656b08 commit 181a024
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions app/forwarder-controller/grpc-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"crypto/x509"
"fmt"
"io"
"log"
"net"
"sync/atomic"

Expand All @@ -35,6 +34,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
)

func (s *agentTunnelServer) sendWebhook(state tunnelroute.Route, endpoints []*tunnel.EndpointHealth) {
Expand Down Expand Up @@ -74,10 +74,10 @@ func handleHTTPRequests(session string, requestChan chan interface{}, httpids *u
Event: tunnel.MakeHTTPTunnelOpenTunnelRequest(value.Cmd),
}
if err := stream.Send(resp); err != nil {
log.Printf("Unable to send to route %s for HTTP request %s", session, value.Cmd.Id)
zap.S().Warnw("unable to send HTTP request over GRPC", "session", session, "requestId", value.Cmd.Id, "error", err)
}
default:
log.Printf("Got unexpected message type: %T", interfacedRequest)
zap.S().Warnw("unexpected message", "messageType", fmt.Sprintf("%T", interfacedRequest))
}
}
}
Expand All @@ -89,16 +89,16 @@ func handleHTTPCancelRequest(session string, cancelChan chan string, httpids *ut
Event: tunnel.MakeHTTPTunnelCancelRequest(id),
}
if err := stream.Send(resp); err != nil {
log.Printf("Unable to send to route %s for cancel request %s", session, id)
zap.S().Warnw("stream.Send() failed", "session", session, "requestId", id, "error", err)
}
}
log.Printf("cancel channel closed for route %s", session)
zap.S().Infow("session closed", "session", session)
}

func dataflowHandler(dataflow chan *tunnel.MessageWrapper, stream tunnel.GRPCEventStream) {
for ew := range dataflow {
if err := stream.Send(ew); err != nil {
log.Fatalf("Unable to respond over GRPC: %v", err)
zap.S().Errorw("stream.Send() failed", "error", err)
}
}
}
Expand Down Expand Up @@ -133,7 +133,11 @@ func (s *agentTunnelServer) EventTunnel(stream tunnel.AgentTunnelService_EventTu
ConnectedAt: tunnel.Now(),
}

log.Printf("Route %s connected, awaiting hello message", state)
remote := "unknown"
if p, ok := peer.FromContext(stream.Context()); ok {
remote = p.Addr.String()
}
zap.S().Infow("agent-connect", "route", state.String(), "remote-address", remote)

go handleHTTPRequests(sessionIdentity, inRequest, httpids, stream)

Expand All @@ -142,13 +146,13 @@ func (s *agentTunnelServer) EventTunnel(stream tunnel.AgentTunnelService_EventTu
for {
in, err := stream.Recv()
if err == io.EOF {
log.Printf("Closing %s", state)
zap.S().Infow("EOF", "route", state.String())
httpids.CloseAll()
routes.Remove(state)
return nil
}
if err != nil {
log.Printf("Agent closed connection: %s", state)
zap.S().Infow("remote-closed", "route", state.String())
httpids.CloseAll()
routes.Remove(state)
return err
Expand All @@ -159,7 +163,7 @@ func (s *agentTunnelServer) EventTunnel(stream tunnel.AgentTunnelService_EventTu
req := in.GetPingRequest()
atomic.StoreUint64(&state.LastPing, tunnel.Now())
if err := stream.Send(tunnel.MakePingResponse(req)); err != nil {
log.Printf("Unable to respond to %s with ping response: %v", state, err)
zap.S().Warnw("unable to respond to agent ping", "route", state.String(), "error", err)
routes.Remove(state)
return err
}
Expand All @@ -179,17 +183,17 @@ func (s *agentTunnelServer) EventTunnel(stream tunnel.AgentTunnelService_EventTu
s.sendWebhook(state, req.Endpoints)

if err = s.sendHello(stream); err != nil {
log.Printf("Unable to send hello packet, closing connection")
zap.S().Warnw("unable to responsd with hello, closing", "route", state.String(), "error", err)
routes.Remove(state)
return err
}
log.Printf("Route %s fully connected.", state)
zap.S().Infow("agent-handshake-complete", "route", state.String())
case *tunnel.MessageWrapper_HttpTunnelControl:
handleHTTPControl(in, httpids, s.endpoints, dataflow)
case nil:
// ignore for now
default:
log.Printf("Received unknown message: %s: %T", state, x)
zap.S().Debugw("received unknown message", "route", state.String(), "message", fmt.Sprintf("%#v", x))
}
}
}
Expand Down Expand Up @@ -298,10 +302,10 @@ type agentTunnelServer struct {
}

func runAgentGRPCServer(insecureAgents bool, serverCert tls.Certificate) {
log.Printf("Starting Agent GRPC server on port %d...", config.AgentListenPort)
zap.S().Infow("starting agent GRPC server", "port", config.AgentListenPort)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.AgentListenPort))
if err != nil {
log.Fatalf("Failed to listen: %v", err)
zap.S().Fatalw("failed to listen on agent port", "error", err)
}

if insecureAgents {
Expand All @@ -315,17 +319,17 @@ func runAgentGRPCServer(insecureAgents bool, serverCert tls.Certificate) {

go func() {
if err := grpcServer.Serve(grpcL); err != nil {
log.Fatalf("Failed to start Agent GRPC server: %v", err)
zap.S().Fatalw("grpcServer.Serve() failed", "error", err)
}
}()

if err := m.Serve(); err != nil {
log.Fatalf("Failed to run m.Serve(): %v", err)
zap.S().Fatalw("Failed to run m.Serve()", "error", err)
}
} else {
certPool, err := authority.MakeCertPool()
if err != nil {
log.Fatalf("While making certpool: %v", err)
zap.S().Fatalw("authority.MakeCertPool", "error", err)
}
creds := credentials.NewTLS(&tls.Config{
ClientCAs: certPool,
Expand All @@ -339,7 +343,7 @@ func runAgentGRPCServer(insecureAgents bool, serverCert tls.Certificate) {
server.endpoints = endpoints
tunnel.RegisterAgentTunnelServiceServer(grpcServer, server)
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("Failed to start Agent GRPC server: %v", err)
zap.S().Fatalw("grpcServer.Serve() failed", "error", err)
}
}
}

0 comments on commit 181a024

Please sign in to comment.