// Package control watches Kubernetes Services of type LoadBalancer and mirrors
// their ports into a forwarding proxy.
package control

import (
	"context"
	"encoding/json"
	"errors"
	"time"

	kubecore "k8s.io/api/core/v1"
	k8meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8types "k8s.io/apimachinery/pkg/types"
	k8patch "k8s.io/apimachinery/pkg/util/strategicpatch"
	k8inform "k8s.io/client-go/informers"
	k8client "k8s.io/client-go/kubernetes"
	k8cache "k8s.io/client-go/tools/cache"

	"helmet/app/logger"
)

// Proxy is the forwarding backend driven by the Controller.
type Proxy interface {
	// CreateOrUpdateForwarder installs or refreshes a forwarder for proto that
	// listens on lport and targets dport on the given addresses.
	CreateOrUpdateForwarder(ctx context.Context, proto string, lport, dport uint32, addrs ...string) error
	// DeleteForwarder removes the forwarder for proto listening on lport.
	DeleteForwarder(ctx context.Context, proto string, lport uint32) error
}

// Dummyproxy is a Proxy implementation whose methods do nothing and always
// succeed.
type Dummyproxy struct {
	log *logger.Logger
}

// NewDummyproxy returns a no-op Proxy.
func NewDummyproxy() Proxy {
	return &Dummyproxy{
		log: logger.NewLogger("dummyproxy"),
	}
}

// CreateOrUpdateForwarder does nothing and returns nil.
func (prox *Dummyproxy) CreateOrUpdateForwarder(ctx context.Context, proto string, lport, dport uint32, addrs ...string) error {
	return nil
}

// DeleteForwarder does nothing and returns nil.
func (prox *Dummyproxy) DeleteForwarder(ctx context.Context, proto string, lport uint32) error {
	return nil
}

// Controller reacts to Service events from the Kubernetes API and keeps the
// Proxy, the Service load-balancer status and the load-balancer finalizer in
// step with them.
type Controller struct {
	lbaddr    string // IP published in the Service load-balancer status
	clientset k8client.Interface
	informer  k8cache.SharedIndexInformer // set by Run once the cache has synced
	ctx       context.Context
	cancel    context.CancelFunc
	log       *logger.Logger
	proxy     Proxy
}

// NewController creates a Controller that drives proxy, talks to the API
// through clientset and advertises lbaddr as the load-balancer ingress IP.
func NewController(proxy Proxy, clientset k8client.Interface, lbaddr string) *Controller {
	ctx, cancel := context.WithCancel(context.Background())
	cont := &Controller{
		proxy:     proxy,
		clientset: clientset,
		ctx:       ctx,
		cancel:    cancel,
		lbaddr:    lbaddr,
	}
	cont.log = logger.NewLogger("controller")
	return cont
}

// ProbeAPI reports whether a one-item Service list request against the API
// succeeds within five seconds.
func (cont *Controller) ProbeAPI() bool {
	ctx, cancel := context.WithTimeout(cont.ctx, 5*time.Second)
	// Release the timeout's resources as soon as the request is done.
	defer cancel()

	opts := k8meta.ListOptions{
		Limit: 1,
	}
	if _, err := cont.clientset.CoreV1().Services("").List(ctx, opts); err != nil {
		cont.log.Errorf("Error connecting to API: %v", err)
		return false
	}
	return true
}

