summaryrefslogtreecommitdiffstats
path: root/kube2consul/src
diff options
context:
space:
mode:
Diffstat (limited to 'kube2consul/src')
-rw-r--r--kube2consul/src/kube2consul/Makefile11
-rw-r--r--kube2consul/src/kube2consul/consul_work.go366
-rw-r--r--kube2consul/src/kube2consul/kube2consul.go449
-rw-r--r--kube2consul/src/kube2consul/kube_service.go100
-rw-r--r--kube2consul/src/kube2consul/kube_work.go511
-rw-r--r--kube2consul/src/kube2consul/pod_service.go677
-rw-r--r--kube2consul/src/kube2consul/types.go96
-rw-r--r--kube2consul/src/kube2consul/util/restclient/restclient.go42
-rw-r--r--kube2consul/src/main/blueprint/deploy.yml8
-rw-r--r--kube2consul/src/main/blueprint/list_of_servicelet.list4
-rw-r--r--kube2consul/src/main/blueprint/task.yml118
-rw-r--r--kube2consul/src/main/blueprint/vars.yml14
-rw-r--r--kube2consul/src/main/docker/Dockerfile11
-rw-r--r--kube2consul/src/main/docker/start.sh36
14 files changed, 2443 insertions, 0 deletions
diff --git a/kube2consul/src/kube2consul/Makefile b/kube2consul/src/kube2consul/Makefile
new file mode 100644
index 0000000..141abb0
--- /dev/null
+++ b/kube2consul/src/kube2consul/Makefile
@@ -0,0 +1,11 @@
+.PHONY: kube2consul clean test
+
+kube2consul: kube2consul.go
+ CGO_ENABLED=0 go build --ldflags '-extldflags "-static"'
+ strip kube2consul
+
+clean:
+ rm -fr kube2consul
+
+test: clean
+ go test -v --vmodule=*=4
diff --git a/kube2consul/src/kube2consul/consul_work.go b/kube2consul/src/kube2consul/consul_work.go
new file mode 100644
index 0000000..2e5927a
--- /dev/null
+++ b/kube2consul/src/kube2consul/consul_work.go
@@ -0,0 +1,366 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// consul_work.go
+package main
+
+import (
+ "log"
+ "strings"
+ "sync"
+
+ consulapi "github.com/hashicorp/consul/api"
+ kapi "k8s.io/kubernetes/pkg/api"
+)
+
+type ConsulWorker interface {
+ AddService(config DnsInfo, service *kapi.Service)
+ RemoveService(config DnsInfo)
+ AddPod(config DnsInfo, pod *kapi.Pod)
+ RemovePod(config DnsInfo)
+ SyncDNS()
+}
+
+type ConsulAgentWorker struct {
+ sync.Mutex
+ ConsulWorker
+ ids map[string][]*consulapi.AgentServiceRegistration
+ agent *consulapi.Client
+}
+
+func newConsulAgentWorker(client *consulapi.Client) *ConsulAgentWorker {
+ return &ConsulAgentWorker{
+ agent: client,
+ ids: make(map[string][]*consulapi.AgentServiceRegistration),
+ }
+}
+
+func (client *ConsulAgentWorker) AddService(config DnsInfo, service *kapi.Service) {
+ client.Lock()
+ defer client.Unlock()
+ log.Println("Starting Add Service for: ", config.BaseID)
+
+ if config.IPAddress == "" || config.BaseID == "" {
+ log.Println("Service Info is not valid for AddService")
+ return
+ }
+
+ annos := service.ObjectMeta.Annotations
+ //only register service with annotations
+ if len(annos) == 0 {
+ log.Println("no Annotation defined for this service:", service.Name)
+ return
+ }
+
+ for _, port := range service.Spec.Ports {
+ kubeService := newKubeService()
+ kubeService.BuildServiceInfoMap(&port, &annos)
+
+ if len(kubeService.sInfos) == 0 {
+ log.Println("no Service defined for this port:", port.Port)
+ continue
+ }
+
+ //remove service with no name
+ for id, s := range kubeService.sInfos {
+ if len(s.Name) == 0 {
+ if id == "0" {
+ log.Printf("SERVICE_%d_NAME not set, ignore this service.", port.Port)
+ } else {
+ log.Printf("SERVICE_%d_NAME_%s not set, ignore this service.", port.Port, id)
+ }
+ delete(kubeService.sInfos, id)
+ }
+ }
+
+ //test again
+ if len(kubeService.sInfos) == 0 {
+ log.Println("no Service defined for this port:", port.Port)
+ continue
+ }
+
+ for _, s := range kubeService.sInfos {
+ asr := kubeService.BuildAgentService(s, config, &port)
+ if *argChecks && port.Protocol == "TCP" && config.ServiceType != kapi.ServiceTypeClusterIP {
+ //Create Check if neeeded
+ asr.Check = kubeService.BuildAgentServiceCheck(config, &port)
+ }
+
+ if client.agent != nil {
+ //Registers with DNS
+ if err := client.agent.Agent().ServiceRegister(asr); err != nil {
+ log.Println("Error creating service record: ", asr.ID)
+ }
+ }
+
+ //Add to IDS
+ client.ids[config.BaseID] = append(client.ids[config.BaseID], asr)
+ }
+ }
+ log.Println("Added Service: ", service.Name)
+}
+
+func (client *ConsulAgentWorker) RemoveService(config DnsInfo) {
+ client.Lock()
+ defer client.Unlock()
+ if ids, ok := client.ids[config.BaseID]; ok {
+ for _, asr := range ids {
+ if client.agent != nil {
+ if err := client.agent.Agent().ServiceDeregister(asr.ID); err != nil {
+ log.Println("Error removing service: ", err)
+ }
+ }
+ }
+ delete(client.ids, config.BaseID)
+ } else {
+ log.Println("Requested to remove non-existant BaseID DNS of:", config.BaseID)
+ }
+}
+
+func (client *ConsulAgentWorker) AddPod(config DnsInfo, pod *kapi.Pod) {
+ client.Lock()
+ defer client.Unlock()
+ log.Println("Starting Add Pod for: ", config.BaseID)
+
+ //double check if have Pod IPs so far in Paas mode
+ if *argPdmControllerUrl != "" {
+ if _, ok := pdmPodIPsMap[pod.Name]; !ok {
+ log.Println("In Paas mode, We don't have Pod IPs to register with for Pod: ", pod.Name)
+ return
+ }
+ }
+
+ containerIdMap := make(map[string]string)
+ buildContainerIdMap(pod, &containerIdMap)
+
+ for _, c := range pod.Spec.Containers {
+
+ for _, p := range c.Ports {
+ podService := newPodService()
+ podService.BuildServiceInfoMap(&p, &c)
+ if len(podService.sInfos) == 0 {
+ log.Println("no Service defined for this port:", p.ContainerPort)
+ continue
+ }
+
+ //remove service with no name
+ for id, s := range podService.sInfos {
+ if len(s.Name) == 0 {
+ if id == "0" {
+ log.Printf("SERVICE_%d_NAME not set, ignore this service.", p.ContainerPort)
+ } else {
+ log.Printf("SERVICE_%d_NAME_%s not set, ignore this service.", p.ContainerPort, id)
+ }
+ delete(podService.sInfos, id)
+ }
+ }
+
+ //remove service if same service exist with different protocol
+ for id, s := range podService.sInfos {
+ services := []*consulapi.CatalogService{}
+ sn := s.Name
+ //append namespace if specified
+ if len(s.NS) != 0 {
+ sn = sn + "-" + s.NS
+ }
+ var tags []string
+ var protocol string
+ tags = strings.Split(s.Tags, ",")
+ for _, v := range tags {
+ var elems []string
+ elems = strings.Split(v, ":")
+ if elems[0] == PROTOCOL {
+ protocol = elems[1]
+ break
+ }
+ }
+ //remove service with empty protocol
+ if protocol == "" {
+ delete(podService.sInfos, id)
+ continue
+ }
+
+ protocol_field := "\"protocol\":\"" + protocol + "\""
+
+ //query consul
+ if client.agent != nil {
+ same_protocol := false
+ services, _, _ = client.agent.Catalog().Service(sn, "", nil)
+ if services == nil || len(services) == 0 {
+ continue
+ }
+ for _, v := range services[0].ServiceTags {
+ if strings.Contains(v, protocol_field) {
+ same_protocol = true
+ break
+ }
+ }
+
+ if !same_protocol {
+ log.Printf("same service with different protocol already exist, ignore service:%s, protocol:%s", sn, protocol)
+ delete(podService.sInfos, id)
+ }
+ }
+ }
+
+ //remove service with no network plane type in tags
+ if *argPdmControllerUrl != "" {
+ for id, s := range podService.sInfos {
+ if len(s.Tags) == 0 {
+ if id == "0" {
+ log.Printf("SERVICE_%d_TAGS not set, ignore this service in Paas mode.", p.ContainerPort)
+ } else {
+ log.Printf("SERVICE_%d_TAGS_%s not set, ignore this service in Paas mode.", p.ContainerPort, id)
+ }
+ delete(podService.sInfos, id)
+ } else {
+ var tags []string
+ tags = strings.Split(s.Tags, ",")
+ for _, v := range tags {
+ var elems []string
+ elems = strings.Split(v, ":")
+ //this is saved in Tags, later will be moved to Labels??
+ if elems[0] == NETWORK_PLANE_TYPE {
+ types := strings.Split(elems[1], "|")
+ for _, t := range types {
+ ip := getIPFromPodIPsMap(t, pod.Name)
+ if ip == "" {
+ log.Printf("User defined network type:%s has no assigned ip for Pod:%s", t, pod.Name)
+ } else {
+ log.Printf("Found ip:%s for network type:%s", ip, t)
+ s.IPs = append(s.IPs, PodIP{
+ NetworkPlane: t,
+ IPAddress: ip,
+ })
+ }
+ }
+ }
+ }
+
+ if len(s.IPs) == 0 {
+ log.Printf("In Paas mode, no IP assigned for Pod:ContainerPort->%s:%d", pod.Name, p.ContainerPort)
+ delete(podService.sInfos, id)
+ }
+ }
+ }
+ }
+
+ //test again
+ if len(podService.sInfos) == 0 {
+ log.Println("no Service defined for this port:", p.ContainerPort)
+ continue
+ }
+
+ for _, s := range podService.sInfos {
+ asrs := podService.BuildAgentService(s, config, &p, containerIdMap[c.Name])
+ if client.agent != nil {
+ for _, asr := range asrs {
+ //Registers with DNS
+ if err := client.agent.Agent().ServiceRegister(asr); err != nil {
+ log.Println("Error creating service record: ", asr.ID)
+ } else {
+ log.Printf("Added service:instance ->%s:%s", asr.Name, asr.ID)
+ }
+ client.ids[config.BaseID] = append(client.ids[config.BaseID], asr)
+ }
+ } else {
+ log.Println("Consul client is not available.")
+ }
+ }
+ }
+ }
+ log.Println("Added Pod: ", pod.Name)
+}
+
+func (client *ConsulAgentWorker) RemovePod(config DnsInfo) {
+ client.Lock()
+ defer client.Unlock()
+ if ids, ok := client.ids[config.BaseID]; ok {
+ for _, asr := range ids {
+ if client.agent != nil {
+ if err := client.agent.Agent().ServiceDeregister(asr.ID); err != nil {
+ log.Println("Error removing service: ", err)
+ } else {
+ log.Printf("removed service -> %s:%d", asr.Name, asr.Port)
+ }
+ }
+ }
+ delete(client.ids, config.BaseID)
+ log.Println("Removed Pod: ", config.BaseID)
+ } else {
+ log.Println("Requested to remove non-existant BaseID DNS of:", config.BaseID)
+ }
+}
+
+func (client *ConsulAgentWorker) SyncDNS() {
+ if client.agent != nil {
+ if services, err := client.agent.Agent().Services(); err == nil {
+ for _, registered := range client.ids {
+ for _, service := range registered {
+ if !containsServiceId(service.ID, services) {
+ log.Println("Regregistering missing service ID: ", service.ID)
+ client.agent.Agent().ServiceRegister(service)
+ }
+ }
+ }
+ for _, service := range services {
+ found := false
+ for _, registered := range client.ids {
+ for _, svc := range registered {
+ if service.ID == svc.ID {
+ found = true
+ break
+ }
+ }
+ if found {
+ break
+ }
+ }
+ if !found {
+ log.Println("Degregistering dead service ID: ", service.ID)
+ client.agent.Agent().ServiceDeregister(service.ID)
+ }
+ }
+ } else {
+ log.Println("Error retreiving services from consul during sync: ", err)
+ }
+ }
+}
+
+func buildContainerIdMap(pod *kapi.Pod, cmap *map[string]string) {
+ for _, s := range pod.Status.ContainerStatuses {
+ id := strings.TrimLeft(s.ContainerID, "docker://")
+ (*cmap)[s.Name] = id
+ }
+}
+
+func containsServiceId(id string, services map[string]*consulapi.AgentService) bool {
+ for _, service := range services {
+ if service.ID == id {
+ return true
+ }
+ }
+ return false
+}
+
+func getIPFromPodIPsMap(ptype string, podname string) string {
+ ips := pdmPodIPsMap[podname]
+ for _, v := range ips {
+ if v.NetworkPlane == ptype {
+ return v.IPAddress
+ }
+ }
+ return ""
+}
diff --git a/kube2consul/src/kube2consul/kube2consul.go b/kube2consul/src/kube2consul/kube2consul.go
new file mode 100644
index 0000000..f1213f2
--- /dev/null
+++ b/kube2consul/src/kube2consul/kube2consul.go
@@ -0,0 +1,449 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// kube2consul.go
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "net/url"
+ "os"
+ "reflect"
+ "time"
+
+ consulapi "github.com/hashicorp/consul/api"
+ kapi "k8s.io/kubernetes/pkg/api"
+ kcache "k8s.io/kubernetes/pkg/client/cache"
+ kclient "k8s.io/kubernetes/pkg/client/unversioned"
+ kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
+ kframework "k8s.io/kubernetes/pkg/controller/framework"
+ kselector "k8s.io/kubernetes/pkg/fields"
+ klabels "k8s.io/kubernetes/pkg/labels"
+)
+
+var (
+ argConsulAgent = flag.String("consul-agent", "http://127.0.0.1:8500", "URL to consul agent")
+ argKubeMasterUrl = flag.String("kube_master_url", "", "Url to reach kubernetes master. Env variables in this flag will be expanded.")
+ argChecks = flag.Bool("checks", false, "Adds TCP service checks for each TCP Service")
+ argPdmControllerUrl = flag.String("pdm_controller_url", "", "URL to pdm controller")
+ addMap = make(map[string]*kapi.Pod)
+ deleteMap = make(map[string]*kapi.Pod)
+ pdmPodIPsMap = make(map[string][]PodIP)
+ nodeSelector = klabels.Everything()
+)
+
+const (
+ // Maximum number of attempts to connect to consul server.
+ maxConnectAttempts = 12
+ // Resync period for the kube controller loop.
+ resyncPeriod = 5 * time.Second
+ // Default Health check interval
+ DefaultInterval = "10s"
+ // Resource url to query pod from PDM controller
+ podUrl = "nw/v1/tenants"
+)
+
+const (
+ Service_NAME = "NAME"
+ Service_TAGS = "TAGS"
+ Service_LABELS = "LABELS"
+ Service_MDATA = "MDATA"
+ Service_NS = "NAMESPACE"
+ Service_TTL = "TTL"
+ Service_HTTP = "HTTP"
+ Service_TCP = "TCP"
+ Service_CMD = "CMD"
+ Service_SCRIPT = "SCRIPT"
+ Service_TIMEOUT = "TIMEOUT"
+ Service_INTERVAL = "INTERVAL"
+)
+
+const (
+ Table_BASE = "base"
+ Table_LB = "lb"
+ Table_LABELS = "labels"
+ Table_META_DATA = "metadata"
+ Table_NS = "ns"
+ Table_CHECKS = "checks"
+)
+
+const (
+ PROTOCOL = "protocol"
+ PROTOCOL_UI = "UI"
+ PREFIX_UI = "IUI_"
+)
+
+const (
+ //filter out from the tags for compatibility
+ NETWORK_PLANE_TYPE = "network_plane_type"
+ VISUAL_RANGE = "visualRange"
+ LB_POLICY = "lb_policy"
+ LB_SVR_PARAMS = "lb_server_params"
+)
+
+const DefaultLabels = "\"labels\":{\"visualRange\":\"1\"}"
+
+func getKubeMasterUrl() (string, error) {
+ if *argKubeMasterUrl == "" {
+ return "", fmt.Errorf("no --kube_master_url specified")
+ }
+ parsedUrl, err := url.Parse(os.ExpandEnv(*argKubeMasterUrl))
+ if err != nil {
+ return "", fmt.Errorf("failed to parse --kube_master_url %s - %v", *argKubeMasterUrl, err)
+ }
+ if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" {
+ return "", fmt.Errorf("invalid --kube_master_url specified %s", *argKubeMasterUrl)
+ }
+ return parsedUrl.String(), nil
+}
+
+func newConsulClient(consulAgent string) (*consulapi.Client, error) {
+ var (
+ client *consulapi.Client
+ err error
+ )
+
+ consulConfig := consulapi.DefaultConfig()
+ consulAgentUrl, err := url.Parse(consulAgent)
+ if err != nil {
+ log.Println("Error parsing Consul url")
+ return nil, err
+ }
+
+ if consulAgentUrl.Host != "" {
+ consulConfig.Address = consulAgentUrl.Host
+ }
+
+ if consulAgentUrl.Scheme != "" {
+ consulConfig.Scheme = consulAgentUrl.Scheme
+ }
+
+ client, err = consulapi.NewClient(consulConfig)
+ if err != nil {
+ log.Println("Error creating Consul client")
+ return nil, err
+ }
+
+ for attempt := 1; attempt <= maxConnectAttempts; attempt++ {
+ if _, err = client.Agent().Self(); err == nil {
+ break
+ }
+
+ if attempt == maxConnectAttempts {
+ break
+ }
+
+ log.Printf("[Attempt: %d] Attempting access to Consul after 5 second sleep", attempt)
+ time.Sleep(5 * time.Second)
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("failed to connect to Consul agent: %v, error: %v", consulAgent, err)
+ }
+ log.Printf("Consul agent found: %v", consulAgent)
+
+ return client, nil
+}
+
+func newKubeClient() (*kclient.Client, error) {
+ masterUrl, err := getKubeMasterUrl()
+ if err != nil {
+ return nil, err
+ }
+ overrides := &kclientcmd.ConfigOverrides{}
+ overrides.ClusterInfo.Server = masterUrl
+
+ rules := kclientcmd.NewDefaultClientConfigLoadingRules()
+ kubeConfig, err := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
+
+ if err != nil {
+ log.Println("Error creating Kube Config", err)
+ return nil, err
+ }
+ kubeConfig.Host = masterUrl
+
+ log.Printf("Using %s for kubernetes master", kubeConfig.Host)
+ return kclient.New(kubeConfig)
+}
+
+func createNodeLW(kubeClient *kclient.Client) *kcache.ListWatch {
+ return kcache.NewListWatchFromClient(kubeClient, "nodes", kapi.NamespaceAll, kselector.Everything())
+}
+
+func sendNodeWork(action KubeWorkAction, queue chan<- KubeWork, oldObject, newObject interface{}) {
+ if node, ok := newObject.(*kapi.Node); ok {
+ if nodeSelector.Matches(klabels.Set(node.Labels)) == false {
+ log.Printf("Ignoring node %s due to label selectors", node.Name)
+ return
+ }
+
+ log.Println("Node Action: ", action, " for node ", node.Name)
+ work := KubeWork{
+ Action: action,
+ Node: node,
+ }
+
+ if action == KubeWorkUpdateNode {
+ if oldNode, ok := oldObject.(*kapi.Node); ok {
+ if nodeReady(node) != nodeReady(oldNode) {
+ log.Println("Ready state change. Old:", nodeReady(oldNode), " New: ", nodeReady(node))
+ queue <- work
+ }
+ }
+ } else {
+ queue <- work
+ }
+ }
+}
+
+func watchForNodes(kubeClient *kclient.Client, queue chan<- KubeWork) {
+ _, nodeController := kframework.NewInformer(
+ createNodeLW(kubeClient),
+ &kapi.Node{},
+ resyncPeriod,
+ kframework.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ sendNodeWork(KubeWorkAddNode, queue, nil, obj)
+ },
+ DeleteFunc: func(obj interface{}) {
+ sendNodeWork(KubeWorkRemoveNode, queue, nil, obj)
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ sendNodeWork(KubeWorkUpdateNode, queue, oldObj, newObj)
+ },
+ },
+ )
+ stop := make(chan struct{})
+ go nodeController.Run(stop)
+}
+
+// Returns a cache.ListWatch that gets all changes to services.
+func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch {
+ return kcache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kselector.Everything())
+}
+
+func sendServiceWork(action KubeWorkAction, queue chan<- KubeWork, serviceObj interface{}) {
+ if service, ok := serviceObj.(*kapi.Service); ok {
+ log.Println("Service Action: ", action, " for service ", service.Name)
+ queue <- KubeWork{
+ Action: action,
+ Service: service,
+ }
+ }
+}
+
+func watchForServices(kubeClient *kclient.Client, queue chan<- KubeWork) {
+ _, svcController := kframework.NewInformer(
+ createServiceLW(kubeClient),
+ &kapi.Service{},
+ resyncPeriod,
+ kframework.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ sendServiceWork(KubeWorkAddService, queue, obj)
+ },
+ DeleteFunc: func(obj interface{}) {
+ sendServiceWork(KubeWorkRemoveService, queue, obj)
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ if reflect.DeepEqual(newObj, oldObj) == false {
+ sendServiceWork(KubeWorkUpdateService, queue, newObj)
+ }
+ },
+ },
+ )
+ stop := make(chan struct{})
+ go svcController.Run(stop)
+}
+
+// Returns a cache.ListWatch that gets all changes to Pods.
+func createPodLW(kubeClient *kclient.Client) *kcache.ListWatch {
+ return kcache.NewListWatchFromClient(kubeClient, "pods", kapi.NamespaceAll, kselector.Everything())
+}
+
+// Dispatch the notifications for Pods by type to the worker
+func sendPodWork(action KubeWorkAction, queue chan<- KubeWork, podObj interface{}) {
+ if pod, ok := podObj.(*kapi.Pod); ok {
+ log.Println("Pod Action: ", action, " for Pod:", pod.Name)
+ queue <- KubeWork{
+ Action: action,
+ Pod: pod,
+ }
+ }
+}
+
+// Launch the go routine to watch notifications for Pods.
+func watchForPods(kubeClient *kclient.Client, queue chan<- KubeWork) {
+ var podController *kframework.Controller
+ _, podController = kframework.NewInformer(
+ createPodLW(kubeClient),
+ &kapi.Pod{},
+ resyncPeriod,
+ kframework.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ sendPodWork(KubeWorkAddPod, queue, obj)
+ },
+ DeleteFunc: func(obj interface{}) {
+ if o, ok := obj.(*kapi.Pod); ok {
+ if _, ok := deleteMap[o.Name]; ok {
+ delete(deleteMap, o.Name)
+ }
+ }
+ sendPodWork(KubeWorkRemovePod, queue, obj)
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ o, n := oldObj.(*kapi.Pod), newObj.(*kapi.Pod)
+ if reflect.DeepEqual(oldObj, newObj) == false {
+ //Adding Pod
+ if _, ok := addMap[n.Name]; ok {
+ if kapi.IsPodReady(n) {
+ delete(addMap, n.Name)
+ sendPodWork(KubeWorkUpdatePod, queue, newObj)
+ }
+ return
+ }
+ //Deleting Pod
+ if _, ok := deleteMap[n.Name]; ok {
+ return
+ } else {
+ if o.ObjectMeta.DeletionTimestamp == nil &&
+ n.ObjectMeta.DeletionTimestamp != nil {
+ deleteMap[n.Name] = n
+ return
+ }
+ //Updating Pod
+ sendPodWork(KubeWorkUpdatePod, queue, newObj)
+ }
+ }
+ },
+ },
+ )
+ stop := make(chan struct{})
+ go podController.Run(stop)
+}
+
+func runBookKeeper(workQue <-chan KubeWork, consulQueue chan<- ConsulWork, apiClient *kclient.Client) {
+
+ client := newClientBookKeeper(apiClient)
+ client.consulQueue = consulQueue
+
+ for work := range workQue {
+ switch work.Action {
+ case KubeWorkAddNode:
+ client.AddNode(work.Node)
+ case KubeWorkRemoveNode:
+ client.RemoveNode(work.Node.Name)
+ case KubeWorkUpdateNode:
+ client.UpdateNode(work.Node)
+ case KubeWorkAddService:
+ client.AddService(work.Service)
+ case KubeWorkRemoveService:
+ client.RemoveService(work.Service)
+ case KubeWorkUpdateService:
+ client.UpdateService(work.Service)
+ case KubeWorkAddPod:
+ client.AddPod(work.Pod)
+ case KubeWorkRemovePod:
+ client.RemovePod(work.Pod)
+ case KubeWorkUpdatePod:
+ client.UpdatePod(work.Pod)
+ case KubeWorkSync:
+ client.Sync()
+ default:
+ log.Println("Unsupported work action: ", work.Action)
+ }
+ }
+ log.Println("Completed all work")
+}
+
+func runConsulWorker(queue <-chan ConsulWork, client *consulapi.Client) {
+ worker := newConsulAgentWorker(client)
+
+ for work := range queue {
+ log.Println("Consul Work Action: ", work.Action, " BaseID:", work.Config.BaseID)
+
+ switch work.Action {
+ case ConsulWorkAddService:
+ worker.AddService(work.Config, work.Service)
+ case ConsulWorkRemoveService:
+ worker.RemoveService(work.Config)
+ case ConsulWorkAddPod:
+ worker.AddPod(work.Config, work.Pod)
+ case ConsulWorkRemovePod:
+ worker.RemovePod(work.Config)
+ case ConsulWorkSyncDNS:
+ worker.SyncDNS()
+ default:
+ log.Println("Unsupported Action of: ", work.Action)
+ }
+
+ }
+}
+
+func main() {
+ flag.Parse()
+ // TODO: Validate input flags.
+ var err error
+ var consulClient *consulapi.Client
+
+ if consulClient, err = newConsulClient(*argConsulAgent); err != nil {
+ log.Fatalf("Failed to create Consul client - %v", err)
+ }
+
+ kubeClient, err := newKubeClient()
+ if err != nil {
+ log.Fatalf("Failed to create a kubernetes client: %v", err)
+ }
+
+ if _, err := kubeClient.ServerVersion(); err != nil {
+ log.Fatal("Could not connect to Kube Master", err)
+ } else {
+ log.Println("Connected to K8S API Server")
+ }
+
+ kubeWorkQueue := make(chan KubeWork)
+ consulWorkQueue := make(chan ConsulWork)
+ go runBookKeeper(kubeWorkQueue, consulWorkQueue, kubeClient)
+ watchForNodes(kubeClient, kubeWorkQueue)
+ watchForServices(kubeClient, kubeWorkQueue)
+ watchForPods(kubeClient, kubeWorkQueue)
+ go runConsulWorker(consulWorkQueue, consulClient)
+
+ log.Println("Running Consul Sync loop every: 60 seconds")
+ csyncer := time.NewTicker(time.Second * time.Duration(60))
+ go func() {
+ for t := range csyncer.C {
+ log.Println("Consul Sync request at:", t)
+ consulWorkQueue <- ConsulWork{
+ Action: ConsulWorkSyncDNS,
+ }
+ }
+ }()
+
+ log.Println("Running Kube Sync loop every: 60 seconds")
+ ksyncer := time.NewTicker(time.Second * time.Duration(60))
+ go func() {
+ for t := range ksyncer.C {
+ log.Println("Kube Sync request at:", t)
+ kubeWorkQueue <- KubeWork{
+ Action: KubeWorkSync,
+ }
+ }
+ }()
+
+ // Prevent exit
+ select {}
+}
diff --git a/kube2consul/src/kube2consul/kube_service.go b/kube2consul/src/kube2consul/kube_service.go
new file mode 100644
index 0000000..887f6c8
--- /dev/null
+++ b/kube2consul/src/kube2consul/kube_service.go
@@ -0,0 +1,100 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// kube_service.go
+package main
+
+import (
+ "log"
+ "strconv"
+ "strings"
+
+ consulapi "github.com/hashicorp/consul/api"
+ kapi "k8s.io/kubernetes/pkg/api"
+)
+
+type KubeServiceAction interface {
+ BuildServiceInfoMap(*kapi.ServicePort, *map[string]string)
+ BuildAgentService(*ServiceInfo, DnsInfo, *kapi.ServicePort) *consulapi.AgentServiceRegistration
+ BuildAgentServiceCheck(DnsInfo, *kapi.ServicePort) *consulapi.AgentServiceCheck
+}
+
+type KubeService struct {
+ KubeServiceAction
+ sInfos map[string]*ServiceInfo
+}
+
+func newKubeService() *KubeService {
+ return &KubeService{
+ sInfos: make(map[string]*ServiceInfo),
+ }
+}
+
+func (kube *KubeService) BuildServiceInfoMap(sport *kapi.ServicePort, annos *map[string]string) {
+ portstr := strconv.Itoa(int(sport.Port))
+ nameprefix := "SERVICE_" + portstr + "_NAME"
+ tagprefix := "SERVICE_" + portstr + "_TAGS"
+
+ for name, value := range *annos {
+ if strings.HasPrefix(name, nameprefix) {
+ addToServiceInfoMap(name, value, Service_NAME, &kube.sInfos)
+ } else if strings.HasPrefix(name, tagprefix) {
+ addToServiceInfoMap(name, value, Service_TAGS, &kube.sInfos)
+ }
+ }
+}
+
+func (kube *KubeService) BuildAgentService(sInfo *ServiceInfo, config DnsInfo, sport *kapi.ServicePort) *consulapi.AgentServiceRegistration {
+ name := sInfo.Name
+ var tagslice []string
+ tagslice = strings.Split(sInfo.Tags, ",")
+
+ for _, elem := range tagslice {
+ var elemslice []string
+ elemslice = strings.Split(elem, ":")
+ if elemslice[0] == PROTOCOL {
+ switch elemslice[1] {
+ case PROTOCOL_UI:
+ name = PREFIX_UI + name
+ default:
+ log.Println("regular protocol:", elemslice[1])
+ }
+ break
+ }
+ }
+
+ asrID := config.BaseID + "-" + strconv.Itoa(int(sport.Port)) + "-" + name
+
+ regPort := sport.NodePort
+ if config.ServiceType == kapi.ServiceTypeClusterIP {
+ regPort = sport.Port
+ }
+
+ return &consulapi.AgentServiceRegistration{
+ ID: asrID,
+ Name: name,
+ Address: config.IPAddress,
+ Port: int(regPort),
+ Tags: tagslice,
+ }
+}
+
+func (kube *KubeService) BuildAgentServiceCheck(config DnsInfo, sport *kapi.ServicePort) *consulapi.AgentServiceCheck {
+ log.Println("Creating service check for: ", config.IPAddress, " on Port: ", sport.NodePort)
+ return &consulapi.AgentServiceCheck{
+ TCP: config.IPAddress + ":" + strconv.Itoa(int(sport.NodePort)),
+ Interval: "60s",
+ }
+}
diff --git a/kube2consul/src/kube2consul/kube_work.go b/kube2consul/src/kube2consul/kube_work.go
new file mode 100644
index 0000000..4695a76
--- /dev/null
+++ b/kube2consul/src/kube2consul/kube_work.go
@@ -0,0 +1,511 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// kube_work.go
+package main
+
+import (
+ "encoding/json"
+ "kube2consul/util/restclient"
+ "log"
+ "strings"
+ "sync"
+ "time"
+
+ kapi "k8s.io/kubernetes/pkg/api"
+ kclient "k8s.io/kubernetes/pkg/client/unversioned"
+ kselector "k8s.io/kubernetes/pkg/fields"
+ klabels "k8s.io/kubernetes/pkg/labels"
+)
+
+type KubeBookKeeper interface {
+ AddNode(*kapi.Node)
+ RemoveNode(*kapi.Node)
+ UpdateNode(*kapi.Node)
+ AddService(*kapi.Service)
+ RemoveService(*kapi.Service)
+ UpdateService(*kapi.Service)
+ AddPod(*kapi.Pod)
+ RemovePod(*kapi.Pod)
+ UpdatePod(*kapi.Pod)
+}
+
+type ClientBookKeeper struct {
+ sync.Mutex
+ KubeBookKeeper
+ client *kclient.Client
+ nodes map[string]*KubeNode
+ services map[string]*kapi.Service
+ pods map[string]*kapi.Pod
+ consulQueue chan<- ConsulWork
+}
+
+type KubeNode struct {
+ name string
+ readyStatus bool
+ serviceIDS map[string]string
+ address string
+}
+
+func buildServiceBaseID(nodeName string, service *kapi.Service) string {
+ return nodeName + "-" + service.Name
+}
+
+func nodeReady(node *kapi.Node) bool {
+ for i := range node.Status.Conditions {
+ if node.Status.Conditions[i].Type == kapi.NodeReady {
+ return node.Status.Conditions[i].Status == kapi.ConditionTrue
+ }
+ }
+
+ log.Println("NodeReady condition is missing from node: ", node.Name)
+ return false
+}
+
+func newKubeNode() *KubeNode {
+ return &KubeNode{
+ name: "",
+ readyStatus: false,
+ serviceIDS: make(map[string]string),
+ }
+}
+
+func newClientBookKeeper(client *kclient.Client) *ClientBookKeeper {
+ return &ClientBookKeeper{
+ client: client,
+ nodes: make(map[string]*KubeNode),
+ services: make(map[string]*kapi.Service),
+ pods: make(map[string]*kapi.Pod),
+ }
+}
+
+func (client *ClientBookKeeper) AddNode(node *kapi.Node) {
+ client.Lock()
+ defer client.Unlock()
+ if _, ok := client.nodes[node.Name]; ok {
+ log.Printf("Node:%s already exist. skip this ADD notification.", node.Name)
+ return
+ }
+ //Add to Node Collection
+ kubeNode := newKubeNode()
+ kubeNode.readyStatus = nodeReady(node)
+ kubeNode.name = node.Name
+ kubeNode.address = node.Status.Addresses[0].Address
+
+ //Send request for Service Addition for node and all serviceIDS (Create Service ID here)
+ if kubeNode.readyStatus {
+ client.AddAllServicesToNode(kubeNode)
+ }
+
+ client.nodes[node.Name] = kubeNode
+ log.Println("Added Node: ", node.Name)
+}
+
+func (client *ClientBookKeeper) AddAllServicesToNode(node *KubeNode) {
+ for _, service := range client.services {
+ if service.Spec.Type == kapi.ServiceTypeClusterIP {
+ log.Println("ClusterIP service ignored for node binding:", service.Name)
+ } else {
+ client.AttachServiceToNode(node, service)
+ }
+ }
+}
+
+func (client *ClientBookKeeper) AttachServiceToNode(node *KubeNode, service *kapi.Service) {
+ baseID := buildServiceBaseID(node.name, service)
+ client.consulQueue <- ConsulWork{
+ Action: ConsulWorkAddService,
+ Service: service,
+ Config: DnsInfo{
+ BaseID: baseID,
+ IPAddress: node.address,
+ ServiceType: service.Spec.Type,
+ },
+ }
+ log.Println("Requesting Addition of services with Base ID: ", baseID)
+ node.serviceIDS[service.Name] = baseID
+}
+
+func (client *ClientBookKeeper) RemoveNode(name string) {
+ client.Lock()
+ defer client.Unlock()
+ if kubeNode, ok := client.nodes[name]; ok {
+ //Remove All DNS for node
+ client.RemoveAllServicesFromNode(kubeNode)
+ //Remove Node from Collection
+ delete(client.nodes, name)
+ } else {
+ log.Println("Attmepted to remove missing node: ", name)
+ }
+
+ log.Println("Queued Node to be removed: ", name)
+}
+
+func (client *ClientBookKeeper) RemoveAllServicesFromNode(node *KubeNode) {
+ for _, service := range client.services {
+ if service.Spec.Type == kapi.ServiceTypeClusterIP {
+ log.Println("ClusterIP service ignored for node unbinding:", service.Name)
+ } else {
+ client.DetachServiceFromNode(node, service)
+ }
+ }
+}
+
+func (client *ClientBookKeeper) DetachServiceFromNode(node *KubeNode, service *kapi.Service) {
+ if baseID, ok := node.serviceIDS[service.Name]; ok {
+ client.consulQueue <- ConsulWork{
+ Action: ConsulWorkRemoveService,
+ Config: DnsInfo{
+ BaseID: baseID,
+ },
+ }
+
+ log.Println("Requesting Removal of services with Base ID: ", baseID)
+ delete(node.serviceIDS, service.Name)
+ }
+}
+
+func (client *ClientBookKeeper) UpdateNode(node *kapi.Node) {
+ if nodeReady(node) {
+ // notReady nodes also stored in client.nodes
+ client.AddAllServicesToNode(client.nodes[node.Name])
+ } else {
+ client.RemoveAllServicesFromNode(client.nodes[node.Name])
+ }
+}
+
+func (client *ClientBookKeeper) AddService(svc *kapi.Service) {
+ client.Lock()
+ defer client.Unlock()
+ if !isUserDefinedService(svc) {
+ log.Println("Not the target, skip this ADD notification for service:", svc.Name)
+ return
+ }
+
+ if _, ok := client.services[svc.Name]; ok {
+ log.Printf("service:%s already exist. skip this ADD notification.", svc.Name)
+ return
+ }
+
+ if kapi.IsServiceIPSet(svc) {
+ if svc.Spec.Type == kapi.ServiceTypeClusterIP {
+ log.Println("Adding ClusterIP service:", svc.Name)
+ client.consulQueue <- ConsulWork{
+ Action: ConsulWorkAddService,
+ Service: svc,
+ Config: DnsInfo{
+ BaseID: svc.Name,
+ IPAddress: svc.Spec.ClusterIP,
+ ServiceType: svc.Spec.Type,
+ },
+ }
+ } else {
+ //Check and use local node list first
+ if len(client.nodes) > 0 {
+ for _, kubeNode := range client.nodes {
+ if kubeNode.readyStatus {
+ client.AttachServiceToNode(kubeNode, svc)
+ }
+ }
+ } else {
+ log.Println("local node list is empty, retrieve it from the api server.")
+ nodes := client.client.Nodes()
+ if nodeList, err := nodes.List(kapi.ListOptions{
+ LabelSelector: klabels.Everything(),
+ FieldSelector: kselector.Everything(),
+ }); err == nil {
+ for _, node := range nodeList.Items {
+ if nodeReady(&node) {
+ kubeNode := newKubeNode()
+ kubeNode.readyStatus = nodeReady(&node)
+ kubeNode.name = node.Name
+ kubeNode.address = node.Status.Addresses[0].Address
+ client.AttachServiceToNode(kubeNode, svc)
+ }
+ }
+ }
+ }
+ }
+ client.services[svc.Name] = svc
+ log.Println("Queued Service to be added: ", svc.Name)
+ } else {
+ // if ClusterIP is not set, do not create a DNS records
+ log.Printf("Skipping dns record for headless service: %s\n", svc.Name)
+ }
+}
+
+func (client *ClientBookKeeper) RemoveService(svc *kapi.Service) {
+ client.Lock()
+ defer client.Unlock()
+ if !isUserDefinedService(svc) {
+ log.Println("Not the target, skip this Remove notification for service:", svc.Name)
+ return
+ }
+
+ if _, ok := client.services[svc.Name]; !ok {
+ log.Printf("Service:%s not exist. skip this REMOVE notification.", svc.Name)
+ return
+ }
+
+ if svc.Spec.Type == kapi.ServiceTypeClusterIP {
+ log.Println("Removing ClusterIP service:", svc.Name)
+ //Perform All DNS Removes
+ client.consulQueue <- ConsulWork{
+ Action: ConsulWorkRemoveService,
+ Config: DnsInfo{
+ BaseID: svc.Name,
+ },
+ }
+ } else {
+ //Check and use local node list first
+ if len(client.nodes) > 0 {
+ for _, kubeNode := range client.nodes {
+ if kubeNode.readyStatus {
+ client.DetachServiceFromNode(kubeNode, svc)
+ }
+ }
+ } else {
+ log.Println("local node list is empty, retrieve it from the api server. sync it later.")
+ }
+ }
+ delete(client.services, svc.Name)
+ log.Println("Queued Service to be removed: ", svc.Name)
+}
+
+func (client *ClientBookKeeper) UpdateService(svc *kapi.Service) {
+ if !isUserDefinedService(svc) {
+ log.Println("Not the target, skip this Update notification for service:", svc.Name)
+ return
+ }
+
+ client.RemoveService(svc)
+ client.AddService(svc)
+}
+
+func (client *ClientBookKeeper) AddPod(pod *kapi.Pod) {
+ client.Lock()
+ defer client.Unlock()
+ if !isUserDefinedPod(pod) {
+ log.Println("Not the target, skip this ADD notification for pod:", pod.Name)
+ return
+ }
+
+ if _, ok := client.pods[pod.Name]; ok {
+ log.Printf("Pod:%s already exist. skip this ADD notification.", pod.Name)
+ return
+ }
+
+ //newly added Pod
+ if pod.Name == "" || pod.Status.PodIP == "" {
+ log.Printf("Pod:%s has neither name nor pod ip. skip this ADD notification.", pod.Name)
+ addMap[pod.Name] = pod
+ return
+ }
+ //existing Pod
+ if kapi.IsPodReady(pod) {
+ if *argPdmControllerUrl != "" {
+ fillPdmPodIPsMap(pod)
+ }
+ }
+ //Perform All DNS Adds
+ client.consulQueue <- ConsulWork{
+ Action: ConsulWorkAddPod,
+ Pod: pod,
+ Config: DnsInfo{
+ BaseID: pod.Name,
+ IPAddress: pod.Status.PodIP,
+ },
+ }
+ client.pods[pod.Name] = pod
+ log.Println("Queued Pod to be added: ", pod.Name)
+}
+
+func (client *ClientBookKeeper) RemovePod(pod *kapi.Pod) {
+ client.Lock()
+ defer client.Unlock()
+ if !isUserDefinedPod(pod) {
+ log.Println("Not the target, skip this Remove notification for pod:", pod.Name)
+ return
+ }
+
+ if _, ok := client.pods[pod.Name]; !ok {
+ log.Printf("Pod:%s not exist. skip this REMOVE notification.", pod.Name)
+ return
+ }
+ //Perform All DNS Removes
+ client.consulQueue <- ConsulWork{
+ Action: ConsulWorkRemovePod,
+ Config: DnsInfo{
+ BaseID: pod.Name,
+ },
+ }
+ delete(client.pods, pod.Name)
+ log.Println("Queued Pod to be removed: ", pod.Name)
+}
+
+func (client *ClientBookKeeper) UpdatePod(pod *kapi.Pod) {
+ if !isUserDefinedPod(pod) {
+ log.Println("Not the target, skip this Update notification for pod:", pod.Name)
+ return
+ }
+
+ if *argPdmControllerUrl != "" {
+ fillPdmPodIPsMap(pod)
+ }
+ client.RemovePod(pod)
+ client.AddPod(pod)
+}
+
+func (client *ClientBookKeeper) Sync() {
+ nodes := client.client.Nodes()
+ if nodeList, err := nodes.List(kapi.ListOptions{
+ LabelSelector: klabels.Everything(),
+ FieldSelector: kselector.Everything(),
+ }); err == nil {
+ for name, _ := range client.nodes {
+ if !containsNodeName(name, nodeList) {
+ log.Printf("Bookkeeper has node: %s that does not exist in api server", name)
+ client.RemoveNode(name)
+ }
+ }
+ }
+
+ svcs := client.client.Services(kapi.NamespaceAll)
+ if svcList, err := svcs.List(kapi.ListOptions{
+ LabelSelector: klabels.Everything(),
+ FieldSelector: kselector.Everything(),
+ }); err == nil {
+ for name, svc := range client.services {
+ if !containsSvcName(name, svcList) {
+ log.Printf("Bookkeeper has service: %s that does not exist in api server", name)
+ client.RemoveService(svc)
+ }
+ }
+ }
+
+ pods := client.client.Pods(kapi.NamespaceAll)
+ if podList, err := pods.List(kapi.ListOptions{
+ LabelSelector: klabels.Everything(),
+ FieldSelector: kselector.Everything(),
+ }); err == nil {
+ for name, pod := range client.pods {
+ if !containsPodName(name, podList) {
+ log.Printf("Bookkeeper has pod: %s that does not exist in api server", name)
+ if _, ok := deleteMap[pod.Name]; ok {
+ log.Println("Missing remove notification for Pod: ", pod.Name)
+ delete(deleteMap, pod.Name)
+ }
+ client.RemovePod(pod)
+ }
+ }
+ //It was observed that the kube notification may lost after the restarting of kube master.
+ //Currently this is only done for Pod. same for Node and Service will be done when required.
+ for _, pod := range podList.Items {
+ if _, ok := client.pods[pod.ObjectMeta.Name]; !ok {
+ if kapi.IsPodReady(&pod) {
+ log.Println("Missing add/update notification for Pod: ", pod.ObjectMeta.Name)
+ client.AddPod(&pod)
+ }
+ }
+ }
+ }
+}
+
+func containsPodName(name string, pods *kapi.PodList) bool {
+ for _, pod := range pods.Items {
+ if pod.ObjectMeta.Name == name {
+ return true
+ }
+ }
+ return false
+}
+
+func containsSvcName(name string, svcs *kapi.ServiceList) bool {
+ for _, svc := range svcs.Items {
+ if svc.ObjectMeta.Name == name {
+ return true
+ }
+ }
+ return false
+}
+
+func containsNodeName(name string, nodes *kapi.NodeList) bool {
+ for _, node := range nodes.Items {
+ if node.ObjectMeta.Name == name {
+ return true
+ }
+ }
+ return false
+}
+
+func isUserDefinedPod(pod *kapi.Pod) bool {
+ for _, c := range pod.Spec.Containers {
+ for _, e := range c.Env {
+ if strings.HasPrefix(e.Name, "SERVICE_") {
+ if strings.Contains(e.Name, "_NAME") {
+ return true
+ }
+ }
+ }
+ }
+ return false
+}
+
+func isUserDefinedService(svc *kapi.Service) bool {
+ for name, _ := range svc.ObjectMeta.Annotations {
+ if strings.HasPrefix(name, "SERVICE_") {
+ if strings.Contains(name, "_NAME") {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func fillPdmPodIPsMap(pod *kapi.Pod) {
+ base := *argPdmControllerUrl
+ resUrl := podUrl + "/" + pod.Namespace + "/pods"
+ rclient := restclient.NewRESTClient(base, resUrl, pod.Name)
+ //try 10 times at maximum
+ for i := 0; i < 10; i++ {
+ log.Printf("try REST get the PDM PodIP for pod:%s at %d time..", pod.Name, i)
+ buf, err := rclient.Get()
+ if err != nil {
+ log.Printf("request PDM PodIP info for Pod:%s with error:%v", pod.Name, err)
+ time.Sleep(6 * 1e9) //sleep for 6 secs
+ continue
+ }
+ podIPInfo := PdmPodIPInfo{}
+ json.Unmarshal(buf, &podIPInfo)
+ IPInfo := podIPInfo.Pod
+ IPs := IPInfo.IPs
+ if len(IPs) == 0 {
+ log.Printf("request PDM PodIP info for Pod:%s return empty ip list.", pod.Name)
+ return
+ } else {
+ for _, ip := range IPs {
+ if ip.IPAddress == "" {
+ log.Printf("request PDM PodIP info for Pod:%s return empty ip.", pod.Name)
+ return
+ }
+ }
+ pdmPodIPsMap[pod.Name] = IPs
+ }
+ log.Println("successfully REST get the PDM PodIP for pod:", pod.Name)
+ return
+ }
+
+ log.Println("failed to REST get the PDM PodIP for pod:", pod.Name)
+}
diff --git a/kube2consul/src/kube2consul/pod_service.go b/kube2consul/src/kube2consul/pod_service.go
new file mode 100644
index 0000000..c6294cb
--- /dev/null
+++ b/kube2consul/src/kube2consul/pod_service.go
@@ -0,0 +1,677 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// pod_service.go
+package main
+
+import (
+ "fmt"
+ "log"
+ "strconv"
+ "strings"
+
+ consulapi "github.com/hashicorp/consul/api"
+ kapi "k8s.io/kubernetes/pkg/api"
+)
+
+type PodServiceAction interface {
+ BuildServiceInfoMap(*kapi.ContainerPort, *kapi.Container)
+ BuildAgentService(*ServiceInfo, DnsInfo, *kapi.ContainerPort, string) *consulapi.AgentServiceRegistration
+}
+
+type PodService struct {
+ PodServiceAction
+ sInfos map[string]*ServiceInfo
+}
+
+func newPodService() *PodService {
+ return &PodService{
+ sInfos: make(map[string]*ServiceInfo),
+ }
+}
+
+func (pod *PodService) BuildServiceInfoMap(cport *kapi.ContainerPort, container *kapi.Container) {
+ portstr := strconv.Itoa(int(cport.ContainerPort))
+ fullname := "SERVICE_" + portstr + "_NAME"
+ nameprefix := "SERVICE_" + portstr + "_NAME_"
+ tagprefix := "SERVICE_" + portstr + "_TAGS"
+ labelprefix := "SERVICE_" + portstr + "_ROUTE_LABELS"
+ mdataprefix := "SERVICE_" + portstr + "_META_DATA"
+ nsprefix := "SERVICE_" + portstr + "_NAMESPACE"
+ ttlprefix := "SERVICE_" + portstr + "_CHECK_TTL"
+ httpprefix := "SERVICE_" + portstr + "_CHECK_HTTP"
+ tcpprefix := "SERVICE_" + portstr + "_CHECK_TCP"
+ cmdprefix := "SERVICE_" + portstr + "_CHECK_CMD"
+ scriptprefix := "SERVICE_" + portstr + "_CHECK_SCRIPT"
+ timeoutprefix := "SERVICE_" + portstr + "_CHECK_TIMEOUT"
+ intervalprefix := "SERVICE_" + portstr + "_CHECK_INTERVAL"
+ for _, env := range container.Env {
+ if env.Name == fullname || strings.HasPrefix(env.Name, nameprefix) {
+ addToServiceInfoMap(env.Name, env.Value, Service_NAME, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, tagprefix) {
+ addToServiceInfoMap(env.Name, env.Value, Service_TAGS, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, labelprefix) {
+ addToServiceInfoMap(env.Name, env.Value, Service_LABELS, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, mdataprefix) {
+ addToServiceInfoMap(env.Name, env.Value, Service_MDATA, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, nsprefix) {
+ addToServiceInfoMap(env.Name, env.Value, Service_NS, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, ttlprefix) && env.Value != "" {
+ addToServiceInfoMap(env.Name, env.Value, Service_TTL, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, httpprefix) && env.Value != "" {
+ addToServiceInfoMap(env.Name, env.Value, Service_HTTP, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, tcpprefix) && env.Value != "" {
+ addToServiceInfoMap(env.Name, env.Value, Service_TCP, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, cmdprefix) && env.Value != "" {
+ addToServiceInfoMap(env.Name, env.Value, Service_CMD, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, scriptprefix) && env.Value != "" {
+ addToServiceInfoMap(env.Name, env.Value, Service_SCRIPT, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, timeoutprefix) && env.Value != "" {
+ addToServiceInfoMap(env.Name, env.Value, Service_TIMEOUT, &pod.sInfos)
+ } else if strings.HasPrefix(env.Name, intervalprefix) && env.Value != "" {
+ addToServiceInfoMap(env.Name, env.Value, Service_INTERVAL, &pod.sInfos)
+ }
+ }
+}
+
+func (pod *PodService) BuildAgentService(sInfo *ServiceInfo, config DnsInfo, cport *kapi.ContainerPort, cId string) []*consulapi.AgentServiceRegistration {
+ asrs := []*consulapi.AgentServiceRegistration{}
+ checks := []*consulapi.AgentServiceCheck{}
+
+ //build Checks object and populate checks table
+ var checks_in_tag string
+
+ if len(sInfo.TTL) > 0 {
+ check := new(consulapi.AgentServiceCheck)
+ check.TTL = sInfo.TTL
+ kv := "ttl:" + sInfo.TTL + ","
+ checks_in_tag += kv
+ checks = append(checks, check)
+ }
+
+ if len(sInfo.HTTP) > 0 {
+ check := new(consulapi.AgentServiceCheck)
+ check.HTTP = fmt.Sprintf("http://%s:%d%s", config.IPAddress, cport.ContainerPort, sInfo.HTTP)
+ kv := "http:" + sInfo.HTTP + ","
+ checks_in_tag += kv
+ if len(sInfo.Interval) == 0 {
+ check.Interval = DefaultInterval
+ if strings.Contains(checks_in_tag, "interval:") {
+ //do nothing
+ } else {
+ kv = "interval:" + DefaultInterval + ","
+ checks_in_tag += kv
+ }
+ } else {
+ check.Interval = sInfo.Interval
+ if strings.Contains(checks_in_tag, "interval:") {
+ //do nothing
+ } else {
+ kv = "interval:" + sInfo.Interval + ","
+ checks_in_tag += kv
+ }
+ }
+ if len(sInfo.Timeout) > 0 {
+ check.Timeout = sInfo.Timeout
+ if strings.Contains(checks_in_tag, "timeout:") {
+ //do nothing
+ } else {
+ kv = "timeout:" + sInfo.Timeout + ","
+ checks_in_tag += kv
+ }
+ }
+ checks = append(checks, check)
+ }
+
+ if len(sInfo.TCP) > 0 {
+ check := new(consulapi.AgentServiceCheck)
+ check.TCP = fmt.Sprintf("%s:%d", config.IPAddress, cport.ContainerPort)
+ kv := "tcp:ok,"
+ checks_in_tag += kv
+ if len(sInfo.Interval) == 0 {
+ check.Interval = DefaultInterval
+ if strings.Contains(checks_in_tag, "interval:") {
+ //do nothing
+ } else {
+ kv = "interval:" + DefaultInterval + ","
+ checks_in_tag += kv
+ }
+ } else {
+ check.Interval = sInfo.Interval
+ if strings.Contains(checks_in_tag, "interval:") {
+ //do nothing
+ } else {
+ kv = "interval:" + sInfo.Interval + ","
+ checks_in_tag += kv
+ }
+ }
+ if len(sInfo.Timeout) > 0 {
+ check.Timeout = sInfo.Timeout
+ if strings.Contains(checks_in_tag, "timeout:") {
+ //do nothing
+ } else {
+ kv = "timeout:" + sInfo.Timeout + ","
+ checks_in_tag += kv
+ }
+ }
+ checks = append(checks, check)
+ }
+
+ if len(sInfo.CMD) > 0 {
+ check := new(consulapi.AgentServiceCheck)
+ check.Script = fmt.Sprintf("check-cmd %s %s %s", cId, strconv.Itoa(int(cport.ContainerPort)), sInfo.CMD)
+ kv := "script:" + sInfo.CMD + ","
+ checks_in_tag += kv
+ if len(sInfo.Interval) == 0 {
+ check.Interval = DefaultInterval
+ if strings.Contains(checks_in_tag, "interval:") {
+ //do nothing
+ } else {
+ kv = "interval:" + DefaultInterval + ","
+ checks_in_tag += kv
+ }
+ } else {
+ check.Interval = sInfo.Interval
+ if strings.Contains(checks_in_tag, "interval:") {
+ //do nothing
+ } else {
+ kv = "interval:" + sInfo.Interval + ","
+ checks_in_tag += kv
+ }
+ }
+ checks = append(checks, check)
+ }
+
+ if len(sInfo.Script) > 0 {
+ check := new(consulapi.AgentServiceCheck)
+ withIp := strings.Replace(sInfo.Script, "$SERVICE_IP", config.IPAddress, -1)
+ check.Script = strings.Replace(withIp, "$SERVICE_PORT", strconv.Itoa(int(cport.ContainerPort)), -1)
+ kv := "script:" + sInfo.Script + ","
+ checks_in_tag += kv
+ if len(sInfo.Interval) == 0 {
+ check.Interval = DefaultInterval
+ if strings.Contains(checks_in_tag, "interval:") {
+ //do nothing
+ } else {
+ kv = "interval:" + DefaultInterval + ","
+ checks_in_tag += kv
+ }
+ } else {
+ check.Interval = sInfo.Interval
+ if strings.Contains(checks_in_tag, "interval:") {
+ //do nothing
+ } else {
+ kv = "interval:" + sInfo.Interval + ","
+ checks_in_tag += kv
+ }
+ }
+ if len(sInfo.Timeout) > 0 {
+ check.Timeout = sInfo.Timeout
+ if strings.Contains(checks_in_tag, "timeout:") {
+ //do nothing
+ } else {
+ kv = "timeout:" + sInfo.Timeout + ","
+ checks_in_tag += kv
+ }
+ }
+ checks = append(checks, check)
+ }
+
+ //remove trailing ","
+ if len(checks_in_tag) != 0 {
+ checks_in_tag = strings.TrimRight(checks_in_tag, ",")
+ }
+
+ //build other talbes in tags
+ var (
+ base_in_tag string
+ lb_in_tag string
+ labels_in_tag = sInfo.Labels
+ metadata_in_tag = sInfo.MData
+ ns_in_tag = sInfo.NS
+ )
+
+ ts := strings.Split(sInfo.Tags, ",")
+ for _, elem := range ts {
+ if strings.Contains(elem, NETWORK_PLANE_TYPE) || strings.Contains(elem, VISUAL_RANGE) {
+ kv := "," + elem
+ labels_in_tag += kv
+ continue
+ }
+
+ if strings.Contains(elem, LB_POLICY) || strings.Contains(elem, LB_SVR_PARAMS) {
+ kv := "," + elem
+ lb_in_tag += kv
+ continue
+ }
+
+ kv := "," + elem
+ base_in_tag += kv
+ }
+
+ //remove leading ","
+ if len(base_in_tag) != 0 {
+ base_in_tag = strings.TrimLeft(base_in_tag, ",")
+ }
+
+ if len(lb_in_tag) != 0 {
+ lb_in_tag = strings.TrimLeft(lb_in_tag, ",")
+ }
+
+ if len(labels_in_tag) != 0 {
+ labels_in_tag = strings.TrimLeft(labels_in_tag, ",")
+ }
+
+ //build tables in tag
+ var tagslice []string
+ if len(base_in_tag) != 0 {
+ tagslice = append(tagslice, buildTableInJsonFormat(Table_BASE, base_in_tag))
+ }
+
+ if len(lb_in_tag) != 0 {
+ tagslice = append(tagslice, buildTableInJsonFormat(Table_LB, lb_in_tag))
+ }
+
+ if len(labels_in_tag) == 0 {
+ tagslice = append(tagslice, DefaultLabels)
+ } else {
+ if !strings.Contains(labels_in_tag, "visualRange") {
+ labels_in_tag += ",visualRange:1"
+ }
+ tagslice = append(tagslice, buildTableInJsonFormat(Table_LABELS, labels_in_tag))
+ }
+
+ if len(metadata_in_tag) != 0 {
+ tagslice = append(tagslice, buildTableInJsonFormat(Table_META_DATA, metadata_in_tag))
+ }
+
+ if len(ns_in_tag) != 0 {
+ tagslice = append(tagslice, "\"ns\":{\"namespace\":\"" + ns_in_tag + "\"}")
+ }
+
+ if len(checks_in_tag) != 0 {
+ tagslice = append(tagslice, buildTableInJsonFormat(Table_CHECKS, checks_in_tag))
+ }
+
+ //append "-<namespace>"
+ name := sInfo.Name
+ if len(ns_in_tag) != 0 {
+ name = name + "-" + ns_in_tag
+ }
+
+ //handle IUI service
+// for _, elem := range ts {
+// var elemslice []string
+// elemslice = strings.Split(elem, ":")
+// if elemslice[0] == PROTOCOL {
+// switch elemslice[1] {
+// case PROTOCOL_UI:
+// name = PREFIX_UI + name
+// default:
+// log.Println("regular protocol:", elemslice[1])
+// }
+// break
+// }
+// }
+
+ if len(sInfo.IPs) == 0 {
+ serviceID := config.BaseID + "-" + strconv.Itoa(int(cport.ContainerPort)) + "-" + name
+ asr := &consulapi.AgentServiceRegistration{
+ ID: serviceID,
+ Name: name,
+ Address: config.IPAddress,
+ Port: int(cport.ContainerPort),
+ Tags: tagslice,
+ Checks: checks,
+ }
+ asrs = append(asrs, asr)
+ return asrs
+ } else {
+ for _, ip := range sInfo.IPs {
+ serviceID := config.BaseID + "-" + strconv.Itoa(int(cport.ContainerPort)) + "-" + name + "-" + ip.NetworkPlane
+ addr := ip.IPAddress
+ tagslice_with_single_network_plane_type := refillTagsliceWithSingleNetworkPlaneType(tagslice, labels_in_tag, ip.NetworkPlane)
+ asr := &consulapi.AgentServiceRegistration{
+ ID: serviceID,
+ Name: name,
+ Address: addr,
+ Port: int(cport.ContainerPort),
+ Tags: tagslice_with_single_network_plane_type,
+ Checks: checks,
+ }
+ asrs = append(asrs, asr)
+ }
+ return asrs
+ }
+}
+
+func addToServiceInfoMap(name, value, flag string, smap *map[string]*ServiceInfo) {
+ segs := strings.Split(name, "_")
+ switch flag {
+ case Service_NAME:
+ if strings.ContainsAny(value, ".") {
+ log.Printf("Warning:service name %s contains dot", value)
+ value = strings.Replace(value, ".", "-", -1)
+ log.Printf("Warning:service name has been modified to %s", value)
+ }
+ if len(segs) == 3 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].Name = value
+ } else {
+ sInfo := &ServiceInfo{
+ Name: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 4 {
+ id := segs[3]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].Name = value
+ } else {
+ sInfo := &ServiceInfo{
+ Name: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_TAGS:
+ if len(segs) == 3 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].Tags = value
+ } else {
+ sInfo := &ServiceInfo{
+ Tags: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 4 {
+ id := segs[3]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].Tags = value
+ } else {
+ sInfo := &ServiceInfo{
+ Tags: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_LABELS:
+ if len(segs) == 4 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].Labels = value
+ } else {
+ sInfo := &ServiceInfo{
+ Labels: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 5 {
+ id := segs[4]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].Labels = value
+ } else {
+ sInfo := &ServiceInfo{
+ Labels: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_MDATA:
+ if len(segs) == 4 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].MData = value
+ } else {
+ sInfo := &ServiceInfo{
+ MData: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 5 {
+ id := segs[4]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].MData = value
+ } else {
+ sInfo := &ServiceInfo{
+ MData: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_NS:
+ if len(segs) == 3 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].NS = value
+ } else {
+ sInfo := &ServiceInfo{
+ NS: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 4 {
+ id := segs[3]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].NS = value
+ } else {
+ sInfo := &ServiceInfo{
+ NS: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_TTL:
+ if len(segs) == 4 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].TTL = value
+ } else {
+ sInfo := &ServiceInfo{
+ TTL: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 5 {
+ id := segs[4]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].TTL = value
+ } else {
+ sInfo := &ServiceInfo{
+ TTL: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_HTTP:
+ if len(segs) == 4 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].HTTP = value
+ } else {
+ sInfo := &ServiceInfo{
+ HTTP: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 5 {
+ id := segs[4]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].HTTP = value
+ } else {
+ sInfo := &ServiceInfo{
+ HTTP: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_TCP:
+ if len(segs) == 4 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].TCP = value
+ } else {
+ sInfo := &ServiceInfo{
+ TCP: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 5 {
+ id := segs[4]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].TCP = value
+ } else {
+ sInfo := &ServiceInfo{
+ TCP: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_CMD:
+ if len(segs) == 4 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].CMD = value
+ } else {
+ sInfo := &ServiceInfo{
+ CMD: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 5 {
+ id := segs[4]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].CMD = value
+ } else {
+ sInfo := &ServiceInfo{
+ CMD: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_SCRIPT:
+ if len(segs) == 4 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].Script = value
+ } else {
+ sInfo := &ServiceInfo{
+ Script: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 5 {
+ id := segs[4]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].Script = value
+ } else {
+ sInfo := &ServiceInfo{
+ Script: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_TIMEOUT:
+ if len(segs) == 4 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].Timeout = value
+ } else {
+ sInfo := &ServiceInfo{
+ Timeout: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 5 {
+ id := segs[4]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].Timeout = value
+ } else {
+ sInfo := &ServiceInfo{
+ Timeout: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ case Service_INTERVAL:
+ if len(segs) == 4 {
+ if _, ok := (*smap)["0"]; ok {
+ (*smap)["0"].Interval = value
+ } else {
+ sInfo := &ServiceInfo{
+ Interval: value,
+ }
+ (*smap)["0"] = sInfo
+ }
+ } else if len(segs) == 5 {
+ id := segs[4]
+ if _, ok := (*smap)[id]; ok {
+ (*smap)[id].Interval = value
+ } else {
+ sInfo := &ServiceInfo{
+ Interval: value,
+ }
+ (*smap)[id] = sInfo
+ }
+ }
+ default:
+ log.Println("Unsupported Service Attribute: ", flag)
+ }
+}
+
+func buildTableInJsonFormat(table, input string) string {
+ head := "\"" + table + "\":{"
+ tail := "}"
+ body := ""
+
+ s := strings.Split(input, ",")
+ slen := len(s)
+ for _, v := range s {
+ slen--
+ s1 := strings.Split(v, ":")
+ mstr := "\"" + strings.TrimSpace(s1[0]) + "\":"
+ if strings.Contains(body, mstr) {
+ if slen == 0 {
+ body = strings.TrimRight(body, ",")
+ }
+ continue
+ }
+ if len(s1) == 2 {
+ kv := "\"" + strings.TrimSpace(s1[0]) + "\":\"" + strings.TrimSpace(s1[1]) + "\""
+ if slen != 0 {
+ kv += ","
+ }
+ body += kv
+ }
+ }
+
+ return head + body + tail
+}
+
+func refillTagsliceWithSingleNetworkPlaneType(old []string, labels, nw_type string) []string {
+ var ret_tagslice []string
+ var new_labels string
+
+ for _, elem := range old {
+ if strings.Contains(elem, NETWORK_PLANE_TYPE) {
+ continue
+ }
+ ret_tagslice = append(ret_tagslice, elem)
+ }
+
+ sl := strings.Split(labels, NETWORK_PLANE_TYPE)
+ sl1 := strings.SplitN(sl[1], ",", 2)
+ nw_type_field := NETWORK_PLANE_TYPE + ":" + nw_type
+
+ if len(sl1) == 2 {
+ new_labels = sl[0] + nw_type_field + "," + sl1[1]
+ } else {
+ new_labels = sl[0] + nw_type_field
+ }
+ ret_tagslice = append(ret_tagslice, buildTableInJsonFormat(Table_LABELS, new_labels))
+
+ return ret_tagslice
+} \ No newline at end of file
diff --git a/kube2consul/src/kube2consul/types.go b/kube2consul/src/kube2consul/types.go
new file mode 100644
index 0000000..0105d98
--- /dev/null
+++ b/kube2consul/src/kube2consul/types.go
@@ -0,0 +1,96 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// types.go
+package main
+
+import (
+ kapi "k8s.io/kubernetes/pkg/api"
+)
+
+type KubeWorkAction string
+
+const (
+ KubeWorkAddNode KubeWorkAction = "AddNode"
+ KubeWorkRemoveNode KubeWorkAction = "RemoveNode"
+ KubeWorkUpdateNode KubeWorkAction = "UpdateNode"
+ KubeWorkAddService KubeWorkAction = "AddService"
+ KubeWorkRemoveService KubeWorkAction = "RemoveService"
+ KubeWorkUpdateService KubeWorkAction = "UpdateService"
+ KubeWorkAddPod KubeWorkAction = "AddPod"
+ KubeWorkRemovePod KubeWorkAction = "RemovePod"
+ KubeWorkUpdatePod KubeWorkAction = "UpdatePod"
+ KubeWorkSync KubeWorkAction = "Sync"
+)
+
+type KubeWork struct {
+ Action KubeWorkAction
+ Node *kapi.Node
+ Service *kapi.Service
+ Pod *kapi.Pod
+}
+
+type ConsulWorkAction string
+
+const (
+ ConsulWorkAddService ConsulWorkAction = "AddService"
+ ConsulWorkRemoveService ConsulWorkAction = "RemoveService"
+ ConsulWorkAddPod ConsulWorkAction = "AddPod"
+ ConsulWorkRemovePod ConsulWorkAction = "RemovePod"
+ ConsulWorkSyncDNS ConsulWorkAction = "SyncDNS"
+)
+
+type ConsulWork struct {
+ Action ConsulWorkAction
+ Service *kapi.Service
+ Pod *kapi.Pod
+ Config DnsInfo
+}
+
+type DnsInfo struct {
+ BaseID string
+ IPAddress string
+ ServiceType kapi.ServiceType
+}
+
+type ServiceInfo struct {
+ Name string
+ Tags string
+ Labels string
+ MData string
+ NS string
+ TTL string
+ HTTP string
+ TCP string
+ CMD string
+ Script string
+ Timeout string
+ Interval string
+ IPs []PodIP
+}
+
+type PdmPodIPInfo struct {
+ Pod PodIPInfo `json:"pod"`
+}
+
+type PodIPInfo struct {
+ Name string `json:"name"`
+ IPs []PodIP `json:"ips"`
+}
+
+type PodIP struct {
+ NetworkPlane string `json:"network_plane_name"`
+ IPAddress string `json:"ip_address"`
+}
diff --git a/kube2consul/src/kube2consul/util/restclient/restclient.go b/kube2consul/src/kube2consul/util/restclient/restclient.go
new file mode 100644
index 0000000..e01ae0f
--- /dev/null
+++ b/kube2consul/src/kube2consul/util/restclient/restclient.go
@@ -0,0 +1,42 @@
+package restclient
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net/http"
+)
+
+type RESTClient struct {
+ base string
+ resource string
+ param string
+}
+
+func NewRESTClient(baseURL string, versionedAPIPath string, urlParameter string) *RESTClient {
+ return &RESTClient{
+ base: baseURL,
+ resource: versionedAPIPath,
+ param: urlParameter,
+ }
+}
+
+func (c *RESTClient) Get() (b []byte, err error) {
+ url := c.base + "/" + c.resource + "/" + c.param
+ res, err := http.Get(url)
+ if err != nil {
+ return nil, err
+ }
+
+ if res.StatusCode != 200 {
+ return nil, fmt.Errorf(res.Status)
+ }
+
+ buf, err := ioutil.ReadAll(res.Body)
+ res.Body.Close()
+
+ if err != nil {
+ return nil, err
+ }
+
+ return buf, nil
+}
diff --git a/kube2consul/src/main/blueprint/deploy.yml b/kube2consul/src/main/blueprint/deploy.yml
new file mode 100644
index 0000000..cc61076
--- /dev/null
+++ b/kube2consul/src/main/blueprint/deploy.yml
@@ -0,0 +1,8 @@
+---
+- remote_user: ubuntu
+ become: yes
+ become_method: sudo
+ vars_files:
+ - vars.yml
+ tasks:
+ - include: task.yml
diff --git a/kube2consul/src/main/blueprint/list_of_servicelet.list b/kube2consul/src/main/blueprint/list_of_servicelet.list
new file mode 100644
index 0000000..77a1d4f
--- /dev/null
+++ b/kube2consul/src/main/blueprint/list_of_servicelet.list
@@ -0,0 +1,4 @@
+{
+ "servicelet_module":[
+ ]
+} \ No newline at end of file
diff --git a/kube2consul/src/main/blueprint/task.yml b/kube2consul/src/main/blueprint/task.yml
new file mode 100644
index 0000000..45cfcd9
--- /dev/null
+++ b/kube2consul/src/main/blueprint/task.yml
@@ -0,0 +1,118 @@
+---
+- name: remove kube2consul container
+ docker:
+ name: kube2consul
+ image: "{{kube2consul_image}}"
+ state: absent
+
+- name: run kube2consul container
+ docker:
+ name: kube2consul
+ image: "{{kube2consul_image}}"
+ log_driver: syslog
+ net: host
+ restart_policy: always
+ volumes:
+ - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
+ env:
+ KUBE_MASTER_IP: "{{kube_master_ip}}"
+ PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
+ JOIN_IP: "{{consul_join_ip}}"
+ when:
+ - all_in_one == 'no'
+ - master_in_controller == 'no'
+ - cluster_type == 'k8s'
+
+- name: run kube2consul container
+ docker:
+ name: kube2consul
+ image: "{{kube2consul_image}}"
+ net: host
+ restart_policy: always
+ privileged: true
+ volumes:
+ - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
+ - "/root/.kube/config:/root/.kube/config:ro"
+ env:
+ KUBE_MASTER_IP: "{{kube_master_ip}}"
+ PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
+ JOIN_IP: "{{consul_join_ip}}"
+ CLUSTER_TYPE: "openshift"
+ when:
+ - all_in_one == 'no'
+ - master_in_controller == 'no'
+ - cluster_type == 'openshift'
+
+- name: run kube2consul container
+ docker:
+ name: kube2consul
+ image: "{{kube2consul_image}}"
+ log_driver: syslog
+ restart_policy: always
+ volumes:
+ - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
+ env:
+ KUBE_MASTER_IP: "{{kube_master_ip}}"
+ PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
+ JOIN_IP: "{{consul_join_ip}}"
+ ALL_IN_ONE: "yes"
+ when:
+ - all_in_one == 'yes'
+ - cluster_type == 'k8s'
+
+- name: run kube2consul container
+ docker:
+ name: kube2consul
+ image: "{{kube2consul_image}}"
+ log_driver: syslog
+ restart_policy: always
+ privileged: true
+ volumes:
+ - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
+ - "/root/.kube/config:/root/.kube/config:ro"
+ env:
+ KUBE_MASTER_IP: "{{kube_master_ip}}"
+ PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
+ JOIN_IP: "{{consul_join_ip}}"
+ ALL_IN_ONE: "yes"
+ CLUSTER_TYPE: "openshift"
+ when:
+ - all_in_one == 'yes'
+ - cluster_type == 'openshift'
+
+- name: run kube2consul container
+ docker:
+ name: kube2consul
+ image: "{{kube2consul_image}}"
+ log_driver: syslog
+ restart_policy: always
+ volumes:
+ - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
+ env:
+ KUBE_MASTER_IP: "{{kube_master_ip}}"
+ PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
+ JOIN_IP: "{{consul_join_ip}}"
+ ALL_IN_ONE: "yes"
+ when:
+ - master_in_controller == 'yes'
+ - cluster_type == 'k8s'
+
+- name: run kube2consul container
+ docker:
+ name: kube2consul
+ image: "{{kube2consul_image}}"
+ log_driver: syslog
+ restart_policy: always
+ privileged: true
+ volumes:
+ - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
+ - "/root/.kube/config:/root/.kube/config:ro"
+ env:
+ KUBE_MASTER_IP: "{{kube_master_ip}}"
+ PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
+ JOIN_IP: "{{consul_join_ip}}"
+ ALL_IN_ONE: "yes"
+ CLUSTER_TYPE: "openshift"
+ when:
+ - master_in_controller == 'yes'
+ - cluster_type == 'openshift' \ No newline at end of file
diff --git a/kube2consul/src/main/blueprint/vars.yml b/kube2consul/src/main/blueprint/vars.yml
new file mode 100644
index 0000000..971438d
--- /dev/null
+++ b/kube2consul/src/main/blueprint/vars.yml
@@ -0,0 +1,14 @@
+---
+- api_network_ip:
+- man_network_ip:
+- registry_url:
+- cp_vertype:
+- cp_type:
+- cp_name:
+- cp_version:
+- kube2consul_image: "{{registry_url}}/{{cp_type}}/{{cp_name}}:{{cp_version}}"
+- kube_master_ip: "{{ hostvars[inventory_hostname]['api_network_ip'] }}"
+- pdm_controller_ip: "{{vp_ip}}"
+- consul_join_ip: "{{zenap_msb_consul_server_ip}}"
+- kube2consul_data_host: "/home/zenap-msb/consul_data/kube2consul_{{kube_master_ip}}"
+- kube2consul_data_container: "/consul-works/data-dir" \ No newline at end of file
diff --git a/kube2consul/src/main/docker/Dockerfile b/kube2consul/src/main/docker/Dockerfile
new file mode 100644
index 0000000..278cac5
--- /dev/null
+++ b/kube2consul/src/main/docker/Dockerfile
@@ -0,0 +1,11 @@
+FROM alpine:3.3
+ENV CONSUL_VERSION 0.7.1
+ENV BASE /
+ADD consul-linux_amd64.tar.gz /
+RUN cd /usr/lib \
+ && ln -s /consul/libglib-2.0.so.0.4400.0 libglib-2.0.so.0 \
+ && ln -s /consul/libintl.so.8.1.3 libintl.so.8
+COPY kube2consul /bin/
+COPY start.sh /
+
+ENTRYPOINT exec /start.sh \ No newline at end of file
diff --git a/kube2consul/src/main/docker/start.sh b/kube2consul/src/main/docker/start.sh
new file mode 100644
index 0000000..033c50b
--- /dev/null
+++ b/kube2consul/src/main/docker/start.sh
@@ -0,0 +1,36 @@
+#!/bin/sh
+if [ -z "${KUBE_MASTER_IP}" ]; then
+ echo "kube master node ip is required."
+ exit 1
+fi
+
+if [ -n "${JOIN_IP}" ]; then
+ echo "### Starting consul client"
+ if [ -z "${ALL_IN_ONE}" ]; then
+ /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind ${KUBE_MASTER_IP} -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s &
+ else
+ /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind 0.0.0.0 -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s &
+ fi
+fi
+
+if [ -z "${RUN_MODE}" ]; then
+ echo "non-HA scenario."
+else
+ echo "\n\n### Starting consul agent"
+ cd ./consul
+ ./entry.sh &
+fi
+
+kube_url="http://${KUBE_MASTER_IP}:8080"
+
+if [ "${CLUSTER_TYPE}" == "openshift" ]; then
+ kube_url="https://${KUBE_MASTER_IP}:8443"
+fi
+
+echo "\n\n### Starting kube2consul"
+if [ -z "${PDM_CONTROLLER_IP}" ]; then
+ /bin/kube2consul --kube_master_url ${kube_url}
+else
+ echo "in Paas mode."
+ /bin/kube2consul --kube_master_url ${kube_url} --pdm_controller_url http://${PDM_CONTROLLER_IP}:9527
+fi \ No newline at end of file