// Package control watches Kubernetes Services through a shared informer and
// publishes a load-balancer ingress address in the status of every Service
// whose type is LoadBalancer.
package control

import (
	"context"
	"encoding/json"
	"time"

	kubecore "k8s.io/api/core/v1"
	kubemeta "k8s.io/apimachinery/pkg/apis/meta/v1"
	kubetypes "k8s.io/apimachinery/pkg/types"
	kubepatch "k8s.io/apimachinery/pkg/util/strategicpatch"
	k8inform "k8s.io/client-go/informers"
	k8client "k8s.io/client-go/kubernetes"
	k8cache "k8s.io/client-go/tools/cache"

	"helmet/app/logger"
)

// Controller reacts to Service add/update/delete events and patches the
// status of LoadBalancer Services so that it advertises lbaddr.
type Controller struct {
	lbaddr    string                      // IP written into status.loadBalancer.ingress
	clientset k8client.Interface          // API client used for informers and patches
	informer  k8cache.SharedIndexInformer // set by Run once the cache has synced
	ctx       context.Context             // root context; cancelled by Stop
	cancel    context.CancelFunc
	log       *logger.Logger
}

// NewController returns a Controller that uses clientset to talk to the API
// server and advertises lbaddr as the ingress IP of LoadBalancer Services.
// The controller does nothing until Run is called.
func NewController(clientset k8client.Interface, lbaddr string) *Controller {
	ctx, cancel := context.WithCancel(context.Background())
	return &Controller{
		clientset: clientset,
		ctx:       ctx,
		cancel:    cancel,
		lbaddr:    lbaddr,
		log:       logger.NewLogger("controller"),
	}
}

// Run starts the Service informer and blocks until the controller's context
// is cancelled (see Stop). It returns early if the event handler cannot be
// registered or if the informer cache fails to sync.
func (cont *Controller) Run() {
	cont.log.Debugf("Start controller")

	factory := k8inform.NewSharedInformerFactory(cont.clientset, 10*time.Minute)
	defer factory.Shutdown()

	serviceInformer := factory.Core().V1().Services().Informer()
	handler := k8cache.ResourceEventHandlerFuncs{
		AddFunc:    cont.addService,
		UpdateFunc: cont.updateService,
		DeleteFunc: cont.deleteService,
	}
	if _, err := serviceInformer.AddEventHandler(handler); err != nil {
		cont.log.Debugf("Error register service event handler: %v", err)
		return
	}

	// A child context lets the deferred cancel stop the informers on every
	// return path, including the early returns below.
	ctx, cancel := context.WithCancel(cont.ctx)
	defer cancel()

	factory.Start(ctx.Done())
	for informerType, ok := range factory.WaitForCacheSync(ctx.Done()) {
		if !ok {
			cont.log.Debugf("Cache for %v is not synced, stop controller", informerType)
			return
		}
	}
	cont.informer = serviceInformer

	<-ctx.Done()
	cont.log.Debugf("Stop controller")
}

// Stop cancels the controller's root context, which makes a running Run
// return and aborts in-flight patch requests.
func (cont *Controller) Stop() {
	cont.cancel()
}

// addService handles informer "add" events. For a LoadBalancer Service it
// patches the Service status with the configured ingress address.
//
// https://pkg.go.dev/k8s.io/api/core/v1#Service
func (cont *Controller) addService(obj any) {
	service, ok := obj.(*kubecore.Service)
	if !ok {
		cont.log.Debugf("Unexpected object type %T in add handler", obj)
		return
	}
	cont.log.Debugf("Service %s/%s created", service.Namespace, service.Name)
	if service.Spec.Type != kubecore.ServiceTypeLoadBalancer {
		return
	}

	// NOTE(review): the computed forwardings are not consumed anywhere yet.
	_ = makeForwarding(service)

	// patchService mutates its argument, and informer objects are shared with
	// the cache, so hand it a private copy.
	if err := cont.patchService(service.DeepCopy()); err != nil {
		cont.log.Debugf("Error patch service %s/%s: %v", service.Namespace, service.Name, err)
	}
}

// updateService handles informer "update" events. A LoadBalancer Service is
// re-patched only when its set of (port, protocol, cluster IP) tuples differs
// between the old and the new object.
func (cont *Controller) updateService(oldObj, newObj any) {
	newService, ok := newObj.(*kubecore.Service)
	if !ok {
		cont.log.Debugf("Unexpected object type %T in update handler", newObj)
		return
	}
	oldService, ok := oldObj.(*kubecore.Service)
	if !ok {
		cont.log.Debugf("Unexpected object type %T in update handler", oldObj)
		return
	}
	cont.log.Debugf("Service %s/%s updated", newService.Namespace, newService.Name)
	if newService.Spec.Type != kubecore.ServiceTypeLoadBalancer {
		return
	}
	if !cont.serviceDifferent(oldService, newService) {
		return
	}

	// NOTE(review): the computed forwardings are not consumed anywhere yet.
	_ = makeForwarding(newService)
	cont.log.Debugf("Updated service %s/%s have new adresses", newService.Namespace, newService.Name)

	// patchService mutates its argument; never modify the cached object.
	if err := cont.patchService(newService.DeepCopy()); err != nil {
		cont.log.Debugf("Error patch service %s/%s: %v", newService.Namespace, newService.Name, err)
	}
}

