Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hotfix: add traffic-manager pod ip to route table #396

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 48 additions & 40 deletions pkg/handler/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type ConnectOptions struct {
apiServerIPs []net.IP
extraHost []dns.Entry
once sync.Once
tunName string
}

func (c *ConnectOptions) Context() context.Context {
Expand Down Expand Up @@ -214,11 +215,12 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error)
return err
}
log.Info("Forwarding port...")
if err = c.portForward(c.ctx, []string{
portPair := []string{
fmt.Sprintf("%d:10800", rawTCPForwardPort),
fmt.Sprintf("%d:10801", gvisorTCPForwardPort),
fmt.Sprintf("%d:10802", gvisorUDPForwardPort),
}); err != nil {
}
if err = c.portForward(c.ctx, portPair); err != nil {
return
}
if util.IsWindows() {
Expand Down Expand Up @@ -272,10 +274,13 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
}
return
}
pod := podList[0]
// add route in case of don't have permission to watch pod, but pod recreated ip changed, so maybe this ip can not visit
_ = c.addRoute(pod.Status.PodIP)
childCtx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
var readyChan = make(chan struct{})
podName := podList[0].GetName()
podName := pod.GetName()
// try to detect pod is delete event, if pod is deleted, needs to redo port-forward
go util.CheckPodStatus(childCtx, cancelFunc, podName, c.clientset.CoreV1().Pods(c.Namespace))
go util.CheckPortStatus(childCtx, cancelFunc, readyChan, strings.Split(portPair[1], ":")[0])
Expand Down Expand Up @@ -427,58 +432,29 @@ func (c *ConnectOptions) startLocalTunServer(ctx context.Context, forwardAddress

// Listen all pod, add route if needed
func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
tunName, e := c.GetTunDeviceName()
if e != nil {
return e
var err error
c.tunName, err = c.GetTunDeviceName()
if err != nil {
return err
}

podNs, svcNs, err1 := util.GetNsForListPodAndSvc(ctx, c.clientset, []string{v1.NamespaceAll, c.Namespace})
if err1 != nil {
return err1
}

var addRouteFunc = func(resource, ipStr string) {
ip := net.ParseIP(ipStr)
if ip == nil {
return
}
for _, p := range c.apiServerIPs {
// if pod ip or service ip is equal to apiServer ip, can not add it to route table
if p.Equal(ip) {
return
}
}

var mask net.IPMask
if ip.To4() != nil {
mask = net.CIDRMask(32, 32)
} else {
mask = net.CIDRMask(128, 128)
}
if r, err := netroute.New(); err == nil {
iface, _, _, err := r.Route(ip)
if err == nil && iface.Name == tunName {
return
}
}
errs := tun.AddRoutes(tunName, types.Route{Dst: net.IPNet{IP: ip, Mask: mask}})
if errs != nil {
log.Errorf("Failed to add route, resource: %s, IP: %s, err: %v", resource, ip, errs)
}
}

go func() {
var listDone bool
for ctx.Err() == nil {
err := func() error {
if !listDone {
err := util.ListService(ctx, c.clientset.CoreV1().Services(svcNs), addRouteFunc)
err := util.ListService(ctx, c.clientset.CoreV1().Services(svcNs), c.addRoute)
if err != nil {
return err
}
listDone = true
}
err := util.WatchServiceToAddRoute(ctx, c.clientset.CoreV1().Services(svcNs), addRouteFunc)
err := util.WatchServiceToAddRoute(ctx, c.clientset.CoreV1().Services(svcNs), c.addRoute)
return err
}()
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) {
Expand All @@ -494,13 +470,13 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
for ctx.Err() == nil {
err := func() error {
if !listDone {
err := util.ListPod(ctx, c.clientset.CoreV1().Pods(podNs), addRouteFunc)
err := util.ListPod(ctx, c.clientset.CoreV1().Pods(podNs), c.addRoute)
if err != nil {
return err
}
listDone = true
}
err := util.WatchPodToAddRoute(ctx, c.clientset.CoreV1().Pods(podNs), addRouteFunc)
err := util.WatchPodToAddRoute(ctx, c.clientset.CoreV1().Pods(podNs), c.addRoute)
return err
}()
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) {
Expand All @@ -514,6 +490,38 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
return nil
}

func (c *ConnectOptions) addRoute(ipStr string) error {
if c.tunName == "" {
return nil
}

ip := net.ParseIP(ipStr)
if ip == nil {
return nil
}
for _, p := range c.apiServerIPs {
// if pod ip or service ip is equal to apiServer ip, can not add it to route table
if p.Equal(ip) {
return nil
}
}

var mask net.IPMask
if ip.To4() != nil {
mask = net.CIDRMask(32, 32)
} else {
mask = net.CIDRMask(128, 128)
}
if r, err := netroute.New(); err == nil {
ifi, _, _, err := r.Route(ip)
if err == nil && ifi.Name == c.tunName {
return nil
}
}
err := tun.AddRoutes(c.tunName, types.Route{Dst: net.IPNet{IP: ip, Mask: mask}})
return err
}

func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) {
if !util.IsWindows() {
return
Expand Down
22 changes: 14 additions & 8 deletions pkg/util/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,18 @@ func GetNsForListPodAndSvc(ctx context.Context, clientset *kubernetes.Clientset,
return
}

func ListService(ctx context.Context, lister v12.ServiceInterface, addRouteFunc func(resource string, ipStr string)) error {
func ListService(ctx context.Context, lister v12.ServiceInterface, addRouteFunc func(ipStr string) error) error {
opts := metav1.ListOptions{Limit: 100, Continue: ""}
for {
serviceList, err := lister.List(ctx, opts)
if err != nil {
return err
}
for _, service := range serviceList.Items {
addRouteFunc(service.Name, service.Spec.ClusterIP)
err = addRouteFunc(service.Spec.ClusterIP)
if err != nil {
log.Errorf("Failed to add route, resource: %s, IP: %s, err: %v", service.Name, service.Spec.ClusterIP, err)
}
}
if serviceList.Continue == "" {
return nil
Expand All @@ -67,7 +70,7 @@ func ListService(ctx context.Context, lister v12.ServiceInterface, addRouteFunc
}
}

func WatchServiceToAddRoute(ctx context.Context, watcher v12.ServiceInterface, routeFunc func(resource string, ipStr string)) error {
func WatchServiceToAddRoute(ctx context.Context, watcher v12.ServiceInterface, routeFunc func(ipStr string) error) error {
defer func() {
if er := recover(); er != nil {
log.Error(er)
Expand All @@ -91,12 +94,12 @@ func WatchServiceToAddRoute(ctx context.Context, watcher v12.ServiceInterface, r
if !ok {
continue
}
routeFunc(svc.Name, svc.Spec.ClusterIP)
_ = routeFunc(svc.Spec.ClusterIP)
}
}
}

func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(resource string, ipStr string)) error {
func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(ipStr string) error) error {
opts := metav1.ListOptions{Limit: 100, Continue: ""}
for {
podList, err := lister.List(ctx, opts)
Expand All @@ -107,7 +110,10 @@ func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(res
if pod.Spec.HostNetwork {
continue
}
addRouteFunc(pod.Name, pod.Status.PodIP)
err = addRouteFunc(pod.Status.PodIP)
if err != nil {
log.Errorf("Failed to add route, resource: %s, IP: %s, err: %v", pod.Name, pod.Status.PodIP, err)
}
}
if podList.Continue == "" {
return nil
Expand All @@ -116,7 +122,7 @@ func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(res
}
}

func WatchPodToAddRoute(ctx context.Context, watcher v12.PodInterface, addRouteFunc func(resource string, ipStr string)) error {
func WatchPodToAddRoute(ctx context.Context, watcher v12.PodInterface, addRouteFunc func(ipStr string) error) error {
defer func() {
if er := recover(); er != nil {
log.Errorln(er)
Expand Down Expand Up @@ -144,7 +150,7 @@ func WatchPodToAddRoute(ctx context.Context, watcher v12.PodInterface, addRouteF
continue
}
ip := pod.Status.PodIP
addRouteFunc(pod.Name, ip)
_ = addRouteFunc(ip)
}
}
}
Loading