made minimalLB+CCM
This commit is contained in:
@@ -30,7 +30,7 @@ type Auth struct {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Service Service `json:"service" yaml:"service"`
|
||||
Service Service `json:"service" yaml:"service"`
|
||||
Auths []Auth `json:"auths" yaml:"auths"`
|
||||
Hostname string `json:"hostname" yaml:"hostname"`
|
||||
LogPath string `json:"logfile" yaml:"logfile"`
|
||||
|
||||
+60
-18
@@ -3,6 +3,7 @@ package control
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
kubecore "k8s.io/api/core/v1"
|
||||
@@ -10,9 +11,9 @@ import (
|
||||
k8client "k8s.io/client-go/kubernetes"
|
||||
k8cache "k8s.io/client-go/tools/cache"
|
||||
|
||||
kubemeta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
kubetypes "k8s.io/apimachinery/pkg/types"
|
||||
kubepatch "k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
k8meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
k8types "k8s.io/apimachinery/pkg/types"
|
||||
k8patch "k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
|
||||
"helmet/app/logger"
|
||||
)
|
||||
@@ -62,7 +63,20 @@ func NewController(proxy Proxy, clientset k8client.Interface, lbaddr string) *Co
|
||||
return cont
|
||||
}
|
||||
|
||||
func (cont *Controller) Run() {
|
||||
func (cont *Controller) ProbeAPI() bool {
|
||||
ctx, _ := context.WithTimeout(cont.ctx, 5*time.Second)
|
||||
opts := k8meta.ListOptions{
|
||||
Limit: 1,
|
||||
}
|
||||
_, err := cont.clientset.CoreV1().Services("").List(ctx, opts)
|
||||
if err != nil {
|
||||
cont.log.Errorf("Error connecting to API: %v", err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (cont *Controller) Run() error {
|
||||
cont.log.Debugf("Start controller")
|
||||
factory := k8inform.NewSharedInformerFactory(cont.clientset, 10*time.Second)
|
||||
defer factory.Shutdown()
|
||||
@@ -82,12 +96,14 @@ func (cont *Controller) Run() {
|
||||
for _, sync := range synced {
|
||||
if !sync {
|
||||
cont.log.Errorf("Cannot sync controller")
|
||||
return
|
||||
err := errors.New("Cannot sync controller")
|
||||
return err
|
||||
}
|
||||
}
|
||||
cont.informer = serviceInformer
|
||||
<-ctx.Done()
|
||||
cont.log.Debugf("Stop controller")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cont *Controller) Stop() {
|
||||
@@ -99,13 +115,23 @@ func (cont *Controller) Stop() {
|
||||
func (cont *Controller) addService(obj any) {
|
||||
service := obj.(*kubecore.Service)
|
||||
service = service.DeepCopy()
|
||||
cont.log.Debugf("Service %s/%s created", service.Namespace, service.Name)
|
||||
if service.Spec.Type == kubecore.ServiceTypeLoadBalancer {
|
||||
_ = makeForwarding(service)
|
||||
cont.log.Infof("Service %s/%s created", service.Namespace, service.Name)
|
||||
forws := makeForwardings(service)
|
||||
cont.log.Debugf("Service %s/%s forwards: %++v", service.Namespace, service.Name, forws)
|
||||
|
||||
for _, forw := range forws {
|
||||
err := cont.proxy.CreateOrUpdateForwarder(cont.ctx, forw.Proto, forw.Port, forw.Port, forw.Destinations...)
|
||||
if err != nil {
|
||||
cont.log.Errorf("Error forwarding service %s/%s: %v", service.Namespace, service.Name, err)
|
||||
}
|
||||
}
|
||||
cont.log.Debugf("Service %s/%s forwarded", service.Namespace, service.Name)
|
||||
err := cont.patchService(service)
|
||||
if err != nil {
|
||||
cont.log.Debugf("Error patch service %s/%s: %v", service.Namespace, service.Name, err)
|
||||
}
|
||||
cont.log.Debugf("Service %s/%s patched", service.Namespace, service.Name)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,15 +140,20 @@ func (cont *Controller) updateService(oldObj, newObj any) {
|
||||
newService = newService.DeepCopy()
|
||||
oldService := oldObj.(*kubecore.Service)
|
||||
oldService = oldService.DeepCopy()
|
||||
cont.log.Debugf("Service %s/%s updated", newService.Namespace, newService.Name)
|
||||
|
||||
if newService.Spec.Type == kubecore.ServiceTypeLoadBalancer {
|
||||
if cont.serviceDifferent(oldService, newService) {
|
||||
_ = makeForwarding(newService)
|
||||
cont.log.Debugf("Updated service %s/%s have new adresses", newService.Namespace, newService.Name)
|
||||
cont.log.Infof("Service %s/%s updated", newService.Namespace, newService.Name)
|
||||
forws := makeForwardings(newService)
|
||||
for _, forw := range forws {
|
||||
err := cont.proxy.CreateOrUpdateForwarder(cont.ctx, forw.Proto, forw.Port, forw.Port, forw.Destinations...)
|
||||
if err != nil {
|
||||
cont.log.Errorf("Error forwarding service %s/%s: %v", newService.Namespace, newService.Name, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
err := cont.patchService(newService)
|
||||
if err != nil {
|
||||
cont.log.Debugf("Error patch service %s/%s: %v", newService.Namespace, newService.Name, err)
|
||||
cont.log.Errorf("Error patch service %s/%s: %v", newService.Namespace, newService.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -131,7 +162,17 @@ func (cont *Controller) updateService(oldObj, newObj any) {
|
||||
func (cont *Controller) deleteService(obj any) {
|
||||
service := obj.(*kubecore.Service)
|
||||
service = service.DeepCopy()
|
||||
cont.log.Debugf("Service %s/%s deleted", service.Namespace, service.Name)
|
||||
if service.Spec.Type == kubecore.ServiceTypeLoadBalancer {
|
||||
cont.log.Debugf("Service %s/%s deleted", service.Namespace, service.Name)
|
||||
forws := makeForwardings(service)
|
||||
for _, forw := range forws {
|
||||
err := cont.proxy.DeleteForwarder(cont.ctx, forw.Proto, forw.Port)
|
||||
if err != nil {
|
||||
cont.log.Errorf("Error unforwarding service %s/%s: %v", service.Namespace, service.Name, err)
|
||||
}
|
||||
}
|
||||
cont.log.Debugf("Service %s/%s unforwarded", service.Namespace, service.Name)
|
||||
}
|
||||
}
|
||||
|
||||
type Forwarding struct {
|
||||
@@ -140,7 +181,7 @@ type Forwarding struct {
|
||||
Destinations []string
|
||||
}
|
||||
|
||||
func makeForwarding(service *kubecore.Service) []*Forwarding {
|
||||
func makeForwardings(service *kubecore.Service) []*Forwarding {
|
||||
forwardings := make([]*Forwarding, 0)
|
||||
for _, port := range service.Spec.Ports {
|
||||
forwarding := &Forwarding{
|
||||
@@ -151,6 +192,7 @@ func makeForwarding(service *kubecore.Service) []*Forwarding {
|
||||
for _, ipaddr := range service.Spec.ClusterIPs {
|
||||
forwarding.Destinations = append(forwarding.Destinations, ipaddr)
|
||||
}
|
||||
forwardings = append(forwardings, forwarding)
|
||||
}
|
||||
return forwardings
|
||||
}
|
||||
@@ -177,15 +219,15 @@ func (cont *Controller) patchService(svc *kubecore.Service) error {
|
||||
return err
|
||||
}
|
||||
|
||||
patchBytes, err := kubepatch.CreateTwoWayMergePatch(oldData, newData, kubecore.Service{})
|
||||
patchBytes, err := k8patch.CreateTwoWayMergePatch(oldData, newData, kubecore.Service{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cont.log.Debugf("patch: %s\n", string(patchBytes))
|
||||
//cont.log.Debugf("patch: %s\n", string(patchBytes))
|
||||
|
||||
ctx, _ := context.WithTimeout(cont.ctx, 5*time.Second)
|
||||
patchServiceOpts := kubemeta.PatchOptions{}
|
||||
strategy := kubetypes.StrategicMergePatchType
|
||||
patchServiceOpts := k8meta.PatchOptions{}
|
||||
strategy := k8types.StrategicMergePatchType
|
||||
_, err = cont.clientset.CoreV1().Services(svc.Namespace).
|
||||
Patch(ctx, svc.Name, strategy, patchBytes, patchServiceOpts, "status")
|
||||
if err != nil {
|
||||
|
||||
@@ -94,6 +94,15 @@ func (prox *Proxy) CreateOrUpdateForwarder(ctx context.Context, proto string, lp
|
||||
|
||||
func (prox *Proxy) DeleteForwarder(ctx context.Context, proto string, lport uint32) error {
|
||||
var err error
|
||||
|
||||
if lport == 0 {
|
||||
return errors.New("Zero forwarder lport")
|
||||
}
|
||||
if proto == "" {
|
||||
return errors.New("Empty forwarder type")
|
||||
}
|
||||
proto = strings.ToLower(proto)
|
||||
|
||||
var forw *Forwarder
|
||||
prox.mtx.Lock()
|
||||
defer prox.mtx.Unlock()
|
||||
|
||||
+20
-6
@@ -266,21 +266,28 @@ func (srv *Server) Run() error {
|
||||
}
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
done := make(chan error, 1)
|
||||
done := make(chan error, 3)
|
||||
|
||||
// Run service
|
||||
startService := func(svc *service.Service, done chan error) {
|
||||
startRPCService := func(svc *service.Service, done chan error) {
|
||||
srv.log.Infof("Run rpc service")
|
||||
err = svc.Run()
|
||||
if err != nil {
|
||||
srv.log.Errorf("Service error: %v", err)
|
||||
srv.log.Errorf("RPC service error: %v", err)
|
||||
done <- err
|
||||
}
|
||||
}
|
||||
go startService(srv.svc, done)
|
||||
go startRPCService(srv.svc, done)
|
||||
// Run controller
|
||||
srv.log.Infof("Run controller")
|
||||
go srv.cont.Run()
|
||||
startContService := func(cont *control.Controller, done chan error) {
|
||||
srv.log.Infof("Run controller")
|
||||
err = cont.Run()
|
||||
if err != nil {
|
||||
srv.log.Errorf("Controller service error: %v", err)
|
||||
done <- err
|
||||
}
|
||||
}
|
||||
go startContService(srv.cont, done)
|
||||
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
var signal os.Signal
|
||||
@@ -289,6 +296,13 @@ func (srv *Server) Run() error {
|
||||
srv.log.Infof("Services stopped by signal: %v", signal)
|
||||
srv.cancel()
|
||||
srv.svc.Stop()
|
||||
srv.cont.Stop()
|
||||
srv.wg.Wait()
|
||||
case err := <-done:
|
||||
srv.log.Infof("Services stopped by error: %v", err)
|
||||
srv.cancel()
|
||||
srv.svc.Stop()
|
||||
srv.cont.Stop()
|
||||
srv.wg.Wait()
|
||||
}
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user