// Run starts a Service informer with the controller's event handlers, waits
// for the initial cache sync and then blocks until the controller is stopped.
// It returns an error when the cache cannot be synced.
func (cont *Controller) Run() error {
	cont.log.Debugf("Start controller")

	factory := k8inform.NewSharedInformerFactory(cont.clientset, 10*time.Second)
	defer factory.Shutdown()

	serviceInformer := factory.Core().V1().Services().Informer()
	handler := k8cache.ResourceEventHandlerFuncs{
		AddFunc:    cont.addService,
		UpdateFunc: cont.updateService,
		DeleteFunc: cont.deleteService,
	}
	serviceInformer.AddEventHandler(handler)

	ctx, cancel := context.WithCancel(cont.ctx)
	defer cancel()

	factory.Start(ctx.Done())
	synced := factory.WaitForCacheSync(ctx.Done())
	for _, sync := range synced {
		if !sync {
			cont.log.Errorf("Cannot sync controller")
			err := errors.New("Cannot sync controller")
			return err
		}
	}
	cont.informer = serviceInformer

	<-ctx.Done()
	cont.log.Debugf("Stop controller")
	return nil
}

// Stop cancels the controller context, which makes Run return.
func (cont *Controller) Stop() {
	cont.cancel()
}

// addService handles informer add events. For LoadBalancer Services it
// installs the forwarders, publishes the load-balancer status and attaches the
// load-balancer finalizer.
//
// https://pkg.go.dev/k8s.io/api/core/v1#Service
func (cont *Controller) addService(obj any) {
	cached, ok := obj.(*kubecore.Service)
	if !ok {
		cont.log.Errorf("Unexpected object type in add event: %T", obj)
		return
	}
	service := cached.DeepCopy()
	if service.Spec.Type != kubecore.ServiceTypeLoadBalancer {
		return
	}

	// A Service first seen while already terminating (e.g. after a controller
	// restart) must be cleaned up, not forwarded again.
	if service.DeletionTimestamp != nil {
		if hasLoadBalancerFinalizer(service) {
			cont.log.Infof("Service %s/%s terminating", service.Namespace, service.Name)
			cont.teardownService(service)
		}
		return
	}

	cont.log.Infof("Service %s/%s created", service.Namespace, service.Name)
	forws := makeForwardings(service)
	cont.log.Debugf("Service %s/%s forwards: %++v", service.Namespace, service.Name, forws)
	for _, forw := range forws {
		err := cont.proxy.CreateOrUpdateForwarder(cont.ctx, forw.Proto, forw.Port, forw.Port, forw.Destinations...)
		if err != nil {
			cont.log.Errorf("Error forwarding service %s/%s: %v", service.Namespace, service.Name, err)
		}
	}
	cont.log.Debugf("Service %s/%s forwarded", service.Namespace, service.Name)

	if err := cont.addStatus(service); err != nil {
		cont.log.Debugf("Error patch service %s/%s: %v", service.Namespace, service.Name, err)
	}
	if err := cont.addFinalizer(service); err != nil {
		cont.log.Debugf("Error patch service %s/%s: %v", service.Namespace, service.Name, err)
	}
	cont.log.Debugf("Service %s/%s patched", service.Namespace, service.Name)
}

// updateService handles informer update events. For LoadBalancer Services it
// refreshes the forwarders and status when ports or cluster IPs changed, and
// releases the Service when it is being deleted.
func (cont *Controller) updateService(oldObj, newObj any) {
	cachedNew, ok := newObj.(*kubecore.Service)
	if !ok {
		cont.log.Errorf("Unexpected object type in update event: %T", newObj)
		return
	}
	cachedOld, ok := oldObj.(*kubecore.Service)
	if !ok {
		cont.log.Errorf("Unexpected object type in update event: %T", oldObj)
		return
	}
	newService := cachedNew.DeepCopy()
	oldService := cachedOld.DeepCopy()

	if newService.Spec.Type != kubecore.ServiceTypeLoadBalancer {
		return
	}

	// Deleting an object that carries a finalizer only sets its deletion
	// timestamp; the delete event is not delivered until the finalizer is gone.
	// Clean up here so the Service does not stay in Terminating.
	if newService.DeletionTimestamp != nil {
		if hasLoadBalancerFinalizer(newService) {
			cont.log.Infof("Service %s/%s terminating", newService.Namespace, newService.Name)
			cont.teardownService(newService)
		}
		return
	}

	if !cont.serviceDifferent(oldService, newService) {
		return
	}

	cont.log.Infof("Service %s/%s updated", newService.Namespace, newService.Name)
	for _, forw := range makeForwardings(newService) {
		err := cont.proxy.CreateOrUpdateForwarder(cont.ctx, forw.Proto, forw.Port, forw.Port, forw.Destinations...)
		if err != nil {
			cont.log.Errorf("Error forwarding service %s/%s: %v", newService.Namespace, newService.Name, err)
			return
		}
	}
	if err := cont.addStatus(newService); err != nil {
		cont.log.Errorf("Error patch service %s/%s: %v", newService.Namespace, newService.Name, err)
	}
}

