diff --git a/app/config/config.go b/app/config/config.go index 9763cf7..687d235 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -30,7 +30,7 @@ type Auth struct { } type Config struct { - Service Service `json:"service" yaml:"service"` + Service Service `json:"service" yaml:"service"` Auths []Auth `json:"auths" yaml:"auths"` Hostname string `json:"hostname" yaml:"hostname"` LogPath string `json:"logfile" yaml:"logfile"` diff --git a/app/control/svccont.go b/app/control/svccont.go index 8b1cd23..9b9b087 100644 --- a/app/control/svccont.go +++ b/app/control/svccont.go @@ -3,6 +3,7 @@ package control import ( "context" "encoding/json" + "errors" "time" kubecore "k8s.io/api/core/v1" @@ -10,9 +11,9 @@ import ( k8client "k8s.io/client-go/kubernetes" k8cache "k8s.io/client-go/tools/cache" - kubemeta "k8s.io/apimachinery/pkg/apis/meta/v1" - kubetypes "k8s.io/apimachinery/pkg/types" - kubepatch "k8s.io/apimachinery/pkg/util/strategicpatch" + k8meta "k8s.io/apimachinery/pkg/apis/meta/v1" + k8types "k8s.io/apimachinery/pkg/types" + k8patch "k8s.io/apimachinery/pkg/util/strategicpatch" "helmet/app/logger" ) @@ -62,7 +63,20 @@ func NewController(proxy Proxy, clientset k8client.Interface, lbaddr string) *Co return cont } -func (cont *Controller) Run() { +func (cont *Controller) ProbeAPI() bool { + ctx, _ := context.WithTimeout(cont.ctx, 5*time.Second) + opts := k8meta.ListOptions{ + Limit: 1, + } + _, err := cont.clientset.CoreV1().Services("").List(ctx, opts) + if err != nil { + cont.log.Errorf("Error connecting to API: %v", err) + return false + } + return true +} + +func (cont *Controller) Run() error { cont.log.Debugf("Start controller") factory := k8inform.NewSharedInformerFactory(cont.clientset, 10*time.Second) defer factory.Shutdown() @@ -82,12 +96,14 @@ func (cont *Controller) Run() { for _, sync := range synced { if !sync { cont.log.Errorf("Cannot sync controller") - return + err := errors.New("Cannot sync controller") + return err } } cont.informer = serviceInformer <-ctx.Done() cont.log.Debugf("Stop controller") + return nil } func (cont *Controller) Stop() { @@ -99,13 +115,23 @@ func (cont *Controller) Stop() { func (cont *Controller) addService(obj any) { service := obj.(*kubecore.Service) service = service.DeepCopy() - cont.log.Debugf("Service %s/%s created", service.Namespace, service.Name) if service.Spec.Type == kubecore.ServiceTypeLoadBalancer { - _ = makeForwarding(service) + cont.log.Infof("Service %s/%s created", service.Namespace, service.Name) + forws := makeForwardings(service) + cont.log.Debugf("Service %s/%s forwards: %++v", service.Namespace, service.Name, forws) + + for _, forw := range forws { + err := cont.proxy.CreateOrUpdateForwarder(cont.ctx, forw.Proto, forw.Port, forw.Port, forw.Destinations...) + if err != nil { + cont.log.Errorf("Error forwarding service %s/%s: %v", service.Namespace, service.Name, err) + } + } + cont.log.Debugf("Service %s/%s forwarded", service.Namespace, service.Name) err := cont.patchService(service) if err != nil { cont.log.Debugf("Error patch service %s/%s: %v", service.Namespace, service.Name, err) } + cont.log.Debugf("Service %s/%s patched", service.Namespace, service.Name) } } @@ -114,15 +140,20 @@ func (cont *Controller) updateService(oldObj, newObj any) { newService = newService.DeepCopy() oldService := oldObj.(*kubecore.Service) oldService = oldService.DeepCopy() - cont.log.Debugf("Service %s/%s updated", newService.Namespace, newService.Name) - if newService.Spec.Type == kubecore.ServiceTypeLoadBalancer { if cont.serviceDifferent(oldService, newService) { - _ = makeForwarding(newService) - cont.log.Debugf("Updated service %s/%s have new adresses", newService.Namespace, newService.Name) + cont.log.Infof("Service %s/%s updated", newService.Namespace, newService.Name) + forws := makeForwardings(newService) + for _, forw := range forws { + err := cont.proxy.CreateOrUpdateForwarder(cont.ctx, forw.Proto, forw.Port, forw.Port, forw.Destinations...) + if err != nil { + cont.log.Errorf("Error forwarding service %s/%s: %v", newService.Namespace, newService.Name, err) + return + } + } err := cont.patchService(newService) if err != nil { - cont.log.Debugf("Error patch service %s/%s: %v", newService.Namespace, newService.Name, err) + cont.log.Errorf("Error patch service %s/%s: %v", newService.Namespace, newService.Name, err) } } } @@ -131,7 +162,17 @@ func (cont *Controller) updateService(oldObj, newObj any) { func (cont *Controller) deleteService(obj any) { service := obj.(*kubecore.Service) service = service.DeepCopy() - cont.log.Debugf("Service %s/%s deleted", service.Namespace, service.Name) + if service.Spec.Type == kubecore.ServiceTypeLoadBalancer { + cont.log.Debugf("Service %s/%s deleted", service.Namespace, service.Name) + forws := makeForwardings(service) + for _, forw := range forws { + err := cont.proxy.DeleteForwarder(cont.ctx, forw.Proto, forw.Port) + if err != nil { + cont.log.Errorf("Error unforwarding service %s/%s: %v", service.Namespace, service.Name, err) + } + } + cont.log.Debugf("Service %s/%s unforwarded", service.Namespace, service.Name) + } } type Forwarding struct { @@ -140,7 +181,7 @@ type Forwarding struct { Destinations []string } -func makeForwarding(service *kubecore.Service) []*Forwarding { +func makeForwardings(service *kubecore.Service) []*Forwarding { forwardings := make([]*Forwarding, 0) for _, port := range service.Spec.Ports { forwarding := &Forwarding{ @@ -151,6 +192,7 @@ func makeForwarding(service *kubecore.Service) []*Forwarding { for _, ipaddr := range service.Spec.ClusterIPs { forwarding.Destinations = append(forwarding.Destinations, ipaddr) } + forwardings = append(forwardings, forwarding) } return forwardings } @@ -177,15 +219,15 @@ func (cont *Controller) patchService(svc *kubecore.Service) error { return err } - patchBytes, err := kubepatch.CreateTwoWayMergePatch(oldData, newData, kubecore.Service{}) + patchBytes, err := k8patch.CreateTwoWayMergePatch(oldData, newData, kubecore.Service{}) if err != nil { return err } - cont.log.Debugf("patch: %s\n", string(patchBytes)) + //cont.log.Debugf("patch: %s\n", string(patchBytes)) ctx, _ := context.WithTimeout(cont.ctx, 5*time.Second) - patchServiceOpts := kubemeta.PatchOptions{} - strategy := kubetypes.StrategicMergePatchType + patchServiceOpts := k8meta.PatchOptions{} + strategy := k8types.StrategicMergePatchType _, err = cont.clientset.CoreV1().Services(svc.Namespace). Patch(ctx, svc.Name, strategy, patchBytes, patchServiceOpts, "status") if err != nil { diff --git a/app/rproxy/rproxy.go b/app/rproxy/rproxy.go index e6fd6cd..2df6d83 100644 --- a/app/rproxy/rproxy.go +++ b/app/rproxy/rproxy.go @@ -94,6 +94,15 @@ func (prox *Proxy) CreateOrUpdateForwarder(ctx context.Context, proto string, lp func (prox *Proxy) DeleteForwarder(ctx context.Context, proto string, lport uint32) error { var err error + + if lport == 0 { + return errors.New("Zero forwarder lport") + } + if proto == "" { + return errors.New("Empty forwarder type") + } + proto = strings.ToLower(proto) + var forw *Forwarder prox.mtx.Lock() defer prox.mtx.Unlock() diff --git a/app/server/server.go b/app/server/server.go index ba38049..ffb676e 100644 --- a/app/server/server.go +++ b/app/server/server.go @@ -266,21 +266,28 @@ func (srv *Server) Run() error { } sigs := make(chan os.Signal, 1) - done := make(chan error, 1) + done := make(chan error, 3) // Run service - startService := func(svc *service.Service, done chan error) { + startRPCService := func(svc *service.Service, done chan error) { srv.log.Infof("Run rpc service") err = svc.Run() if err != nil { - srv.log.Errorf("Service error: %v", err) + srv.log.Errorf("RPC service error: %v", err) done <- err } } - go startService(srv.svc, done) + go startRPCService(srv.svc, done) // Run controller - srv.log.Infof("Run controller") - go srv.cont.Run() + startContService := func(cont *control.Controller, done chan error) { + srv.log.Infof("Run controller") + err = cont.Run() + if err != nil { + srv.log.Errorf("Controller service error: %v", err) + done <- err + } + } + go startContService(srv.cont, done) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) var signal os.Signal @@ -289,6 +296,13 @@ func (srv *Server) Run() error { srv.log.Infof("Services stopped by signal: %v", signal) srv.cancel() srv.svc.Stop() + srv.cont.Stop() + srv.wg.Wait() + case err := <-done: + srv.log.Infof("Services stopped by error: %v", err) + srv.cancel() + srv.svc.Stop() + srv.cont.Stop() srv.wg.Wait() } return err diff --git a/etc/minilb/kubeconf.yaml b/etc/minilb/kubeconf.yaml new file mode 100644 index 0000000..66a4968 --- /dev/null +++ b/etc/minilb/kubeconf.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +clusters: +- cluster: + certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURCVENDQWUyZ0F3SUJBZ0lJZDFnZytoWUZpVGt3RFFZSktvWklodmNOQVFFTEJRQXdGVEVUTUJFR0ExVUUKQXhNS2EzVmlaWEp1WlhSbGN6QWVGdzB5TmpBME1UQXdOekEyTlRsYUZ3MHpOakEwTURjd056RXhOVGxhTUJVeApFekFSQmdOVkJBTVRDbXQxWW1WeWJtVjBaWE13Z2dFaU1BMEdDU3FHU0liM0RRRUJBUVVBQTRJQkR3QXdnZ0VLCkFvSUJBUURGMndwRVM5czBYWCt2c1NWYkNSUlRyNXZ0eVB6LzdnTlB4dG14aCtzVTJsbU5SdE9TdkdrdTFKWDIKY05hdWRnZCtLVlNUN1IzMXhkMGdLd0ZMV0tDbkw2WkxnQ1B1MGNscDNGeEh3UmxMMkRjcmQzcHZLQ0FXYWJFTwpWaTlHU0dydklYWmFsNlF2MjRRTU1WcUdXSXd0MnhXQkhQY3Ftelk1RFNmN295THo3REE0cjN0OVR2OXg3WDVFCjROeDFtZ3gybm9ob1hXaXU1U0JrRWNvWWRTUXh3K1NzYXc3Vi83N25sY3c4Nm9SbmNnWHVkcUk2S3FYSHdHTEYKNkh0Wm9NeTRqYXFzYzhtY2R3K2o5TjNQSktxbnlURVdGM05jclZ6c0VweW1BNktoY2NoK3RpbDJjL3hIWWxuQQpEbEpaK0FVcXBzNURoMFdlMG5RL092b3RLZTRwQWdNQkFBR2pXVEJYTUE0R0ExVWREd0VCL3dRRUF3SUNwREFQCkJnTlZIUk1CQWY4RUJUQURBUUgvTUIwR0ExVWREZ1FXQkJUVGhIcmlVcHQ3T0U5VHkwVzZBbmFsUTVLaVNUQVYKQmdOVkhSRUVEakFNZ2dwcmRXSmxjbTVsZEdWek1BMEdDU3FHU0liM0RRRUJDd1VBQTRJQkFRQ1V2M2dDSVVyTgp5Sk0yWHJ5cWd2NlpMYTMzREplelNjR1BLYWtibjJXVmZxSktuNmJLOWJOREQ2d0tqcEtVTmdLN1REZEU1U3Q5CjZncGR0Nkk3SjBEdTU5bXhmUFFmajlERjBoYWxlY2s4b2xCN1BtaGo1RlMzV01vVllQRU1Td2Y3OC90TmxzMjAKQ3oxSzR2VVJDdFhtTklrcERrd0o5cHZ6TWRvcHNXUHcxTXZqK2wvK2dyMElSZWhpNVhFSXRCSlUvbm9MU3pFegpvTFB3WFlGa1U1dTJDYmFtMEVIcitnb0dlamgzVlBHWVpVYWxYdmxsZlZjT09Gcm54cVp3VHRtWFVOL2VkRlhuCkVZQzNMUVhBZFVybUJqY2E3a3V3emh5T25wbHZqeFNhU2pNMG5YbHpXZ2dKVnBOM2NuS0FyV1Izek4rKzY2UjYKcmZxSHJzcUtTYjBHCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + server: https://192.168.52.213:6443 + name: kubernetes-xxx +contexts: +- context: + cluster: kubernetes-xxx + user: kubernetes-admin + name: kubernetes-admin@kubernetes-xxx +current-context: kubernetes-admin@kubernetes-xxx +kind: Config +users: +- name: kubernetes-admin + user: + client-certificate-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURLVENDQWhHZ0F3SUJBZ0lJSmJvUXVYSXlXNGN3RFFZSktvWklodmNOQVFFTEJRQXdGVEVUTUJFR0ExVUUKQXhNS2EzVmlaWEp1WlhSbGN6QWVGdzB5TmpBME1UQXdOekEyTlRsYUZ3MHlOekEwTVRBd056RXlNREJhTUR3eApIekFkQmdOVkJBb1RGbXQxWW1WaFpHMDZZMngxYzNSbGNpMWhaRzFwYm5NeEdUQVhCZ05WQkFNVEVHdDFZbVZ5CmJtVjBaWE10WVdSdGFXNHdnZ0VpTUEwR0NTcUdTSWIzRFFFQkFRVUFBNElCRHdBd2dnRUtBb0lCQVFDeDhCNDIKNFU0cSsweElLRWRrbjBmbEdwbTI3dFgyeFdwRElVb2VwUnBydHQ2VWNKTVZNc28zNHpta0RMZ1ZqdnQ1YUl6cgpGQk5wYnhHWWMxeXhHbS96M1QrNHZzVTFLcmZ5bGxyaStzSG9YdUkrVDgwZGtybXlWRFVJUzNhbm44TXlmUWZvCkVha0JUZzBqTkJWWlV4THMyR3BwT0R4eXBGWWpZNUFpU2FkQ1ZzU0RwYzJJRDVlem9UZEF1ZnpHUXBvZGlBdkMKaG9GVkZFaUtaUVBtMDA3VWZsV2Y1UDFKS3hDSDFpT1M5c3d2b2NZdGk0SzRLUDdrc2F6Z21rb3lub0pGRE5HOApzeWdVVk5rblJNTm5ZcFdCZWZJK0hQUVZSaDVWa2Z4Qk1yZ0VjODM1Q05sKzN2elZRdzRyYmFFR0VRYjJFSGIxCnRmdTU5K2lVbUoySXg3VlBBZ01CQUFHalZqQlVNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUsKQmdnckJnRUZCUWNEQWpBTUJnTlZIUk1CQWY4RUFqQUFNQjhHQTFVZEl3UVlNQmFBRk5PRWV1SlNtM3M0VDFQTApSYm9DZHFWRGtxSkpNQTBHQ1NxR1NJYjNEUUVCQ3dVQUE0SUJBUUF0RGdGc2dnUGxSQ2dhREFzUGR0dGtjVUlsCkJHbTdqOGNkRG9TdXZ1V1hsaWpSMnZ5MGJBaGxxbkxCZjVFQldiZmV2cFI2RzFGUEVDRGZFekhKUU9jUi9vb2wKenhNOXIrdkJnb2d2enNpblpWODdEa1ZwRFY1M293cHUwY0VGTWZ0MjArL1lEYnFvRjk5THBGbUVBSDljeGY4YwpOeW81dkRJTXJOQm5qaGtZNkVXbVkrOUQwRzlPV0RTSzFuUFdPUEpRMUIzdjRJUjI1VC8zWnB2RGpxMTc5NFRxCmxOTXpOdTlMWHJOMVdNVTZXcVhtM2ZrS0RVTTRONWVzUXJPNzVQOUZrck01UFlyNGZ6L25zanpVMWVhc3U4aHoKanJtT2NhT3FNajBBYzVwekNMdnF4dnBzOVQ3WEpUTG5pUU5FKzlCTTIySjlkUTkwUW1Ka3BJZDJpaklLCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + client-key-data: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFcEFJQkFBS0NBUUVBc2ZBZU51Rk9LdnRNU0NoSFpKOUg1UnFadHU3VjlzVnFReUZLSHFVYWE3YmVsSENUCkZUTEtOK001cEF5NEZZNzdlV2lNNnhRVGFXOFJtSE5jc1Jwdjg5MC91TDdGTlNxMzhwWmE0dnJCNkY3aVBrL04KSFpLNXNsUTFDRXQycDUvRE1uMEg2QkdwQVU0Tkl6UVZXVk1TN05ocWFUZzhjcVJXSTJPUUlrbW5RbGJFZzZYTgppQStYczZFM1FMbjh4a0thSFlnTHdvYUJWUlJJaW1VRDV0Tk8xSDVWbitUOVNTc1FoOVlqa3ZiTUw2SEdMWXVDCnVDais1TEdzNEpwS01wNkNSUXpSdkxNb0ZGVFpKMFREWjJLVmdYbnlQaHowRlVZZVZaSDhRVEs0QkhQTitRaloKZnQ3ODFVTU9LMjJoQmhFRzloQjI5Ylg3dWZmb2xKaWRpTWUxVHdJREFRQUJBb0lCQUF1enRPajdERDc3WTlyaQorejc4MHFlWWhqOW5tWjZ2QVB3Rk5uQ3Bmd0ZTZDNUZnFteWNlSHdRYjN3QWNpTEkwblQ3a1Rqc2l4SkdBYXc2CitmK1RzNFVnS2M2bWpTV2Y5NVQybW1lUEFpMmIvOWtGT29JVllpeitLTGF3Nzg2NlRRWUcvdndlSFRVK3d0SnIKM0hPUmFRODJkRmtUSDdhU1pDL1RubWtDMndNYndZN3ZPR3ZXeUI0b1VzYlFPdkUySS9oaUYrSFRRYWc5djBtZQpWNllQNkUxQndKQkViZVh6OGx6N29rV3JDQVdpTnE1MlI3a2tiYyt5aFBZa0ZTRTEyZkFXc3RtR1NsTEtUUlRECjZtcitXVDJ2SzNJa2VIUG5mcUtmRmRDY0dDYURYWlFnVGhHSGxOOVJWNkxHWExBcFJLVUpsZTVzUmNPd3pvenIKQVhwbGZCRUNnWUVBeGw4OXFxSEV6MHJqTVM1U2dTZ1VzRWNhdjM5WndoM0pFdjBJMU9HdGJ3c2l0WU4wUDlVMgpVems3UndCUDJzY0h5dGVyWWZsVzhpQ1p5NlJOeEtVRTl2NVBtZWJ0Y0R1blREZEFJTVRDb1NadmtlQXl3SUVsCjNtSXFuUXN6eUJ2d0tzeVZMQ2lpZytod1RtVC84dnVubVhrb0t4MU9UbmNUNzVWOUpmRmdxYnNDZ1lFQTVhRTIKWm00ZTFQSUtsdzBlRlVCRXBzNlZNbDdKT28yeEtYa1JnWTY4dUdXUmN4WlNyT3IvRGtUZE1WQm9IcUVpR2JUYQphakd2VGphMW1UVWIrbnpwRVVkcUFjVlBwWHp1cURKZFRjTS9YSWticHpWcFJuSE1PeExJS2FjMklZY2gzV21qCkx0VGtFTzFuaW00TDAyMzJvQk5udEVPcDFXcHFNZW81MnRIS3IzMENnWUVBbG9SbnB4UHQ0SmZqQVVzem9QcUgKa0NXem53QURYQzM3aHVQbUVwbFdYbjVsakxLOHZ6NkpaT1oxR3UrYXA5Y0RDNTlQVkJ5OEl5WW5iQVEwV2ZRWQphMDBWbXlKRW9UY3lQcnRoZGp1MXJrOGFzdVladkVXQ05WWHBIWllGYVY0QmdvZ08zQjNOQ2llUkozcXRWbklxCmZtSVczM2Z5RmNsMm9BQ2VuKzZodU1jQ2dZQWZxZnhEWVpuMHlTOTJwc05vTU1XR3J1UFozMEltUmRXUkgzV3YKaE13M0xTSkNXV2lyR1hoME1rRENCUkZtYjlldzdjUlVjbWQzQklwRUJ4UEZsYVhaMFJ3b0JGbzRPcUw2SkgvRApNbEl4WTJLaHl2YXFWUlRSYklTWFljeFFFVDgxTmRUZnAxR1BsL3o0RkxhaXYvNExFc0h4RklkOXV6L0xoRDNZCkU2ZThUUUtCZ1FDay9zMlJlR0J5ZWM0TXlaWWcvaWNqQWxpNVhRbHdtK0s5VWYrcVY5cXBjcG54TnJNaFp1ZDIKYnVqVDdKRWpxSEV1MGc5azkrQkF4ZVdtM0dRUS9mVzVsV2N5UHdGdEtkUjFkRm91SmM4ZFhCVFYxY2hOZE1HQwprR09GeFVyNW4vRXlGcGpMTm9IV09WSGpKcm13UGJFUklPUGUxZzlYbW1XaGdkeldqVFFvRWc9PQotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQo=