2019-03-14 14:56:06 +00:00
package ingress
2019-02-21 22:08:05 +00:00
import (
"context"
2021-01-05 11:26:04 +00:00
"crypto/sha256"
2019-04-01 13:30:07 +00:00
"errors"
2019-02-21 22:08:05 +00:00
"fmt"
"math"
2020-12-04 19:56:04 +00:00
"net"
2019-02-21 22:08:05 +00:00
"os"
"sort"
2020-12-04 19:56:04 +00:00
"strconv"
2019-02-21 22:08:05 +00:00
"strings"
"time"
2020-02-26 09:36:05 +00:00
"github.com/cenkalti/backoff/v4"
2019-10-25 13:46:05 +00:00
"github.com/mitchellh/hashstructure"
2020-08-17 16:04:03 +00:00
ptypes "github.com/traefik/paerser/types"
2020-09-16 13:46:04 +00:00
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/job"
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/provider"
"github.com/traefik/traefik/v2/pkg/safe"
"github.com/traefik/traefik/v2/pkg/tls"
2019-02-21 22:08:05 +00:00
corev1 "k8s.io/api/core/v1"
2021-03-15 10:16:04 +00:00
networkingv1 "k8s.io/api/networking/v1"
2019-02-21 22:08:05 +00:00
"k8s.io/apimachinery/pkg/labels"
)
const (
2020-07-15 17:18:03 +00:00
annotationKubernetesIngressClass = "kubernetes.io/ingress.class"
traefikDefaultIngressClass = "traefik"
traefikDefaultIngressClassController = "traefik.io/ingress-controller"
defaultPathMatcher = "PathPrefix"
2019-02-21 22:08:05 +00:00
)
// Provider holds configurations of the provider.
type Provider struct {
2021-05-06 16:12:10 +00:00
Endpoint string ` description:"Kubernetes server endpoint (required for external cluster client)." json:"endpoint,omitempty" toml:"endpoint,omitempty" yaml:"endpoint,omitempty" `
Token string ` description:"Kubernetes bearer token (not needed for in-cluster client)." json:"token,omitempty" toml:"token,omitempty" yaml:"token,omitempty" `
CertAuthFilePath string ` description:"Kubernetes certificate authority file path (not needed for in-cluster client)." json:"certAuthFilePath,omitempty" toml:"certAuthFilePath,omitempty" yaml:"certAuthFilePath,omitempty" `
Namespaces [ ] string ` description:"Kubernetes namespaces." json:"namespaces,omitempty" toml:"namespaces,omitempty" yaml:"namespaces,omitempty" export:"true" `
LabelSelector string ` description:"Kubernetes Ingress label selector to use." json:"labelSelector,omitempty" toml:"labelSelector,omitempty" yaml:"labelSelector,omitempty" export:"true" `
IngressClass string ` description:"Value of kubernetes.io/ingress.class annotation or IngressClass name to watch for." json:"ingressClass,omitempty" toml:"ingressClass,omitempty" yaml:"ingressClass,omitempty" export:"true" `
IngressEndpoint * EndpointIngress ` description:"Kubernetes Ingress Endpoint." json:"ingressEndpoint,omitempty" toml:"ingressEndpoint,omitempty" yaml:"ingressEndpoint,omitempty" export:"true" `
ThrottleDuration ptypes . Duration ` description:"Ingress refresh throttle duration" json:"throttleDuration,omitempty" toml:"throttleDuration,omitempty" yaml:"throttleDuration,omitempty" export:"true" `
AllowEmptyServices bool ` description:"Allow creation of services without endpoints." json:"allowEmptyServices,omitempty" toml:"allowEmptyServices,omitempty" yaml:"allowEmptyServices,omitempty" export:"true" `
lastConfiguration safe . Safe
2019-02-21 22:08:05 +00:00
}
2020-05-11 10:06:07 +00:00
// EndpointIngress holds the endpoint information for the Kubernetes provider.
2019-03-18 09:10:04 +00:00
type EndpointIngress struct {
2019-07-01 09:30:05 +00:00
IP string ` description:"IP used for Kubernetes Ingress endpoints." json:"ip,omitempty" toml:"ip,omitempty" yaml:"ip,omitempty" `
Hostname string ` description:"Hostname used for Kubernetes Ingress endpoints." json:"hostname,omitempty" toml:"hostname,omitempty" yaml:"hostname,omitempty" `
PublishedService string ` description:"Published Kubernetes Service to copy status from." json:"publishedService,omitempty" toml:"publishedService,omitempty" yaml:"publishedService,omitempty" `
2019-03-18 09:10:04 +00:00
}
2020-11-19 23:18:04 +00:00
func ( p * Provider ) newK8sClient ( ctx context . Context ) ( * clientWrapper , error ) {
_ , err := labels . Parse ( p . LabelSelector )
2019-02-21 22:08:05 +00:00
if err != nil {
2020-11-19 23:18:04 +00:00
return nil , fmt . Errorf ( "invalid ingress label selector: %q" , p . LabelSelector )
2019-02-21 22:08:05 +00:00
}
2019-03-14 14:56:06 +00:00
logger := log . FromContext ( ctx )
2020-11-19 23:18:04 +00:00
logger . Infof ( "ingress label selector is: %q" , p . LabelSelector )
2019-02-21 22:08:05 +00:00
withEndpoint := ""
if p . Endpoint != "" {
withEndpoint = fmt . Sprintf ( " with endpoint %v" , p . Endpoint )
}
2019-03-14 14:56:06 +00:00
var cl * clientWrapper
2019-03-11 13:54:05 +00:00
switch {
case os . Getenv ( "KUBERNETES_SERVICE_HOST" ) != "" && os . Getenv ( "KUBERNETES_SERVICE_PORT" ) != "" :
2019-03-14 14:56:06 +00:00
logger . Infof ( "Creating in-cluster Provider client%s" , withEndpoint )
2019-02-21 22:08:05 +00:00
cl , err = newInClusterClient ( p . Endpoint )
2019-03-11 13:54:05 +00:00
case os . Getenv ( "KUBECONFIG" ) != "" :
2019-03-14 14:56:06 +00:00
logger . Infof ( "Creating cluster-external Provider client from KUBECONFIG %s" , os . Getenv ( "KUBECONFIG" ) )
2019-03-11 13:54:05 +00:00
cl , err = newExternalClusterClientFromFile ( os . Getenv ( "KUBECONFIG" ) )
default :
2019-03-14 14:56:06 +00:00
logger . Infof ( "Creating cluster-external Provider client%s" , withEndpoint )
2019-02-21 22:08:05 +00:00
cl , err = newExternalClusterClient ( p . Endpoint , p . Token , p . CertAuthFilePath )
}
2020-11-19 23:18:04 +00:00
if err != nil {
return nil , err
2019-02-21 22:08:05 +00:00
}
2020-11-19 23:18:04 +00:00
cl . ingressLabelSelector = p . LabelSelector
return cl , nil
2019-02-21 22:08:05 +00:00
}
// Init the provider.
func ( p * Provider ) Init ( ) error {
2019-03-27 14:02:06 +00:00
return nil
2019-02-21 22:08:05 +00:00
}
// Provide allows the k8s provider to provide configurations to traefik
// using the given configuration channel.
2019-07-10 07:26:04 +00:00
func ( p * Provider ) Provide ( configurationChan chan <- dynamic . Message , pool * safe . Pool ) error {
2019-03-14 14:56:06 +00:00
ctxLog := log . With ( context . Background ( ) , log . Str ( log . ProviderName , "kubernetes" ) )
2019-02-21 22:08:05 +00:00
logger := log . FromContext ( ctxLog )
2020-11-19 23:18:04 +00:00
k8sClient , err := p . newK8sClient ( ctxLog )
2019-02-21 22:08:05 +00:00
if err != nil {
return err
}
2020-02-03 16:56:04 +00:00
pool . GoCtx ( func ( ctxPool context . Context ) {
2019-02-21 22:08:05 +00:00
operation := func ( ) error {
2020-02-03 16:56:04 +00:00
eventsChan , err := k8sClient . WatchAll ( p . Namespaces , ctxPool . Done ( ) )
2019-02-21 22:08:05 +00:00
if err != nil {
logger . Errorf ( "Error watching kubernetes events: %v" , err )
timer := time . NewTimer ( 1 * time . Second )
select {
case <- timer . C :
return err
2020-02-03 16:56:04 +00:00
case <- ctxPool . Done ( ) :
2019-02-21 22:08:05 +00:00
return nil
}
}
2019-03-14 14:56:06 +00:00
2019-08-30 10:16:04 +00:00
throttleDuration := time . Duration ( p . ThrottleDuration )
2020-02-03 16:56:04 +00:00
throttledChan := throttleEvents ( ctxLog , throttleDuration , pool , eventsChan )
2019-08-31 12:10:04 +00:00
if throttledChan != nil {
eventsChan = throttledChan
}
2019-08-30 10:16:04 +00:00
2019-02-21 22:08:05 +00:00
for {
select {
2020-02-03 16:56:04 +00:00
case <- ctxPool . Done ( ) :
2019-02-21 22:08:05 +00:00
return nil
2019-08-31 12:10:04 +00:00
case event := <- eventsChan :
2019-08-30 10:16:04 +00:00
// Note that event is the *first* event that came in during this
// throttling interval -- if we're hitting our throttle, we may have
// dropped events. This is fine, because we don't treat different
// event types differently. But if we do in the future, we'll need to
// track more information about the dropped events.
2019-02-21 22:08:05 +00:00
conf := p . loadConfigurationFromIngresses ( ctxLog , k8sClient )
2019-10-25 13:46:05 +00:00
confHash , err := hashstructure . Hash ( conf , nil )
switch {
case err != nil :
logger . Error ( "Unable to hash the configuration" )
case p . lastConfiguration . Get ( ) == confHash :
2019-02-21 22:08:05 +00:00
logger . Debugf ( "Skipping Kubernetes event kind %T" , event )
2019-10-25 13:46:05 +00:00
default :
p . lastConfiguration . Set ( confHash )
2019-07-10 07:26:04 +00:00
configurationChan <- dynamic . Message {
2019-02-21 22:08:05 +00:00
ProviderName : "kubernetes" ,
Configuration : conf ,
}
}
2019-08-30 10:16:04 +00:00
// If we're throttling, we sleep here for the throttle duration to
// enforce that we don't refresh faster than our throttle. time.Sleep
// returns immediately if p.ThrottleDuration is 0 (no throttle).
time . Sleep ( throttleDuration )
2019-02-21 22:08:05 +00:00
}
}
}
notify := func ( err error , time time . Duration ) {
logger . Errorf ( "Provider connection error: %s; retrying in %s" , err , time )
}
2020-02-03 16:56:04 +00:00
err := backoff . RetryNotify ( safe . OperationWithRecover ( operation ) , backoff . WithContext ( job . NewBackOff ( backoff . NewExponentialBackOff ( ) ) , ctxPool ) , notify )
2019-02-21 22:08:05 +00:00
if err != nil {
logger . Errorf ( "Cannot connect to Provider: %s" , err )
}
} )
return nil
}
2019-07-10 07:26:04 +00:00
func ( p * Provider ) loadConfigurationFromIngresses ( ctx context . Context , client Client ) * dynamic . Configuration {
conf := & dynamic . Configuration {
HTTP : & dynamic . HTTPConfiguration {
Routers : map [ string ] * dynamic . Router { } ,
Middlewares : map [ string ] * dynamic . Middleware { } ,
Services : map [ string ] * dynamic . Service { } ,
2019-03-14 08:30:04 +00:00
} ,
2019-07-10 07:26:04 +00:00
TCP : & dynamic . TCPConfiguration { } ,
2019-02-21 22:08:05 +00:00
}
2020-07-21 13:32:04 +00:00
serverVersion , err := client . GetServerVersion ( )
2020-07-15 17:18:03 +00:00
if err != nil {
log . FromContext ( ctx ) . Errorf ( "Failed to get server version: %v" , err )
return conf
}
2021-05-17 14:50:09 +00:00
var ingressClasses [ ] * networkingv1 . IngressClass
2020-07-15 17:18:03 +00:00
2020-07-21 13:32:04 +00:00
if supportsIngressClass ( serverVersion ) {
2021-01-28 14:08:04 +00:00
ics , err := client . GetIngressClasses ( )
2020-07-15 17:18:03 +00:00
if err != nil {
2021-01-28 14:08:04 +00:00
log . FromContext ( ctx ) . Warnf ( "Failed to list ingress classes: %v" , err )
2020-07-15 17:18:03 +00:00
}
2021-03-02 20:34:03 +00:00
if p . IngressClass != "" {
ingressClasses = filterIngressClassByName ( p . IngressClass , ics )
} else {
ingressClasses = ics
}
2020-07-15 17:18:03 +00:00
}
2019-02-21 22:08:05 +00:00
ingresses := client . GetIngresses ( )
2020-01-14 14:48:06 +00:00
certConfigs := make ( map [ string ] * tls . CertAndStores )
2019-02-21 22:08:05 +00:00
for _ , ingress := range ingresses {
ctx = log . With ( ctx , log . Str ( "ingress" , ingress . Name ) , log . Str ( "namespace" , ingress . Namespace ) )
2021-01-28 14:08:04 +00:00
if ! p . shouldProcessIngress ( ingress , ingressClasses ) {
2019-02-21 22:08:05 +00:00
continue
}
2020-01-14 14:48:06 +00:00
rtConfig , err := parseRouterConfig ( ingress . Annotations )
if err != nil {
log . FromContext ( ctx ) . Errorf ( "Failed to parse annotations: %v" , err )
continue
}
err = getCertificates ( ctx , ingress , client , certConfigs )
2019-02-21 22:08:05 +00:00
if err != nil {
log . FromContext ( ctx ) . Errorf ( "Error configuring TLS: %v" , err )
}
2021-03-15 10:16:04 +00:00
if len ( ingress . Spec . Rules ) == 0 && ingress . Spec . DefaultBackend != nil {
2020-01-14 14:48:06 +00:00
if _ , ok := conf . HTTP . Services [ "default-backend" ] ; ok {
log . FromContext ( ctx ) . Error ( "The default backend already exists." )
continue
}
2019-02-21 22:08:05 +00:00
2021-03-15 10:16:04 +00:00
service , err := loadService ( client , ingress . Namespace , * ingress . Spec . DefaultBackend )
2020-01-14 14:48:06 +00:00
if err != nil {
log . FromContext ( ctx ) .
2021-03-15 10:16:04 +00:00
WithField ( "serviceName" , ingress . Spec . DefaultBackend . Service . Name ) .
WithField ( "servicePort" , ingress . Spec . DefaultBackend . Service . Port . String ( ) ) .
2020-01-14 14:48:06 +00:00
Errorf ( "Cannot create service: %v" , err )
continue
}
2019-02-21 22:08:05 +00:00
2021-05-06 16:12:10 +00:00
if len ( service . LoadBalancer . Servers ) == 0 && ! p . AllowEmptyServices {
log . FromContext ( ctx ) .
WithField ( "serviceName" , ingress . Spec . DefaultBackend . Service . Name ) .
WithField ( "servicePort" , ingress . Spec . DefaultBackend . Service . Port . String ( ) ) .
Errorf ( "Skipping service: no endpoints found" )
continue
}
2020-01-14 14:48:06 +00:00
rt := & dynamic . Router {
Rule : "PathPrefix(`/`)" ,
Priority : math . MinInt32 ,
Service : "default-backend" ,
}
2019-02-21 22:08:05 +00:00
2020-01-14 14:48:06 +00:00
if rtConfig != nil && rtConfig . Router != nil {
rt . EntryPoints = rtConfig . Router . EntryPoints
rt . Middlewares = rtConfig . Router . Middlewares
rt . TLS = rtConfig . Router . TLS
2019-02-21 22:08:05 +00:00
}
2020-01-14 14:48:06 +00:00
conf . HTTP . Routers [ "default-router" ] = rt
conf . HTTP . Services [ "default-backend" ] = service
2019-02-21 22:08:05 +00:00
}
2020-01-07 15:26:08 +00:00
2021-01-05 11:26:04 +00:00
routers := map [ string ] [ ] * dynamic . Router { }
2019-02-21 22:08:05 +00:00
for _ , rule := range ingress . Spec . Rules {
2020-01-22 02:44:04 +00:00
if err := p . updateIngressStatus ( ingress , client ) ; err != nil {
log . FromContext ( ctx ) . Errorf ( "Error while updating ingress status: %v" , err )
}
2019-02-21 22:08:05 +00:00
2020-01-22 02:44:04 +00:00
if rule . HTTP == nil {
continue
}
2019-02-21 22:08:05 +00:00
2020-01-22 02:44:04 +00:00
for _ , pa := range rule . HTTP . Paths {
service , err := loadService ( client , ingress . Namespace , pa . Backend )
if err != nil {
log . FromContext ( ctx ) .
2021-03-15 10:16:04 +00:00
WithField ( "serviceName" , pa . Backend . Service . Name ) .
WithField ( "servicePort" , pa . Backend . Service . Port . String ( ) ) .
2020-01-22 02:44:04 +00:00
Errorf ( "Cannot create service: %v" , err )
continue
2019-08-14 17:16:06 +00:00
}
2020-01-07 15:26:08 +00:00
2021-05-06 16:12:10 +00:00
if len ( service . LoadBalancer . Servers ) == 0 && ! p . AllowEmptyServices {
log . FromContext ( ctx ) .
WithField ( "serviceName" , pa . Backend . Service . Name ) .
WithField ( "servicePort" , pa . Backend . Service . Port . String ( ) ) .
Errorf ( "Skipping service: no endpoints found" )
continue
}
2021-03-15 10:16:04 +00:00
portString := pa . Backend . Service . Port . Name
if len ( pa . Backend . Service . Port . Name ) == 0 {
portString = fmt . Sprint ( pa . Backend . Service . Port . Number )
}
serviceName := provider . Normalize ( ingress . Namespace + "-" + pa . Backend . Service . Name + "-" + portString )
2020-01-22 02:44:04 +00:00
conf . HTTP . Services [ serviceName ] = service
2020-02-18 16:34:05 +00:00
routerKey := strings . TrimPrefix ( provider . Normalize ( ingress . Name + "-" + ingress . Namespace + "-" + rule . Host + pa . Path ) , "-" )
2021-01-05 11:26:04 +00:00
routers [ routerKey ] = append ( routers [ routerKey ] , loadRouter ( rule , pa , rtConfig , serviceName ) )
}
}
for routerKey , conflictingRouters := range routers {
if len ( conflictingRouters ) == 1 {
conf . HTTP . Routers [ routerKey ] = conflictingRouters [ 0 ]
continue
}
log . FromContext ( ctx ) . Debugf ( "Multiple routers are defined with the same key %q, generating hashes to avoid conflicts" , routerKey )
for _ , router := range conflictingRouters {
key , err := makeRouterKeyWithHash ( routerKey , router . Rule )
if err != nil {
log . FromContext ( ctx ) . Error ( err )
continue
}
2020-03-18 12:30:04 +00:00
2021-01-05 11:26:04 +00:00
conf . HTTP . Routers [ key ] = router
2019-03-18 09:10:04 +00:00
}
2019-02-21 22:08:05 +00:00
}
}
2020-01-14 14:48:06 +00:00
certs := getTLSConfig ( certConfigs )
2019-06-27 21:58:03 +00:00
if len ( certs ) > 0 {
2019-07-10 07:26:04 +00:00
conf . TLS = & dynamic . TLSConfiguration {
2019-06-27 21:58:03 +00:00
Certificates : certs ,
}
}
2019-02-21 22:08:05 +00:00
return conf
}
2021-03-15 10:16:04 +00:00
func ( p * Provider ) updateIngressStatus ( ing * networkingv1 . Ingress , k8sClient Client ) error {
2020-07-15 17:18:03 +00:00
// Only process if an EndpointIngress has been configured.
2020-01-14 14:48:06 +00:00
if p . IngressEndpoint == nil {
return nil
}
if len ( p . IngressEndpoint . PublishedService ) == 0 {
if len ( p . IngressEndpoint . IP ) == 0 && len ( p . IngressEndpoint . Hostname ) == 0 {
return errors . New ( "publishedService or ip or hostname must be defined" )
}
2020-09-15 11:48:32 +00:00
return k8sClient . UpdateIngressStatus ( ing , [ ] corev1 . LoadBalancerIngress { { IP : p . IngressEndpoint . IP , Hostname : p . IngressEndpoint . Hostname } } )
2020-01-14 14:48:06 +00:00
}
serviceInfo := strings . Split ( p . IngressEndpoint . PublishedService , "/" )
if len ( serviceInfo ) != 2 {
return fmt . Errorf ( "invalid publishedService format (expected 'namespace/service' format): %s" , p . IngressEndpoint . PublishedService )
}
2020-01-16 09:14:06 +00:00
2020-01-14 14:48:06 +00:00
serviceNamespace , serviceName := serviceInfo [ 0 ] , serviceInfo [ 1 ]
service , exists , err := k8sClient . GetService ( serviceNamespace , serviceName )
if err != nil {
2020-05-11 10:06:07 +00:00
return fmt . Errorf ( "cannot get service %s, received error: %w" , p . IngressEndpoint . PublishedService , err )
2020-01-14 14:48:06 +00:00
}
if exists && service . Status . LoadBalancer . Ingress == nil {
// service exists, but has no Load Balancer status
2020-01-16 09:14:06 +00:00
log . Debugf ( "Skipping updating Ingress %s/%s due to service %s having no status set" , ing . Namespace , ing . Name , p . IngressEndpoint . PublishedService )
2020-01-14 14:48:06 +00:00
return nil
}
if ! exists {
return fmt . Errorf ( "missing service: %s" , p . IngressEndpoint . PublishedService )
}
2020-09-15 11:48:32 +00:00
return k8sClient . UpdateIngressStatus ( ing , service . Status . LoadBalancer . Ingress )
2020-01-14 14:48:06 +00:00
}
2021-05-17 14:50:09 +00:00
func ( p * Provider ) shouldProcessIngress ( ingress * networkingv1 . Ingress , ingressClasses [ ] * networkingv1 . IngressClass ) bool {
2020-07-28 15:50:04 +00:00
// configuration through the new kubernetes ingressClass
if ingress . Spec . IngressClassName != nil {
2021-01-28 14:08:04 +00:00
for _ , ic := range ingressClasses {
if * ingress . Spec . IngressClassName == ic . ObjectMeta . Name {
return true
}
}
return false
2020-07-28 15:50:04 +00:00
}
2021-01-28 14:08:04 +00:00
return p . IngressClass == ingress . Annotations [ annotationKubernetesIngressClass ] ||
len ( p . IngressClass ) == 0 && ingress . Annotations [ annotationKubernetesIngressClass ] == traefikDefaultIngressClass
2020-07-15 17:18:03 +00:00
}
2020-02-03 10:24:06 +00:00
func buildHostRule ( host string ) string {
if strings . HasPrefix ( host , "*." ) {
return "HostRegexp(`" + strings . Replace ( host , "*." , "{subdomain:[a-zA-Z0-9-]+}." , 1 ) + "`)"
}
return "Host(`" + host + "`)"
}
2021-03-15 10:16:04 +00:00
func getCertificates ( ctx context . Context , ingress * networkingv1 . Ingress , k8sClient Client , tlsConfigs map [ string ] * tls . CertAndStores ) error {
2019-02-21 22:08:05 +00:00
for _ , t := range ingress . Spec . TLS {
if t . SecretName == "" {
log . FromContext ( ctx ) . Debugf ( "Skipping TLS sub-section: No secret name provided" )
continue
}
2019-09-13 18:44:04 +00:00
configKey := ingress . Namespace + "-" + t . SecretName
2019-02-21 22:08:05 +00:00
if _ , tlsExists := tlsConfigs [ configKey ] ; ! tlsExists {
secret , exists , err := k8sClient . GetSecret ( ingress . Namespace , t . SecretName )
if err != nil {
2020-05-11 10:06:07 +00:00
return fmt . Errorf ( "failed to fetch secret %s/%s: %w" , ingress . Namespace , t . SecretName , err )
2019-02-21 22:08:05 +00:00
}
if ! exists {
return fmt . Errorf ( "secret %s/%s does not exist" , ingress . Namespace , t . SecretName )
}
cert , key , err := getCertificateBlocks ( secret , ingress . Namespace , t . SecretName )
if err != nil {
return err
}
2019-06-27 21:58:03 +00:00
tlsConfigs [ configKey ] = & tls . CertAndStores {
Certificate : tls . Certificate {
2019-02-21 22:08:05 +00:00
CertFile : tls . FileOrContent ( cert ) ,
KeyFile : tls . FileOrContent ( key ) ,
} ,
}
}
}
return nil
}
func getCertificateBlocks ( secret * corev1 . Secret , namespace , secretName string ) ( string , string , error ) {
var missingEntries [ ] string
tlsCrtData , tlsCrtExists := secret . Data [ "tls.crt" ]
if ! tlsCrtExists {
missingEntries = append ( missingEntries , "tls.crt" )
}
tlsKeyData , tlsKeyExists := secret . Data [ "tls.key" ]
if ! tlsKeyExists {
missingEntries = append ( missingEntries , "tls.key" )
}
if len ( missingEntries ) > 0 {
return "" , "" , fmt . Errorf ( "secret %s/%s is missing the following TLS data entries: %s" ,
namespace , secretName , strings . Join ( missingEntries , ", " ) )
}
cert := string ( tlsCrtData )
if cert == "" {
missingEntries = append ( missingEntries , "tls.crt" )
}
key := string ( tlsKeyData )
if key == "" {
missingEntries = append ( missingEntries , "tls.key" )
}
if len ( missingEntries ) > 0 {
return "" , "" , fmt . Errorf ( "secret %s/%s contains the following empty TLS data entries: %s" ,
namespace , secretName , strings . Join ( missingEntries , ", " ) )
}
return cert , key , nil
}
2019-03-18 09:10:04 +00:00
2020-01-14 14:48:06 +00:00
func getTLSConfig ( tlsConfigs map [ string ] * tls . CertAndStores ) [ ] * tls . CertAndStores {
var secretNames [ ] string
for secretName := range tlsConfigs {
secretNames = append ( secretNames , secretName )
2019-03-18 09:10:04 +00:00
}
2020-01-14 14:48:06 +00:00
sort . Strings ( secretNames )
2019-03-18 09:10:04 +00:00
2020-01-14 14:48:06 +00:00
var configs [ ] * tls . CertAndStores
for _ , secretName := range secretNames {
configs = append ( configs , tlsConfigs [ secretName ] )
}
return configs
}
2021-03-15 10:16:04 +00:00
func loadService ( client Client , namespace string , backend networkingv1 . IngressBackend ) ( * dynamic . Service , error ) {
service , exists , err := client . GetService ( namespace , backend . Service . Name )
2020-01-14 14:48:06 +00:00
if err != nil {
return nil , err
}
if ! exists {
return nil , errors . New ( "service not found" )
}
var portName string
var portSpec corev1 . ServicePort
var match bool
for _ , p := range service . Spec . Ports {
2021-03-15 10:16:04 +00:00
if backend . Service . Port . Number == p . Port || ( backend . Service . Port . Name == p . Name && len ( p . Name ) > 0 ) {
2020-01-14 14:48:06 +00:00
portName = p . Name
portSpec = p
match = true
break
2019-03-18 09:10:04 +00:00
}
2020-01-14 14:48:06 +00:00
}
2019-03-18 09:10:04 +00:00
2020-01-14 14:48:06 +00:00
if ! match {
return nil , errors . New ( "service port not found" )
2019-03-18 09:10:04 +00:00
}
2020-01-14 14:48:06 +00:00
svc := & dynamic . Service {
LoadBalancer : & dynamic . ServersLoadBalancer {
PassHostHeader : func ( v bool ) * bool { return & v } ( true ) ,
} ,
2019-03-18 09:10:04 +00:00
}
2020-01-14 14:48:06 +00:00
svcConfig , err := parseServiceConfig ( service . Annotations )
2019-03-18 09:10:04 +00:00
if err != nil {
2020-01-14 14:48:06 +00:00
return nil , err
2019-03-18 09:10:04 +00:00
}
2020-01-14 14:48:06 +00:00
if svcConfig != nil && svcConfig . Service != nil {
svc . LoadBalancer . Sticky = svcConfig . Service . Sticky
if svcConfig . Service . PassHostHeader != nil {
svc . LoadBalancer . PassHostHeader = svcConfig . Service . PassHostHeader
}
2019-03-18 09:10:04 +00:00
}
2020-01-14 14:48:06 +00:00
if service . Spec . Type == corev1 . ServiceTypeExternalName {
protocol := getProtocol ( portSpec , portSpec . Name , svcConfig )
2020-12-04 19:56:04 +00:00
hostPort := net . JoinHostPort ( service . Spec . ExternalName , strconv . Itoa ( int ( portSpec . Port ) ) )
2020-01-14 14:48:06 +00:00
svc . LoadBalancer . Servers = [ ] dynamic . Server {
2020-12-04 19:56:04 +00:00
{ URL : fmt . Sprintf ( "%s://%s" , protocol , hostPort ) } ,
2020-01-14 14:48:06 +00:00
}
return svc , nil
2019-03-18 09:10:04 +00:00
}
2021-03-15 10:16:04 +00:00
endpoints , endpointsExists , endpointsErr := client . GetEndpoints ( namespace , backend . Service . Name )
2020-01-14 14:48:06 +00:00
if endpointsErr != nil {
return nil , endpointsErr
}
if ! endpointsExists {
return nil , errors . New ( "endpoints not found" )
}
var port int32
for _ , subset := range endpoints . Subsets {
for _ , p := range subset . Ports {
if portName == p . Name {
port = p . Port
break
}
}
if port == 0 {
2021-04-15 16:16:04 +00:00
continue
2020-01-14 14:48:06 +00:00
}
protocol := getProtocol ( portSpec , portName , svcConfig )
for _ , addr := range subset . Addresses {
2020-12-04 19:56:04 +00:00
hostPort := net . JoinHostPort ( addr . IP , strconv . Itoa ( int ( port ) ) )
2020-01-14 14:48:06 +00:00
svc . LoadBalancer . Servers = append ( svc . LoadBalancer . Servers , dynamic . Server {
2020-12-04 19:56:04 +00:00
URL : fmt . Sprintf ( "%s://%s" , protocol , hostPort ) ,
2020-01-14 14:48:06 +00:00
} )
}
}
return svc , nil
}
func getProtocol ( portSpec corev1 . ServicePort , portName string , svcConfig * ServiceConfig ) string {
if svcConfig != nil && svcConfig . Service != nil && svcConfig . Service . ServersScheme != "" {
return svcConfig . Service . ServersScheme
}
protocol := "http"
if portSpec . Port == 443 || strings . HasPrefix ( portName , "https" ) {
protocol = "https"
}
return protocol
}
2021-01-05 11:26:04 +00:00
func makeRouterKeyWithHash ( key , rule string ) ( string , error ) {
h := sha256 . New ( )
if _ , err := h . Write ( [ ] byte ( rule ) ) ; err != nil {
return "" , err
}
dupKey := fmt . Sprintf ( "%s-%.10x" , key , h . Sum ( nil ) )
return dupKey , nil
}
2021-03-15 10:16:04 +00:00
func loadRouter ( rule networkingv1 . IngressRule , pa networkingv1 . HTTPIngressPath , rtConfig * RouterConfig , serviceName string ) * dynamic . Router {
2020-01-14 14:48:06 +00:00
var rules [ ] string
if len ( rule . Host ) > 0 {
2020-02-10 15:03:39 +00:00
rules = [ ] string { buildHostRule ( rule . Host ) }
2020-01-14 14:48:06 +00:00
}
if len ( pa . Path ) > 0 {
matcher := defaultPathMatcher
2020-07-28 15:50:04 +00:00
2021-03-15 10:16:04 +00:00
if pa . PathType == nil || * pa . PathType == "" || * pa . PathType == networkingv1 . PathTypeImplementationSpecific {
2020-07-28 15:50:04 +00:00
if rtConfig != nil && rtConfig . Router != nil && rtConfig . Router . PathMatcher != "" {
matcher = rtConfig . Router . PathMatcher
}
2021-03-15 10:16:04 +00:00
} else if * pa . PathType == networkingv1 . PathTypeExact {
2020-07-28 15:50:04 +00:00
matcher = "Path"
2020-01-14 14:48:06 +00:00
}
rules = append ( rules , fmt . Sprintf ( "%s(`%s`)" , matcher , pa . Path ) )
}
rt := & dynamic . Router {
Rule : strings . Join ( rules , " && " ) ,
Service : serviceName ,
}
if rtConfig != nil && rtConfig . Router != nil {
rt . Priority = rtConfig . Router . Priority
rt . EntryPoints = rtConfig . Router . EntryPoints
rt . Middlewares = rtConfig . Router . Middlewares
if rtConfig . Router . TLS != nil {
rt . TLS = rtConfig . Router . TLS
}
}
return rt
}
2020-02-03 16:56:04 +00:00
func throttleEvents ( ctx context . Context , throttleDuration time . Duration , pool * safe . Pool , eventsChan <- chan interface { } ) chan interface { } {
2019-08-30 10:16:04 +00:00
if throttleDuration == 0 {
return nil
}
2020-07-15 17:18:03 +00:00
// Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling).
2019-08-30 10:16:04 +00:00
eventsChanBuffered := make ( chan interface { } , 1 )
// Run a goroutine that reads events from eventChan and does a
// non-blocking write to pendingEvent. This guarantees that writing to
// eventChan will never block, and that pendingEvent will have
// something in it if there's been an event since we read from that channel.
2020-02-03 16:56:04 +00:00
pool . GoCtx ( func ( ctxPool context . Context ) {
2019-08-30 10:16:04 +00:00
for {
select {
2020-02-03 16:56:04 +00:00
case <- ctxPool . Done ( ) :
2019-08-30 10:16:04 +00:00
return
case nextEvent := <- eventsChan :
select {
case eventsChanBuffered <- nextEvent :
default :
// We already have an event in eventsChanBuffered, so we'll
// do a refresh as soon as our throttle allows us to. It's fine
// to drop the event and keep whatever's in the buffer -- we
2020-07-15 17:18:03 +00:00
// don't do different things for different events.
2019-08-30 10:16:04 +00:00
log . FromContext ( ctx ) . Debugf ( "Dropping event kind %T due to throttling" , nextEvent )
}
}
}
2020-02-03 16:56:04 +00:00
} )
2019-08-30 10:16:04 +00:00
return eventsChanBuffered
}