// deleteService handles informer delete events. For LoadBalancer Services it
// removes the forwarders and the load-balancer finalizer.
func (cont *Controller) deleteService(obj any) {
	cached, ok := obj.(*kubecore.Service)
	if !ok {
		// When the watch missed the deletion, the informer delivers a tombstone
		// wrapping the last known state instead of the object itself.
		tombstone, isTombstone := obj.(k8cache.DeletedFinalStateUnknown)
		if !isTombstone {
			cont.log.Errorf("Unexpected object type in delete event: %T", obj)
			return
		}
		cached, ok = tombstone.Obj.(*kubecore.Service)
		if !ok {
			cont.log.Errorf("Unexpected tombstone object type in delete event: %T", tombstone.Obj)
			return
		}
	}
	service := cached.DeepCopy()
	if service.Spec.Type != kubecore.ServiceTypeLoadBalancer {
		return
	}

	cont.log.Debugf("Service %s/%s deleted", service.Namespace, service.Name)
	cont.teardownService(service)
}

// teardownService removes every forwarder derived from service and then drops
// the load-balancer finalizer. Failures are logged and do not stop the cleanup.
func (cont *Controller) teardownService(service *kubecore.Service) {
	for _, forw := range makeForwardings(service) {
		err := cont.proxy.DeleteForwarder(cont.ctx, forw.Proto, forw.Port)
		if err != nil {
			cont.log.Errorf("Error unforwarding service %s/%s: %v", service.Namespace, service.Name, err)
		}
	}
	if err := cont.deleteFinalizer(service); err != nil {
		cont.log.Debugf("Error patch service %s/%s: %v", service.Namespace, service.Name, err)
	}
	cont.log.Debugf("Service %s/%s unforwarded", service.Namespace, service.Name)
}

// Forwarding describes one forwarder: a protocol, a port and the destination
// addresses traffic is sent to.
type Forwarding struct {
	Proto        string
	Port         uint32
	Destinations []string
}

// makeForwardings returns one Forwarding per Service port, each targeting all
// of the Service's cluster IPs. The result is never nil.
func makeForwardings(service *kubecore.Service) []*Forwarding {
	forwardings := make([]*Forwarding, 0, len(service.Spec.Ports))
	for _, port := range service.Spec.Ports {
		// Each Forwarding owns its own copy of the cluster IP list.
		destinations := make([]string, 0, len(service.Spec.ClusterIPs))
		destinations = append(destinations, service.Spec.ClusterIPs...)
		forwardings = append(forwardings, &Forwarding{
			Proto:        string(port.Protocol),
			Port:         uint32(port.Port),
			Destinations: destinations,
		})
	}
	return forwardings
}

// LoadBalancerFinalizer is the finalizer the controller attaches to the
// LoadBalancer Services it manages.
const LoadBalancerFinalizer = "service.kubernetes.io/load-balancer"

// hasLoadBalancerFinalizer reports whether svc carries LoadBalancerFinalizer.
func hasLoadBalancerFinalizer(svc *kubecore.Service) bool {
	for _, finalizer := range svc.ObjectMeta.Finalizers {
		if finalizer == LoadBalancerFinalizer {
			return true
		}
	}
	return false
}