// deleteService handles informer "delete" events; it only logs the deletion.
func (cont *Controller) deleteService(obj any) {
	service, ok := obj.(*kubecore.Service)
	if !ok {
		// When the watch missed the delete event the informer delivers a
		// tombstone that wraps the last known state of the object.
		tombstone, isTombstone := obj.(k8cache.DeletedFinalStateUnknown)
		if !isTombstone {
			cont.log.Debugf("Unexpected object type %T in delete handler", obj)
			return
		}
		service, ok = tombstone.Obj.(*kubecore.Service)
		if !ok {
			cont.log.Debugf("Unexpected tombstone object type %T in delete handler", tombstone.Obj)
			return
		}
	}
	cont.log.Debugf("Service %s/%s deleted", service.Namespace, service.Name)
}

// Forwarding describes one Service port and the addresses traffic arriving on
// that port should be forwarded to.
type Forwarding struct {
	Proto        string   // protocol of the Service port, e.g. "TCP"
	Port         uint32   // Service port number
	Destinations []string // cluster IPs of the Service
}

// makeForwarding returns one Forwarding per Service port, each targeting all
// cluster IPs of the Service. The result is never nil.
func makeForwarding(service *kubecore.Service) []*Forwarding {
	forwardings := make([]*Forwarding, 0, len(service.Spec.Ports))
	for _, port := range service.Spec.Ports {
		destinations := make([]string, 0, len(service.Spec.ClusterIPs))
		destinations = append(destinations, service.Spec.ClusterIPs...)
		forwardings = append(forwardings, &Forwarding{
			Proto:        string(port.Protocol),
			Port:         uint32(port.Port),
			Destinations: destinations,
		})
	}
	return forwardings
}

// patchService sets status.loadBalancer of svc to a single ingress entry with
// the controller's address in Proxy IP mode and sends the resulting strategic
// merge patch to the "status" subresource. svc is modified in place, so
// callers must not pass an object owned by the informer cache.
//
// It returns any marshalling, patch-creation or API error unchanged.
func (cont *Controller) patchService(svc *kubecore.Service) error {
	oldData, err := json.Marshal(svc)
	if err != nil {
		return err
	}

	ingressMode := kubecore.LoadBalancerIPModeProxy
	svc.Status.LoadBalancer = kubecore.LoadBalancerStatus{
		Ingress: []kubecore.LoadBalancerIngress{{
			IP:     cont.lbaddr,
			IPMode: &ingressMode,
		}},
	}

	newData, err := json.Marshal(svc)
	if err != nil {
		return err
	}
	patchBytes, err := kubepatch.CreateTwoWayMergePatch(oldData, newData, kubecore.Service{})
	if err != nil {
		return err
	}
	cont.log.Debugf("patch: %s\n", string(patchBytes))

	// Bound the API call, and release the timer as soon as the call returns
	// instead of waiting for the timeout to expire.
	ctx, cancel := context.WithTimeout(cont.ctx, 5*time.Second)
	defer cancel()

	_, err = cont.clientset.CoreV1().Services(svc.Namespace).
		Patch(ctx, svc.Name, kubetypes.StrategicMergePatchType, patchBytes, kubemeta.PatchOptions{}, "status")
	return err
}

// serviceDifferent reports whether the (port, protocol, cluster IP) tuples of
// oldService and newService differ, either in count or in content. The order
// of ports and IPs is ignored.
func (cont *Controller) serviceDifferent(oldService, newService *kubecore.Service) bool {
	type endpoint struct {
		Port    int32
		Proto   kubecore.Protocol
		Address string
	}

	// collect returns the set of endpoints of svc together with the number of
	// tuples generated (duplicates included), which is what the length
	// comparison is based on.
	collect := func(svc *kubecore.Service) (map[endpoint]struct{}, int) {
		total := len(svc.Spec.Ports) * len(svc.Spec.ClusterIPs)
		set := make(map[endpoint]struct{}, total)
		for _, port := range svc.Spec.Ports {
			for _, ipaddr := range svc.Spec.ClusterIPs {
				set[endpoint{Port: port.Port, Proto: port.Protocol, Address: ipaddr}] = struct{}{}
			}
		}
		return set, total
	}

	oldSet, oldTotal := collect(oldService)
	newSet, newTotal := collect(newService)
	if oldTotal != newTotal {
		return true
	}
	for ep := range newSet {
		if _, found := oldSet[ep]; !found {
			return true
		}
	}
	for ep := range oldSet {
		if _, found := newSet[ep]; !found {
			return true
		}
	}
	return false
}