Files
minilb/app/control/svccont.go

252 lines
6.4 KiB
Go

package control
import (
"context"
"encoding/json"
"time"
kubecore "k8s.io/api/core/v1"
k8inform "k8s.io/client-go/informers"
k8client "k8s.io/client-go/kubernetes"
k8cache "k8s.io/client-go/tools/cache"
kubemeta "k8s.io/apimachinery/pkg/apis/meta/v1"
kubetypes "k8s.io/apimachinery/pkg/types"
kubepatch "k8s.io/apimachinery/pkg/util/strategicpatch"
"helmet/app/logger"
)
// Proxy is the forwarding backend a Controller is constructed with
// (see NewController). Dummyproxy is the only implementation in this file.
type Proxy interface {
// CreateOrUpdateForwarder takes a protocol name, two ports and a variadic
// list of addresses.
// NOTE(review): presumably lport is the listening port, dport the destination
// port and addrs the backend addresses — confirm against a real implementation.
CreateOrUpdateForwarder(ctx context.Context, proto string, lport, dport uint32, addrs ...string) error
// DeleteForwarder takes a protocol name and a port.
// NOTE(review): presumably removes the forwarder previously created for
// proto/lport — confirm against a real implementation.
DeleteForwarder(ctx context.Context, proto string, lport uint32) error
}
// Dummyproxy is a Proxy implementation whose methods do nothing and always
// return nil.
type Dummyproxy struct {
// log is set by NewDummyproxy; the methods in this file never write to it.
log *logger.Logger
}
// NewDummyproxy builds a Dummyproxy with a logger named "dummyproxy" and
// returns it as a Proxy.
func NewDummyproxy() Proxy {
	prox := new(Dummyproxy)
	prox.log = logger.NewLogger("dummyproxy")
	return prox
}
// CreateOrUpdateForwarder is a no-op: it ignores every argument and always
// returns nil.
func (prox *Dummyproxy) CreateOrUpdateForwarder(ctx context.Context, proto string, lport, dport uint32, addrs ...string) error {
return nil
}
// DeleteForwarder is a no-op: it ignores every argument and always returns nil.
func (prox *Dummyproxy) DeleteForwarder(ctx context.Context, proto string, lport uint32) error {
return nil
}
// Controller watches Service objects through an informer and patches the
// load-balancer status of Services of type LoadBalancer.
type Controller struct {
// lbaddr is written as the ingress IP into the Service status by patchService.
lbaddr string
// clientset is used to build the informer factory and to send status patches.
clientset k8client.Interface
// informer is assigned by Run once the informer cache has synced.
informer k8cache.SharedIndexInformer
// ctx is the root context created in NewController; Run and patchService
// derive their contexts from it.
ctx context.Context
// cancel cancels ctx; it is invoked by Stop.
cancel context.CancelFunc
log *logger.Logger
// proxy is stored by NewController; no method visible in this file calls it yet.
proxy Proxy
}
// NewController assembles a Controller around the given proxy, Kubernetes
// clientset and load-balancer address. It creates a cancellable root context
// (cancelled later by Stop) and a logger named "controller".
func NewController(proxy Proxy, clientset k8client.Interface, lbaddr string) *Controller {
	rootCtx, stop := context.WithCancel(context.Background())
	return &Controller{
		lbaddr:    lbaddr,
		clientset: clientset,
		ctx:       rootCtx,
		cancel:    stop,
		log:       logger.NewLogger("controller"),
		proxy:     proxy,
	}
}
// Run starts a shared informer for Services with a 10 second resync period,
// registers the add/update/delete handlers, waits for the cache to sync and
// then blocks until the controller context is cancelled (see Stop).
//
// It returns early, after logging an error, when the event handler cannot be
// registered or when the informer cache fails to sync.
func (cont *Controller) Run() {
	cont.log.Debugf("Start controller")

	factory := k8inform.NewSharedInformerFactory(cont.clientset, 10*time.Second)
	// Deferred first so that it runs last: the deferred cancel() below closes
	// the stop channel before Shutdown waits for the informer goroutines.
	defer factory.Shutdown()

	serviceInformer := factory.Core().V1().Services().Informer()
	handler := k8cache.ResourceEventHandlerFuncs{
		AddFunc:    cont.addService,
		UpdateFunc: cont.updateService,
		DeleteFunc: cont.deleteService,
	}
	// Registration can fail (for example on an already stopped informer);
	// running without handlers would silently do nothing, so bail out.
	if _, err := serviceInformer.AddEventHandler(handler); err != nil {
		cont.log.Errorf("Cannot register service event handler: %v", err)
		return
	}

	ctx, cancel := context.WithCancel(cont.ctx)
	defer cancel()

	factory.Start(ctx.Done())
	for _, ok := range factory.WaitForCacheSync(ctx.Done()) {
		if !ok {
			cont.log.Errorf("Cannot sync controller")
			return
		}
	}
	cont.informer = serviceInformer

	<-ctx.Done()
	cont.log.Debugf("Stop controller")
}
// Stop cancels the controller's root context created in NewController. Run
// derives its context from it, so a blocked Run returns after this call.
func (cont *Controller) Stop() {
cont.cancel()
}
// addService is the informer AddFunc handler. For a Service of type
// LoadBalancer it builds the forwarding list and patches the Service status
// with the load-balancer address; other Service types are only logged.
//
// Service API reference: https://pkg.go.dev/k8s.io/api/core/v1#Service
func (cont *Controller) addService(obj any) {
	cached, ok := obj.(*kubecore.Service)
	if !ok {
		// A bare type assertion would panic the handler on an unexpected object.
		cont.log.Errorf("Unexpected object %T in service add handler", obj)
		return
	}
	// patchService mutates its argument, so never hand it the cached object.
	service := cached.DeepCopy()
	cont.log.Debugf("Service %s/%s created", service.Namespace, service.Name)

	if service.Spec.Type != kubecore.ServiceTypeLoadBalancer {
		return
	}
	// The forwarding list is computed but not yet handed to the proxy.
	_ = makeForwarding(service)
	if err := cont.patchService(service); err != nil {
		cont.log.Errorf("Error patch service %s/%s: %v", service.Namespace, service.Name, err)
	}
}
// updateService is the informer UpdateFunc handler. It re-patches the status
// of a LoadBalancer Service when the Service has just become a LoadBalancer or
// when its port / cluster-IP combinations changed (see serviceDifferent).
//
// The handler also fires on every informer resync with identical objects, so
// the cached objects are only deep-copied once a patch is actually needed.
func (cont *Controller) updateService(oldObj, newObj any) {
	oldService, ok := oldObj.(*kubecore.Service)
	if !ok {
		cont.log.Errorf("Unexpected old object %T in service update handler", oldObj)
		return
	}
	cached, ok := newObj.(*kubecore.Service)
	if !ok {
		cont.log.Errorf("Unexpected new object %T in service update handler", newObj)
		return
	}
	cont.log.Debugf("Service %s/%s updated", cached.Namespace, cached.Name)

	if cached.Spec.Type != kubecore.ServiceTypeLoadBalancer {
		return
	}
	// A Service converted to LoadBalancer keeps its ports and cluster IPs, so
	// the address comparison alone would never trigger the first status patch.
	becameLoadBalancer := oldService.Spec.Type != kubecore.ServiceTypeLoadBalancer
	if !becameLoadBalancer && !cont.serviceDifferent(oldService, cached) {
		return
	}

	// patchService mutates its argument, so never hand it the cached object.
	newService := cached.DeepCopy()
	// The forwarding list is computed but not yet handed to the proxy.
	_ = makeForwarding(newService)
	cont.log.Debugf("Updated service %s/%s have new adresses", newService.Namespace, newService.Name)
	if err := cont.patchService(newService); err != nil {
		cont.log.Errorf("Error patch service %s/%s: %v", newService.Namespace, newService.Name, err)
	}
}
// deleteService is the informer DeleteFunc handler. It currently only logs the
// deletion.
//
// When the informer missed the delete event, obj is a
// cache.DeletedFinalStateUnknown tombstone wrapping the last known object; it
// is unwrapped here instead of panicking on a failed type assertion.
func (cont *Controller) deleteService(obj any) {
	service, ok := obj.(*kubecore.Service)
	if !ok {
		tombstone, isTombstone := obj.(k8cache.DeletedFinalStateUnknown)
		if !isTombstone {
			cont.log.Errorf("Unexpected object %T in service delete handler", obj)
			return
		}
		service, ok = tombstone.Obj.(*kubecore.Service)
		if !ok {
			cont.log.Errorf("Unexpected tombstone object %T in service delete handler", tombstone.Obj)
			return
		}
	}
	// The object is only read for logging, so no deep copy is needed.
	cont.log.Debugf("Service %s/%s deleted", service.Namespace, service.Name)
}
// Forwarding describes one forwarding entry as built by makeForwarding from a
// single Service port.
type Forwarding struct {
// Proto is the Service port protocol converted to a string.
Proto string
// Port is the Service port number.
Port uint32
// Destinations holds the Service cluster IPs.
Destinations []string
}
// makeForwarding builds one Forwarding per Service port. Each entry carries
// the port's protocol and number, with every cluster IP of the Service as its
// destinations.
//
// The result is never nil; a Service without ports yields an empty slice.
func makeForwarding(service *kubecore.Service) []*Forwarding {
	forwardings := make([]*Forwarding, 0, len(service.Spec.Ports))
	for _, port := range service.Spec.Ports {
		// Each entry gets its own copy so callers cannot alias the Service spec.
		destinations := make([]string, 0, len(service.Spec.ClusterIPs))
		destinations = append(destinations, service.Spec.ClusterIPs...)

		// The original built this value but never stored it, so the function
		// always returned an empty list.
		forwardings = append(forwardings, &Forwarding{
			Proto:        string(port.Protocol),
			Port:         uint32(port.Port),
			Destinations: destinations,
		})
	}
	return forwardings
}
// patchService sets the load-balancer status of svc to a single ingress with
// the controller's lbaddr in proxy IP mode and sends the difference to the API
// server as a strategic merge patch on the "status" subresource.
//
// svc is mutated (its Status.LoadBalancer is overwritten), so callers must
// pass a private copy rather than an object owned by the informer cache.
// The API call is bounded by a 5 second timeout derived from the controller
// context. Any marshalling, patch-building or API error is returned as is.
func (cont *Controller) patchService(svc *kubecore.Service) error {
	oldData, err := json.Marshal(svc)
	if err != nil {
		return err
	}

	ingressMode := kubecore.LoadBalancerIPModeProxy
	svc.Status.LoadBalancer = kubecore.LoadBalancerStatus{
		Ingress: []kubecore.LoadBalancerIngress{{
			IP:     cont.lbaddr,
			IPMode: &ingressMode,
		}},
	}

	newData, err := json.Marshal(svc)
	if err != nil {
		return err
	}
	patchBytes, err := kubepatch.CreateTwoWayMergePatch(oldData, newData, kubecore.Service{})
	if err != nil {
		return err
	}
	cont.log.Debugf("patch: %s\n", string(patchBytes))

	// Release the timeout's timer as soon as the call returns; dropping the
	// cancel function keeps it alive until the deadline or parent cancel.
	ctx, cancel := context.WithTimeout(cont.ctx, 5*time.Second)
	defer cancel()

	_, err = cont.clientset.CoreV1().Services(svc.Namespace).
		Patch(ctx, svc.Name, kubetypes.StrategicMergePatchType, patchBytes, kubemeta.PatchOptions{}, "status")
	return err
}
// serviceDifferent reports whether the two Services expose different
// (port, protocol, cluster IP) combinations. Order is ignored; the number of
// occurrences of each combination is not.
//
// Neither Service is modified.
func (cont *Controller) serviceDifferent(oldService, newService *kubecore.Service) bool {
	type address struct {
		port  int32
		proto kubecore.Protocol
		addr  string
	}

	// Every port is combined with every cluster IP, so the totals are products.
	oldTotal := len(oldService.Spec.Ports) * len(oldService.Spec.ClusterIPs)
	newTotal := len(newService.Spec.Ports) * len(newService.Spec.ClusterIPs)
	if oldTotal != newTotal {
		return true
	}

	// Count the old combinations, then consume them with the new ones.
	remaining := make(map[address]int, oldTotal)
	for _, port := range oldService.Spec.Ports {
		for _, ipaddr := range oldService.Spec.ClusterIPs {
			remaining[address{port: port.Port, proto: port.Protocol, addr: ipaddr}]++
		}
	}
	for _, port := range newService.Spec.Ports {
		for _, ipaddr := range newService.Spec.ClusterIPs {
			key := address{port: port.Port, proto: port.Protocol, addr: ipaddr}
			if remaining[key] == 0 {
				// New combination, or more occurrences than before.
				return true
			}
			remaining[key]--
		}
	}
	// Equal totals and every new entry matched a distinct old one: identical.
	return false
}