// addFinalizer patches LoadBalancerFinalizer onto the Service unless it is
// already present.
func (cont *Controller) addFinalizer(oldSvc *kubecore.Service) error {
	if hasLoadBalancerFinalizer(oldSvc) {
		return nil
	}
	newSvc := oldSvc.DeepCopy()
	newSvc.ObjectMeta.Finalizers = append(newSvc.ObjectMeta.Finalizers, LoadBalancerFinalizer)
	return cont.patchService(oldSvc, newSvc)
}

// deleteFinalizer patches LoadBalancerFinalizer off the Service; it is a no-op
// when the finalizer is not present.
func (cont *Controller) deleteFinalizer(oldSvc *kubecore.Service) error {
	if !hasLoadBalancerFinalizer(oldSvc) {
		return nil
	}
	remaining := make([]string, 0, len(oldSvc.ObjectMeta.Finalizers))
	for _, finalizer := range oldSvc.ObjectMeta.Finalizers {
		if finalizer != LoadBalancerFinalizer {
			remaining = append(remaining, finalizer)
		}
	}
	newSvc := oldSvc.DeepCopy()
	newSvc.ObjectMeta.Finalizers = remaining
	return cont.patchService(oldSvc, newSvc)
}

// addStatus patches the Service load-balancer status to a single ingress entry
// with the controller's address and the Proxy IP mode.
func (cont *Controller) addStatus(oldSvc *kubecore.Service) error {
	ingressMode := kubecore.LoadBalancerIPModeProxy
	newSvc := oldSvc.DeepCopy()
	newSvc.Status.LoadBalancer = kubecore.LoadBalancerStatus{
		Ingress: []kubecore.LoadBalancerIngress{
			{
				IP:     cont.lbaddr,
				IPMode: &ingressMode,
			},
		},
	}
	return cont.patchService(oldSvc, newSvc)
}

// patchService sends the difference between oldSvc and newSvc to the API as a
// strategic merge patch on the "status" subresource, with a five second
// timeout.
func (cont *Controller) patchService(oldSvc, newSvc *kubecore.Service) error {
	oldData, err := json.Marshal(oldSvc)
	if err != nil {
		return err
	}
	newData, err := json.Marshal(newSvc)
	if err != nil {
		return err
	}
	patchBytes, err := k8patch.CreateTwoWayMergePatch(oldData, newData, kubecore.Service{})
	if err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(cont.ctx, 5*time.Second)
	// Release the timeout's resources as soon as the request is done.
	defer cancel()

	patchServiceOpts := k8meta.PatchOptions{}
	strategy := k8types.StrategicMergePatchType
	_, err = cont.clientset.CoreV1().Services(oldSvc.Namespace).
		Patch(ctx, oldSvc.Name, strategy, patchBytes, patchServiceOpts, "status")
	return err
}

// serviceDifferent reports whether the (port, protocol, cluster IP)
// combinations of the two Services differ.
func (cont *Controller) serviceDifferent(oldService, newService *kubecore.Service) bool {
	type Address struct {
		Port    int32
		Proto   kubecore.Protocol
		Address string
	}

	// collect returns the distinct combinations of svc and the total number of
	// combinations including duplicates.
	collect := func(svc *kubecore.Service) (map[Address]struct{}, int) {
		set := make(map[Address]struct{}, len(svc.Spec.Ports)*len(svc.Spec.ClusterIPs))
		total := 0
		for _, port := range svc.Spec.Ports {
			for _, ipaddr := range svc.Spec.ClusterIPs {
				set[Address{Port: port.Port, Proto: port.Protocol, Address: ipaddr}] = struct{}{}
				total++
			}
		}
		return set, total
	}

	oldSet, oldTotal := collect(oldService)
	newSet, newTotal := collect(newService)
	if oldTotal != newTotal || len(oldSet) != len(newSet) {
		return true
	}
	// The sets have the same size, so checking one direction is enough.
	for address := range newSet {
		if _, found := oldSet[address]; !found {
			return true
		}
	}
	return false
}