added unlinked controller; change create forwarded to createOrUpdate; added global proxy mutex
This commit is contained in:
@@ -0,0 +1,25 @@
|
||||
package control
|
||||
|
||||
import (
|
||||
kubeclient "k8s.io/client-go/kubernetes"
|
||||
kubeclicmd "k8s.io/client-go/tools/clientcmd"
|
||||
)
|
||||
|
||||
func makeClientset(kubeconf []byte) (kubeclient.Interface, error) {
|
||||
var res kubeclient.Interface
|
||||
var err error
|
||||
clientConfig, err := kubeclicmd.NewClientConfigFromBytes(kubeconf)
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
restConfig, err := clientConfig.ClientConfig()
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
kubeClient, err := kubeclient.NewForConfig(restConfig)
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
res = kubeClient
|
||||
return res, err
|
||||
}
|
||||
@@ -0,0 +1,226 @@
|
||||
package control
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
kubecore "k8s.io/api/core/v1"
|
||||
k8inform "k8s.io/client-go/informers"
|
||||
k8client "k8s.io/client-go/kubernetes"
|
||||
k8cache "k8s.io/client-go/tools/cache"
|
||||
|
||||
kubemeta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
kubetypes "k8s.io/apimachinery/pkg/types"
|
||||
kubepatch "k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
|
||||
"helmet/app/logger"
|
||||
)
|
||||
|
||||
type Controller struct {
|
||||
lbaddr string
|
||||
clientset k8client.Interface
|
||||
informer k8cache.SharedIndexInformer
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func NewController(clientset k8client.Interface, lbaddr string) *Controller {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cont := &Controller{
|
||||
clientset: clientset,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
lbaddr: lbaddr,
|
||||
}
|
||||
cont.log = logger.NewLogger("controller")
|
||||
return cont
|
||||
}
|
||||
|
||||
func (cont *Controller) Run() {
|
||||
cont.log.Debugf("Start controller")
|
||||
factory := k8inform.NewSharedInformerFactory(cont.clientset, time.Minute*10)
|
||||
defer factory.Shutdown()
|
||||
|
||||
serviceInformer := factory.Core().V1().Services().Informer()
|
||||
handler := k8cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: cont.addService,
|
||||
UpdateFunc: cont.updateService,
|
||||
DeleteFunc: cont.deleteService,
|
||||
}
|
||||
serviceInformer.AddEventHandler(handler)
|
||||
|
||||
ctx, cancel := context.WithCancel(cont.ctx)
|
||||
defer cancel()
|
||||
factory.Start(ctx.Done())
|
||||
synced := factory.WaitForCacheSync(ctx.Done())
|
||||
for _, sync := range synced {
|
||||
if !sync {
|
||||
return
|
||||
}
|
||||
}
|
||||
cont.informer = serviceInformer
|
||||
<-ctx.Done()
|
||||
cont.log.Debugf("Stop controller")
|
||||
}
|
||||
|
||||
func (cont *Controller) Stop() {
|
||||
cont.cancel()
|
||||
}
|
||||
|
||||
// https://pkg.go.dev/k8s.io/api/core/v1#Service
|
||||
|
||||
func (cont *Controller) addService(obj any) {
|
||||
service := obj.(*kubecore.Service)
|
||||
service = service.DeepCopy()
|
||||
cont.log.Debugf("Service %s/%s created", service.Namespace, service.Name)
|
||||
if service.Spec.Type == kubecore.ServiceTypeLoadBalancer {
|
||||
_ = makeForwarding(service)
|
||||
err := cont.patchService(service)
|
||||
if err != nil {
|
||||
cont.log.Debugf("Error patch service %s/%s: %v", service.Namespace, service.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cont *Controller) updateService(oldObj, newObj any) {
|
||||
newService := newObj.(*kubecore.Service)
|
||||
newService = newService.DeepCopy()
|
||||
oldService := oldObj.(*kubecore.Service)
|
||||
oldService = oldService.DeepCopy()
|
||||
cont.log.Debugf("Service %s/%s updated", newService.Namespace, newService.Name)
|
||||
|
||||
if newService.Spec.Type == kubecore.ServiceTypeLoadBalancer {
|
||||
if cont.serviceDifferent(oldService, newService) {
|
||||
_ = makeForwarding(newService)
|
||||
cont.log.Debugf("Updated service %s/%s have new adresses", newService.Namespace, newService.Name)
|
||||
err := cont.patchService(newService)
|
||||
if err != nil {
|
||||
cont.log.Debugf("Error patch service %s/%s: %v", newService.Namespace, newService.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cont *Controller) deleteService(obj any) {
|
||||
service := obj.(*kubecore.Service)
|
||||
service = service.DeepCopy()
|
||||
cont.log.Debugf("Service %s/%s deleted", service.Namespace, service.Name)
|
||||
}
|
||||
|
||||
type Forwarding struct {
|
||||
Proto string
|
||||
Port uint32
|
||||
Destinations []string
|
||||
}
|
||||
|
||||
func makeForwarding(service *kubecore.Service) []*Forwarding {
|
||||
forwardings := make([]*Forwarding, 0)
|
||||
for _, port := range service.Spec.Ports {
|
||||
forwarding := &Forwarding{
|
||||
Proto: string(port.Protocol),
|
||||
Port: uint32(port.Port),
|
||||
Destinations: make([]string, 0),
|
||||
}
|
||||
for _, ipaddr := range service.Spec.ClusterIPs {
|
||||
forwarding.Destinations = append(forwarding.Destinations, ipaddr)
|
||||
}
|
||||
}
|
||||
return forwardings
|
||||
}
|
||||
|
||||
func (cont *Controller) patchService(svc *kubecore.Service) error {
|
||||
var err error
|
||||
oldData, err := json.Marshal(svc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ingressMode := kubecore.LoadBalancerIPModeProxy
|
||||
ingresses := make([]kubecore.LoadBalancerIngress, 0)
|
||||
ingresses = append(ingresses, kubecore.LoadBalancerIngress{
|
||||
IP: cont.lbaddr,
|
||||
IPMode: &ingressMode,
|
||||
})
|
||||
status := kubecore.LoadBalancerStatus{
|
||||
Ingress: ingresses,
|
||||
}
|
||||
svc.Status.LoadBalancer = status
|
||||
|
||||
newData, err := json.Marshal(svc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
patchBytes, err := kubepatch.CreateTwoWayMergePatch(oldData, newData, kubecore.Service{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cont.log.Debugf("patch: %s\n", string(patchBytes))
|
||||
|
||||
ctx, _ := context.WithTimeout(cont.ctx, 5*time.Second)
|
||||
patchServiceOpts := kubemeta.PatchOptions{}
|
||||
strategy := kubetypes.StrategicMergePatchType
|
||||
_, err = cont.clientset.CoreV1().Services(svc.Namespace).
|
||||
Patch(ctx, svc.Name, strategy, patchBytes, patchServiceOpts, "status")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (cont *Controller) serviceDifferent(oldService, newService *kubecore.Service) bool {
|
||||
type Address struct {
|
||||
Port int32
|
||||
Proto kubecore.Protocol
|
||||
Address string
|
||||
}
|
||||
oldAddreses := make([]Address, 0)
|
||||
for _, port := range oldService.Spec.Ports {
|
||||
for _, ipaddr := range oldService.Spec.ClusterIPs {
|
||||
address := Address{
|
||||
Port: port.Port,
|
||||
Proto: port.Protocol,
|
||||
Address: ipaddr,
|
||||
}
|
||||
oldAddreses = append(oldAddreses, address)
|
||||
}
|
||||
}
|
||||
newAddreses := make([]Address, 0)
|
||||
for _, port := range newService.Spec.Ports {
|
||||
for _, ipaddr := range newService.Spec.ClusterIPs {
|
||||
address := Address{
|
||||
Port: port.Port,
|
||||
Proto: port.Protocol,
|
||||
Address: ipaddr,
|
||||
}
|
||||
newAddreses = append(newAddreses, address)
|
||||
}
|
||||
}
|
||||
if len(newAddreses) != len(oldAddreses) {
|
||||
return true
|
||||
}
|
||||
for _, newAddress := range newAddreses {
|
||||
newFound := false
|
||||
for _, oldAddress := range oldAddreses {
|
||||
if oldAddress == newAddress {
|
||||
newFound = true
|
||||
}
|
||||
}
|
||||
if !newFound {
|
||||
return true
|
||||
}
|
||||
}
|
||||
for _, oldAddress := range oldAddreses {
|
||||
oldFound := false
|
||||
for _, newAddress := range newAddreses {
|
||||
if newAddress == oldAddress {
|
||||
oldFound = true
|
||||
}
|
||||
}
|
||||
if !oldFound {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
Reference in New Issue
Block a user