From a7837a0ac51704003c6aacba2dacb8e64f681622 Mon Sep 17 00:00:00 2001 From: HuabingZhao Date: Thu, 31 Aug 2017 16:04:55 +0800 Subject: remove kube2consul codes Issue-Id: OOM-61 Change-Id: I4eb076835a051ed6bb3abbdcb90aafdf4d4a7149 Signed-off-by: HuabingZhao --- kube2consul/bin/Dockerfile | 11 - kube2consul/bin/blueprint/deploy.yml | 8 - kube2consul/bin/blueprint/list_of_servicelet.list | 4 - kube2consul/bin/blueprint/task.yml | 118 ---- kube2consul/bin/blueprint/vars.yml | 14 - kube2consul/bin/start.sh | 36 -- kube2consul/pom.xml | 109 ---- kube2consul/src/kube2consul/Makefile | 11 - kube2consul/src/kube2consul/consul_work.go | 366 ----------- kube2consul/src/kube2consul/kube2consul.go | 449 -------------- kube2consul/src/kube2consul/kube_service.go | 100 --- kube2consul/src/kube2consul/kube_work.go | 511 ---------------- kube2consul/src/kube2consul/pod_service.go | 677 --------------------- kube2consul/src/kube2consul/types.go | 96 --- .../src/kube2consul/util/restclient/restclient.go | 42 -- kube2consul/src/main/blueprint/deploy.yml | 8 - .../src/main/blueprint/list_of_servicelet.list | 4 - kube2consul/src/main/blueprint/task.yml | 118 ---- kube2consul/src/main/blueprint/vars.yml | 14 - kube2consul/src/main/docker/Dockerfile | 11 - kube2consul/src/main/docker/start.sh | 36 -- pom.xml | 159 ----- 22 files changed, 2902 deletions(-) delete mode 100644 kube2consul/bin/Dockerfile delete mode 100644 kube2consul/bin/blueprint/deploy.yml delete mode 100644 kube2consul/bin/blueprint/list_of_servicelet.list delete mode 100644 kube2consul/bin/blueprint/task.yml delete mode 100644 kube2consul/bin/blueprint/vars.yml delete mode 100644 kube2consul/bin/start.sh delete mode 100644 kube2consul/pom.xml delete mode 100644 kube2consul/src/kube2consul/Makefile delete mode 100644 kube2consul/src/kube2consul/consul_work.go delete mode 100644 kube2consul/src/kube2consul/kube2consul.go delete mode 100644 kube2consul/src/kube2consul/kube_service.go delete mode 100644 kube2consul/src/kube2consul/kube_work.go delete mode 100644 kube2consul/src/kube2consul/pod_service.go delete mode 100644 kube2consul/src/kube2consul/types.go delete mode 100644 kube2consul/src/kube2consul/util/restclient/restclient.go delete mode 100644 kube2consul/src/main/blueprint/deploy.yml delete mode 100644 kube2consul/src/main/blueprint/list_of_servicelet.list delete mode 100644 kube2consul/src/main/blueprint/task.yml delete mode 100644 kube2consul/src/main/blueprint/vars.yml delete mode 100644 kube2consul/src/main/docker/Dockerfile delete mode 100644 kube2consul/src/main/docker/start.sh delete mode 100644 pom.xml diff --git a/kube2consul/bin/Dockerfile b/kube2consul/bin/Dockerfile deleted file mode 100644 index 278cac5..0000000 --- a/kube2consul/bin/Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -FROM alpine:3.3 -ENV CONSUL_VERSION 0.7.1 -ENV BASE / -ADD consul-linux_amd64.tar.gz / -RUN cd /usr/lib \ - && ln -s /consul/libglib-2.0.so.0.4400.0 libglib-2.0.so.0 \ - && ln -s /consul/libintl.so.8.1.3 libintl.so.8 -COPY kube2consul /bin/ -COPY start.sh / - -ENTRYPOINT exec /start.sh \ No newline at end of file diff --git a/kube2consul/bin/blueprint/deploy.yml b/kube2consul/bin/blueprint/deploy.yml deleted file mode 100644 index cc61076..0000000 --- a/kube2consul/bin/blueprint/deploy.yml +++ /dev/null @@ -1,8 +0,0 @@ ---- -- remote_user: ubuntu - become: yes - become_method: sudo - vars_files: - - vars.yml - tasks: - - include: task.yml diff --git a/kube2consul/bin/blueprint/list_of_servicelet.list b/kube2consul/bin/blueprint/list_of_servicelet.list deleted file mode 100644 index 77a1d4f..0000000 --- a/kube2consul/bin/blueprint/list_of_servicelet.list +++ /dev/null @@ -1,4 +0,0 @@ -{ - "servicelet_module":[ - ] -} \ No newline at end of file diff --git a/kube2consul/bin/blueprint/task.yml b/kube2consul/bin/blueprint/task.yml deleted file mode 100644 index 45cfcd9..0000000 --- a/kube2consul/bin/blueprint/task.yml +++ /dev/null @@ -1,118 +0,0 @@ ---- -- name: remove kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - state: absent - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - log_driver: syslog - net: host - restart_policy: always - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - when: - - all_in_one == 'no' - - master_in_controller == 'no' - - cluster_type == 'k8s' - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - net: host - restart_policy: always - privileged: true - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - - "/root/.kube/config:/root/.kube/config:ro" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - CLUSTER_TYPE: "openshift" - when: - - all_in_one == 'no' - - master_in_controller == 'no' - - cluster_type == 'openshift' - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - log_driver: syslog - restart_policy: always - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - ALL_IN_ONE: "yes" - when: - - all_in_one == 'yes' - - cluster_type == 'k8s' - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - log_driver: syslog - restart_policy: always - privileged: true - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - - "/root/.kube/config:/root/.kube/config:ro" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - ALL_IN_ONE: "yes" - CLUSTER_TYPE: "openshift" - when: - - all_in_one == 'yes' - - cluster_type == 'openshift' - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - log_driver: syslog - restart_policy: always - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - ALL_IN_ONE: "yes" - when: - - master_in_controller == 'yes' - - cluster_type == 'k8s' - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - log_driver: syslog - restart_policy: always - privileged: true - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - - "/root/.kube/config:/root/.kube/config:ro" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - ALL_IN_ONE: "yes" - CLUSTER_TYPE: "openshift" - when: - - master_in_controller == 'yes' - - cluster_type == 'openshift' \ No newline at end of file diff --git a/kube2consul/bin/blueprint/vars.yml b/kube2consul/bin/blueprint/vars.yml deleted file mode 100644 index 971438d..0000000 --- a/kube2consul/bin/blueprint/vars.yml +++ /dev/null @@ -1,14 +0,0 @@ ---- -- api_network_ip: -- man_network_ip: -- registry_url: -- cp_vertype: -- cp_type: -- cp_name: -- cp_version: -- kube2consul_image: "{{registry_url}}/{{cp_type}}/{{cp_name}}:{{cp_version}}" -- kube_master_ip: "{{ hostvars[inventory_hostname]['api_network_ip'] }}" -- pdm_controller_ip: "{{vp_ip}}" -- consul_join_ip: "{{zenap_msb_consul_server_ip}}" -- kube2consul_data_host: "/home/zenap-msb/consul_data/kube2consul_{{kube_master_ip}}" -- kube2consul_data_container: "/consul-works/data-dir" \ No newline at end of file diff --git a/kube2consul/bin/start.sh b/kube2consul/bin/start.sh deleted file mode 100644 index 033c50b..0000000 --- a/kube2consul/bin/start.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/sh -if [ -z "${KUBE_MASTER_IP}" ]; then - echo "kube master node ip is required." - exit 1 -fi - -if [ -n "${JOIN_IP}" ]; then - echo "### Starting consul client" - if [ -z "${ALL_IN_ONE}" ]; then - /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind ${KUBE_MASTER_IP} -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s & - else - /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind 0.0.0.0 -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s & - fi -fi - -if [ -z "${RUN_MODE}" ]; then - echo "non-HA scenario." -else - echo "\n\n### Starting consul agent" - cd ./consul - ./entry.sh & -fi - -kube_url="http://${KUBE_MASTER_IP}:8080" - -if [ "${CLUSTER_TYPE}" == "openshift" ]; then - kube_url="https://${KUBE_MASTER_IP}:8443" -fi - -echo "\n\n### Starting kube2consul" -if [ -z "${PDM_CONTROLLER_IP}" ]; then - /bin/kube2consul --kube_master_url ${kube_url} -else - echo "in Paas mode." - /bin/kube2consul --kube_master_url ${kube_url} --pdm_controller_url http://${PDM_CONTROLLER_IP}:9527 -fi \ No newline at end of file diff --git a/kube2consul/pom.xml b/kube2consul/pom.xml deleted file mode 100644 index 3ab8ce7..0000000 --- a/kube2consul/pom.xml +++ /dev/null @@ -1,109 +0,0 @@ - - - 4.0.0 - - - - org.onap.oom.registrator - oom-registrator-parent - 1.0.0-SNAPSHOT - - - - org.onap.oom.registrator - kube2consul - 1.0.0-SNAPSHOT - pom - - - kube2consul - - - - ${basedir}${file.separator}src - ${basedir}${file.separator}bin - - - com.igormaznitsa - mvn-golang-wrapper - - - maven-resources-plugin - - - copy-resources-dockerfile - process-resources - - copy-resources - - - ${version.output} - true - - - ${dockerFileDir} - false - - **/* - - - - true - - - - copy-resources-blueprint - process-resources - - copy-resources - - - ${version.output}/blueprint - true - - - ${blueprintFileDir} - false - - **/* - - - - true - - - - - - org.apache.maven.plugins - maven-dependency-plugin - - - copytolinux64 - - copy - - prepare-package - - - - com.zte.ums.zenap.msb.components - consul - tar.gz - linux_amd64 - - - ${version.output} - false - true - true - true - - - - - - - - \ No newline at end of file diff --git a/kube2consul/src/kube2consul/Makefile b/kube2consul/src/kube2consul/Makefile deleted file mode 100644 index 141abb0..0000000 --- a/kube2consul/src/kube2consul/Makefile +++ /dev/null @@ -1,11 +0,0 @@ -.PHONY: kube2consul clean test - -kube2consul: kube2consul.go - CGO_ENABLED=0 go build --ldflags '-extldflags "-static"' - strip kube2consul - -clean: - rm -fr kube2consul - -test: clean - go test -v --vmodule=*=4 diff --git a/kube2consul/src/kube2consul/consul_work.go b/kube2consul/src/kube2consul/consul_work.go deleted file mode 100644 index 2e5927a..0000000 --- a/kube2consul/src/kube2consul/consul_work.go +++ /dev/null @@ -1,366 +0,0 @@ -/* -Copyright 2017 ZTE, Inc. and others. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -// consul_work.go -package main - -import ( - "log" - "strings" - "sync" - - consulapi "github.com/hashicorp/consul/api" - kapi "k8s.io/kubernetes/pkg/api" -) - -type ConsulWorker interface { - AddService(config DnsInfo, service *kapi.Service) - RemoveService(config DnsInfo) - AddPod(config DnsInfo, pod *kapi.Pod) - RemovePod(config DnsInfo) - SyncDNS() -} - -type ConsulAgentWorker struct { - sync.Mutex - ConsulWorker - ids map[string][]*consulapi.AgentServiceRegistration - agent *consulapi.Client -} - -func newConsulAgentWorker(client *consulapi.Client) *ConsulAgentWorker { - return &ConsulAgentWorker{ - agent: client, - ids: make(map[string][]*consulapi.AgentServiceRegistration), - } -} - -func (client *ConsulAgentWorker) AddService(config DnsInfo, service *kapi.Service) { - client.Lock() - defer client.Unlock() - log.Println("Starting Add Service for: ", config.BaseID) - - if config.IPAddress == "" || config.BaseID == "" { - log.Println("Service Info is not valid for AddService") - return - } - - annos := service.ObjectMeta.Annotations - //only register service with annotations - if len(annos) == 0 { - log.Println("no Annotation defined for this service:", service.Name) - return - } - - for _, port := range service.Spec.Ports { - kubeService := newKubeService() - kubeService.BuildServiceInfoMap(&port, &annos) - - if len(kubeService.sInfos) == 0 { - log.Println("no Service defined for this port:", port.Port) - continue - } - - //remove service with no name - for id, s := range kubeService.sInfos { - if len(s.Name) == 0 { - if id == "0" { - log.Printf("SERVICE_%d_NAME not set, ignore this service.", port.Port) - } else { - log.Printf("SERVICE_%d_NAME_%s not set, ignore this service.", port.Port, id) - } - delete(kubeService.sInfos, id) - } - } - - //test again - if len(kubeService.sInfos) == 0 { - log.Println("no Service defined for this port:", port.Port) - continue - } - - for _, s := range kubeService.sInfos { - asr := kubeService.BuildAgentService(s, config, &port) - if *argChecks && port.Protocol == "TCP" && config.ServiceType != kapi.ServiceTypeClusterIP { - //Create Check if neeeded - asr.Check = kubeService.BuildAgentServiceCheck(config, &port) - } - - if client.agent != nil { - //Registers with DNS - if err := client.agent.Agent().ServiceRegister(asr); err != nil { - log.Println("Error creating service record: ", asr.ID) - } - } - - //Add to IDS - client.ids[config.BaseID] = append(client.ids[config.BaseID], asr) - } - } - log.Println("Added Service: ", service.Name) -} - -func (client *ConsulAgentWorker) RemoveService(config DnsInfo) { - client.Lock() - defer client.Unlock() - if ids, ok := client.ids[config.BaseID]; ok { - for _, asr := range ids { - if client.agent != nil { - if err := client.agent.Agent().ServiceDeregister(asr.ID); err != nil { - log.Println("Error removing service: ", err) - } - } - } - delete(client.ids, config.BaseID) - } else { - log.Println("Requested to remove non-existant BaseID DNS of:", config.BaseID) - } -} - -func (client *ConsulAgentWorker) AddPod(config DnsInfo, pod *kapi.Pod) { - client.Lock() - defer client.Unlock() - log.Println("Starting Add Pod for: ", config.BaseID) - - //double check if have Pod IPs so far in Paas mode - if *argPdmControllerUrl != "" { - if _, ok := pdmPodIPsMap[pod.Name]; !ok { - log.Println("In Paas mode, We don't have Pod IPs to register with for Pod: ", pod.Name) - return - } - } - - containerIdMap := make(map[string]string) - buildContainerIdMap(pod, &containerIdMap) - - for _, c := range pod.Spec.Containers { - - for _, p := range c.Ports { - podService := newPodService() - podService.BuildServiceInfoMap(&p, &c) - if len(podService.sInfos) == 0 { - log.Println("no Service defined for this port:", p.ContainerPort) - continue - } - - //remove service with no name - for id, s := range podService.sInfos { - if len(s.Name) == 0 { - if id == "0" { - log.Printf("SERVICE_%d_NAME not set, ignore this service.", p.ContainerPort) - } else { - log.Printf("SERVICE_%d_NAME_%s not set, ignore this service.", p.ContainerPort, id) - } - delete(podService.sInfos, id) - } - } - - //remove service if same service exist with different protocol - for id, s := range podService.sInfos { - services := []*consulapi.CatalogService{} - sn := s.Name - //append namespace if specified - if len(s.NS) != 0 { - sn = sn + "-" + s.NS - } - var tags []string - var protocol string - tags = strings.Split(s.Tags, ",") - for _, v := range tags { - var elems []string - elems = strings.Split(v, ":") - if elems[0] == PROTOCOL { - protocol = elems[1] - break - } - } - //remove service with empty protocol - if protocol == "" { - delete(podService.sInfos, id) - continue - } - - protocol_field := "\"protocol\":\"" + protocol + "\"" - - //query consul - if client.agent != nil { - same_protocol := false - services, _, _ = client.agent.Catalog().Service(sn, "", nil) - if services == nil || len(services) == 0 { - continue - } - for _, v := range services[0].ServiceTags { - if strings.Contains(v, protocol_field) { - same_protocol = true - break - } - } - - if !same_protocol { - log.Printf("same service with different protocol already exist, ignore service:%s, protocol:%s", sn, protocol) - delete(podService.sInfos, id) - } - } - } - - //remove service with no network plane type in tags - if *argPdmControllerUrl != "" { - for id, s := range podService.sInfos { - if len(s.Tags) == 0 { - if id == "0" { - log.Printf("SERVICE_%d_TAGS not set, ignore this service in Paas mode.", p.ContainerPort) - } else { - log.Printf("SERVICE_%d_TAGS_%s not set, ignore this service in Paas mode.", p.ContainerPort, id) - } - delete(podService.sInfos, id) - } else { - var tags []string - tags = strings.Split(s.Tags, ",") - for _, v := range tags { - var elems []string - elems = strings.Split(v, ":") - //this is saved in Tags, later will be moved to Labels?? - if elems[0] == NETWORK_PLANE_TYPE { - types := strings.Split(elems[1], "|") - for _, t := range types { - ip := getIPFromPodIPsMap(t, pod.Name) - if ip == "" { - log.Printf("User defined network type:%s has no assigned ip for Pod:%s", t, pod.Name) - } else { - log.Printf("Found ip:%s for network type:%s", ip, t) - s.IPs = append(s.IPs, PodIP{ - NetworkPlane: t, - IPAddress: ip, - }) - } - } - } - } - - if len(s.IPs) == 0 { - log.Printf("In Paas mode, no IP assigned for Pod:ContainerPort->%s:%d", pod.Name, p.ContainerPort) - delete(podService.sInfos, id) - } - } - } - } - - //test again - if len(podService.sInfos) == 0 { - log.Println("no Service defined for this port:", p.ContainerPort) - continue - } - - for _, s := range podService.sInfos { - asrs := podService.BuildAgentService(s, config, &p, containerIdMap[c.Name]) - if client.agent != nil { - for _, asr := range asrs { - //Registers with DNS - if err := client.agent.Agent().ServiceRegister(asr); err != nil { - log.Println("Error creating service record: ", asr.ID) - } else { - log.Printf("Added service:instance ->%s:%s", asr.Name, asr.ID) - } - client.ids[config.BaseID] = append(client.ids[config.BaseID], asr) - } - } else { - log.Println("Consul client is not available.") - } - } - } - } - log.Println("Added Pod: ", pod.Name) -} - -func (client *ConsulAgentWorker) RemovePod(config DnsInfo) { - client.Lock() - defer client.Unlock() - if ids, ok := client.ids[config.BaseID]; ok { - for _, asr := range ids { - if client.agent != nil { - if err := client.agent.Agent().ServiceDeregister(asr.ID); err != nil { - log.Println("Error removing service: ", err) - } else { - log.Printf("removed service -> %s:%d", asr.Name, asr.Port) - } - } - } - delete(client.ids, config.BaseID) - log.Println("Removed Pod: ", config.BaseID) - } else { - log.Println("Requested to remove non-existant BaseID DNS of:", config.BaseID) - } -} - -func (client *ConsulAgentWorker) SyncDNS() { - if client.agent != nil { - if services, err := client.agent.Agent().Services(); err == nil { - for _, registered := range client.ids { - for _, service := range registered { - if !containsServiceId(service.ID, services) { - log.Println("Regregistering missing service ID: ", service.ID) - client.agent.Agent().ServiceRegister(service) - } - } - } - for _, service := range services { - found := false - for _, registered := range client.ids { - for _, svc := range registered { - if service.ID == svc.ID { - found = true - break - } - } - if found { - break - } - } - if !found { - log.Println("Degregistering dead service ID: ", service.ID) - client.agent.Agent().ServiceDeregister(service.ID) - } - } - } else { - log.Println("Error retreiving services from consul during sync: ", err) - } - } -} - -func buildContainerIdMap(pod *kapi.Pod, cmap *map[string]string) { - for _, s := range pod.Status.ContainerStatuses { - id := strings.TrimLeft(s.ContainerID, "docker://") - (*cmap)[s.Name] = id - } -} - -func containsServiceId(id string, services map[string]*consulapi.AgentService) bool { - for _, service := range services { - if service.ID == id { - return true - } - } - return false -} - -func getIPFromPodIPsMap(ptype string, podname string) string { - ips := pdmPodIPsMap[podname] - for _, v := range ips { - if v.NetworkPlane == ptype { - return v.IPAddress - } - } - return "" -} diff --git a/kube2consul/src/kube2consul/kube2consul.go b/kube2consul/src/kube2consul/kube2consul.go deleted file mode 100644 index f1213f2..0000000 --- a/kube2consul/src/kube2consul/kube2consul.go +++ /dev/null @@ -1,449 +0,0 @@ -/* -Copyright 2017 ZTE, Inc. and others. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -// kube2consul.go -package main - -import ( - "flag" - "fmt" - "log" - "net/url" - "os" - "reflect" - "time" - - consulapi "github.com/hashicorp/consul/api" - kapi "k8s.io/kubernetes/pkg/api" - kcache "k8s.io/kubernetes/pkg/client/cache" - kclient "k8s.io/kubernetes/pkg/client/unversioned" - kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" - kframework "k8s.io/kubernetes/pkg/controller/framework" - kselector "k8s.io/kubernetes/pkg/fields" - klabels "k8s.io/kubernetes/pkg/labels" -) - -var ( - argConsulAgent = flag.String("consul-agent", "http://127.0.0.1:8500", "URL to consul agent") - argKubeMasterUrl = flag.String("kube_master_url", "", "Url to reach kubernetes master. Env variables in this flag will be expanded.") - argChecks = flag.Bool("checks", false, "Adds TCP service checks for each TCP Service") - argPdmControllerUrl = flag.String("pdm_controller_url", "", "URL to pdm controller") - addMap = make(map[string]*kapi.Pod) - deleteMap = make(map[string]*kapi.Pod) - pdmPodIPsMap = make(map[string][]PodIP) - nodeSelector = klabels.Everything() -) - -const ( - // Maximum number of attempts to connect to consul server. - maxConnectAttempts = 12 - // Resync period for the kube controller loop. - resyncPeriod = 5 * time.Second - // Default Health check interval - DefaultInterval = "10s" - // Resource url to query pod from PDM controller - podUrl = "nw/v1/tenants" -) - -const ( - Service_NAME = "NAME" - Service_TAGS = "TAGS" - Service_LABELS = "LABELS" - Service_MDATA = "MDATA" - Service_NS = "NAMESPACE" - Service_TTL = "TTL" - Service_HTTP = "HTTP" - Service_TCP = "TCP" - Service_CMD = "CMD" - Service_SCRIPT = "SCRIPT" - Service_TIMEOUT = "TIMEOUT" - Service_INTERVAL = "INTERVAL" -) - -const ( - Table_BASE = "base" - Table_LB = "lb" - Table_LABELS = "labels" - Table_META_DATA = "metadata" - Table_NS = "ns" - Table_CHECKS = "checks" -) - -const ( - PROTOCOL = "protocol" - PROTOCOL_UI = "UI" - PREFIX_UI = "IUI_" -) - -const ( - //filter out from the tags for compatibility - NETWORK_PLANE_TYPE = "network_plane_type" - VISUAL_RANGE = "visualRange" - LB_POLICY = "lb_policy" - LB_SVR_PARAMS = "lb_server_params" -) - -const DefaultLabels = "\"labels\":{\"visualRange\":\"1\"}" - -func getKubeMasterUrl() (string, error) { - if *argKubeMasterUrl == "" { - return "", fmt.Errorf("no --kube_master_url specified") - } - parsedUrl, err := url.Parse(os.ExpandEnv(*argKubeMasterUrl)) - if err != nil { - return "", fmt.Errorf("failed to parse --kube_master_url %s - %v", *argKubeMasterUrl, err) - } - if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" { - return "", fmt.Errorf("invalid --kube_master_url specified %s", *argKubeMasterUrl) - } - return parsedUrl.String(), nil -} - -func newConsulClient(consulAgent string) (*consulapi.Client, error) { - var ( - client *consulapi.Client - err error - ) - - consulConfig := consulapi.DefaultConfig() - consulAgentUrl, err := url.Parse(consulAgent) - if err != nil { - log.Println("Error parsing Consul url") - return nil, err - } - - if consulAgentUrl.Host != "" { - consulConfig.Address = consulAgentUrl.Host - } - - if consulAgentUrl.Scheme != "" { - consulConfig.Scheme = consulAgentUrl.Scheme - } - - client, err = consulapi.NewClient(consulConfig) - if err != nil { - log.Println("Error creating Consul client") - return nil, err - } - - for attempt := 1; attempt <= maxConnectAttempts; attempt++ { - if _, err = client.Agent().Self(); err == nil { - break - } - - if attempt == maxConnectAttempts { - break - } - - log.Printf("[Attempt: %d] Attempting access to Consul after 5 second sleep", attempt) - time.Sleep(5 * time.Second) - } - - if err != nil { - return nil, fmt.Errorf("failed to connect to Consul agent: %v, error: %v", consulAgent, err) - } - log.Printf("Consul agent found: %v", consulAgent) - - return client, nil -} - -func newKubeClient() (*kclient.Client, error) { - masterUrl, err := getKubeMasterUrl() - if err != nil { - return nil, err - } - overrides := &kclientcmd.ConfigOverrides{} - overrides.ClusterInfo.Server = masterUrl - - rules := kclientcmd.NewDefaultClientConfigLoadingRules() - kubeConfig, err := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig() - - if err != nil { - log.Println("Error creating Kube Config", err) - return nil, err - } - kubeConfig.Host = masterUrl - - log.Printf("Using %s for kubernetes master", kubeConfig.Host) - return kclient.New(kubeConfig) -} - -func createNodeLW(kubeClient *kclient.Client) *kcache.ListWatch { - return kcache.NewListWatchFromClient(kubeClient, "nodes", kapi.NamespaceAll, kselector.Everything()) -} - -func sendNodeWork(action KubeWorkAction, queue chan<- KubeWork, oldObject, newObject interface{}) { - if node, ok := newObject.(*kapi.Node); ok { - if nodeSelector.Matches(klabels.Set(node.Labels)) == false { - log.Printf("Ignoring node %s due to label selectors", node.Name) - return - } - - log.Println("Node Action: ", action, " for node ", node.Name) - work := KubeWork{ - Action: action, - Node: node, - } - - if action == KubeWorkUpdateNode { - if oldNode, ok := oldObject.(*kapi.Node); ok { - if nodeReady(node) != nodeReady(oldNode) { - log.Println("Ready state change. Old:", nodeReady(oldNode), " New: ", nodeReady(node)) - queue <- work - } - } - } else { - queue <- work - } - } -} - -func watchForNodes(kubeClient *kclient.Client, queue chan<- KubeWork) { - _, nodeController := kframework.NewInformer( - createNodeLW(kubeClient), - &kapi.Node{}, - resyncPeriod, - kframework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - sendNodeWork(KubeWorkAddNode, queue, nil, obj) - }, - DeleteFunc: func(obj interface{}) { - sendNodeWork(KubeWorkRemoveNode, queue, nil, obj) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - sendNodeWork(KubeWorkUpdateNode, queue, oldObj, newObj) - }, - }, - ) - stop := make(chan struct{}) - go nodeController.Run(stop) -} - -// Returns a cache.ListWatch that gets all changes to services. -func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch { - return kcache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kselector.Everything()) -} - -func sendServiceWork(action KubeWorkAction, queue chan<- KubeWork, serviceObj interface{}) { - if service, ok := serviceObj.(*kapi.Service); ok { - log.Println("Service Action: ", action, " for service ", service.Name) - queue <- KubeWork{ - Action: action, - Service: service, - } - } -} - -func watchForServices(kubeClient *kclient.Client, queue chan<- KubeWork) { - _, svcController := kframework.NewInformer( - createServiceLW(kubeClient), - &kapi.Service{}, - resyncPeriod, - kframework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - sendServiceWork(KubeWorkAddService, queue, obj) - }, - DeleteFunc: func(obj interface{}) { - sendServiceWork(KubeWorkRemoveService, queue, obj) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - if reflect.DeepEqual(newObj, oldObj) == false { - sendServiceWork(KubeWorkUpdateService, queue, newObj) - } - }, - }, - ) - stop := make(chan struct{}) - go svcController.Run(stop) -} - -// Returns a cache.ListWatch that gets all changes to Pods. -func createPodLW(kubeClient *kclient.Client) *kcache.ListWatch { - return kcache.NewListWatchFromClient(kubeClient, "pods", kapi.NamespaceAll, kselector.Everything()) -} - -// Dispatch the notifications for Pods by type to the worker -func sendPodWork(action KubeWorkAction, queue chan<- KubeWork, podObj interface{}) { - if pod, ok := podObj.(*kapi.Pod); ok { - log.Println("Pod Action: ", action, " for Pod:", pod.Name) - queue <- KubeWork{ - Action: action, - Pod: pod, - } - } -} - -// Launch the go routine to watch notifications for Pods. -func watchForPods(kubeClient *kclient.Client, queue chan<- KubeWork) { - var podController *kframework.Controller - _, podController = kframework.NewInformer( - createPodLW(kubeClient), - &kapi.Pod{}, - resyncPeriod, - kframework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - sendPodWork(KubeWorkAddPod, queue, obj) - }, - DeleteFunc: func(obj interface{}) { - if o, ok := obj.(*kapi.Pod); ok { - if _, ok := deleteMap[o.Name]; ok { - delete(deleteMap, o.Name) - } - } - sendPodWork(KubeWorkRemovePod, queue, obj) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - o, n := oldObj.(*kapi.Pod), newObj.(*kapi.Pod) - if reflect.DeepEqual(oldObj, newObj) == false { - //Adding Pod - if _, ok := addMap[n.Name]; ok { - if kapi.IsPodReady(n) { - delete(addMap, n.Name) - sendPodWork(KubeWorkUpdatePod, queue, newObj) - } - return - } - //Deleting Pod - if _, ok := deleteMap[n.Name]; ok { - return - } else { - if o.ObjectMeta.DeletionTimestamp == nil && - n.ObjectMeta.DeletionTimestamp != nil { - deleteMap[n.Name] = n - return - } - //Updating Pod - sendPodWork(KubeWorkUpdatePod, queue, newObj) - } - } - }, - }, - ) - stop := make(chan struct{}) - go podController.Run(stop) -} - -func runBookKeeper(workQue <-chan KubeWork, consulQueue chan<- ConsulWork, apiClient *kclient.Client) { - - client := newClientBookKeeper(apiClient) - client.consulQueue = consulQueue - - for work := range workQue { - switch work.Action { - case KubeWorkAddNode: - client.AddNode(work.Node) - case KubeWorkRemoveNode: - client.RemoveNode(work.Node.Name) - case KubeWorkUpdateNode: - client.UpdateNode(work.Node) - case KubeWorkAddService: - client.AddService(work.Service) - case KubeWorkRemoveService: - client.RemoveService(work.Service) - case KubeWorkUpdateService: - client.UpdateService(work.Service) - case KubeWorkAddPod: - client.AddPod(work.Pod) - case KubeWorkRemovePod: - client.RemovePod(work.Pod) - case KubeWorkUpdatePod: - client.UpdatePod(work.Pod) - case KubeWorkSync: - client.Sync() - default: - log.Println("Unsupported work action: ", work.Action) - } - } - log.Println("Completed all work") -} - -func runConsulWorker(queue <-chan ConsulWork, client *consulapi.Client) { - worker := newConsulAgentWorker(client) - - for work := range queue { - log.Println("Consul Work Action: ", work.Action, " BaseID:", work.Config.BaseID) - - switch work.Action { - case ConsulWorkAddService: - worker.AddService(work.Config, work.Service) - case ConsulWorkRemoveService: - worker.RemoveService(work.Config) - case ConsulWorkAddPod: - worker.AddPod(work.Config, work.Pod) - case ConsulWorkRemovePod: - worker.RemovePod(work.Config) - case ConsulWorkSyncDNS: - worker.SyncDNS() - default: - log.Println("Unsupported Action of: ", work.Action) - } - - } -} - -func main() { - flag.Parse() - // TODO: Validate input flags. - var err error - var consulClient *consulapi.Client - - if consulClient, err = newConsulClient(*argConsulAgent); err != nil { - log.Fatalf("Failed to create Consul client - %v", err) - } - - kubeClient, err := newKubeClient() - if err != nil { - log.Fatalf("Failed to create a kubernetes client: %v", err) - } - - if _, err := kubeClient.ServerVersion(); err != nil { - log.Fatal("Could not connect to Kube Master", err) - } else { - log.Println("Connected to K8S API Server") - } - - kubeWorkQueue := make(chan KubeWork) - consulWorkQueue := make(chan ConsulWork) - go runBookKeeper(kubeWorkQueue, consulWorkQueue, kubeClient) - watchForNodes(kubeClient, kubeWorkQueue) - watchForServices(kubeClient, kubeWorkQueue) - watchForPods(kubeClient, kubeWorkQueue) - go runConsulWorker(consulWorkQueue, consulClient) - - log.Println("Running Consul Sync loop every: 60 seconds") - csyncer := time.NewTicker(time.Second * time.Duration(60)) - go func() { - for t := range csyncer.C { - log.Println("Consul Sync request at:", t) - consulWorkQueue <- ConsulWork{ - Action: ConsulWorkSyncDNS, - } - } - }() - - log.Println("Running Kube Sync loop every: 60 seconds") - ksyncer := time.NewTicker(time.Second * time.Duration(60)) - go func() { - for t := range ksyncer.C { - log.Println("Kube Sync request at:", t) - kubeWorkQueue <- KubeWork{ - Action: KubeWorkSync, - } - } - }() - - // Prevent exit - select {} -} diff --git a/kube2consul/src/kube2consul/kube_service.go b/kube2consul/src/kube2consul/kube_service.go deleted file mode 100644 index 887f6c8..0000000 --- a/kube2consul/src/kube2consul/kube_service.go +++ /dev/null @@ -1,100 +0,0 @@ -/* -Copyright 2017 ZTE, Inc. and others. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -// kube_service.go -package main - -import ( - "log" - "strconv" - "strings" - - consulapi "github.com/hashicorp/consul/api" - kapi "k8s.io/kubernetes/pkg/api" -) - -type KubeServiceAction interface { - BuildServiceInfoMap(*kapi.ServicePort, *map[string]string) - BuildAgentService(*ServiceInfo, DnsInfo, *kapi.ServicePort) *consulapi.AgentServiceRegistration - BuildAgentServiceCheck(DnsInfo, *kapi.ServicePort) *consulapi.AgentServiceCheck -} - -type KubeService struct { - KubeServiceAction - sInfos map[string]*ServiceInfo -} - -func newKubeService() *KubeService { - return &KubeService{ - sInfos: make(map[string]*ServiceInfo), - } -} - -func (kube *KubeService) BuildServiceInfoMap(sport *kapi.ServicePort, annos *map[string]string) { - portstr := strconv.Itoa(int(sport.Port)) - nameprefix := "SERVICE_" + portstr + "_NAME" - tagprefix := "SERVICE_" + portstr + "_TAGS" - - for name, value := range *annos { - if strings.HasPrefix(name, nameprefix) { - addToServiceInfoMap(name, value, Service_NAME, &kube.sInfos) - } else if strings.HasPrefix(name, tagprefix) { - addToServiceInfoMap(name, value, Service_TAGS, &kube.sInfos) - } - } -} - -func (kube *KubeService) BuildAgentService(sInfo *ServiceInfo, config DnsInfo, sport *kapi.ServicePort) *consulapi.AgentServiceRegistration { - name := sInfo.Name - var tagslice []string - tagslice = strings.Split(sInfo.Tags, ",") - - for _, elem := range tagslice { - var elemslice []string - elemslice = strings.Split(elem, ":") - if elemslice[0] == PROTOCOL { - switch elemslice[1] { - case PROTOCOL_UI: - name = PREFIX_UI + name - default: - log.Println("regular protocol:", elemslice[1]) - } - break - } - } - - asrID := config.BaseID + "-" + strconv.Itoa(int(sport.Port)) + "-" + name - - regPort := sport.NodePort - if config.ServiceType == kapi.ServiceTypeClusterIP { - regPort = sport.Port - } - - return &consulapi.AgentServiceRegistration{ - ID: asrID, - Name: name, - Address: config.IPAddress, - Port: int(regPort), - Tags: tagslice, - } -} - -func (kube *KubeService) BuildAgentServiceCheck(config DnsInfo, sport *kapi.ServicePort) *consulapi.AgentServiceCheck { - log.Println("Creating service check for: ", config.IPAddress, " on Port: ", sport.NodePort) - return &consulapi.AgentServiceCheck{ - TCP: config.IPAddress + ":" + strconv.Itoa(int(sport.NodePort)), - Interval: "60s", - } -} diff --git a/kube2consul/src/kube2consul/kube_work.go b/kube2consul/src/kube2consul/kube_work.go deleted file mode 100644 index 4695a76..0000000 --- a/kube2consul/src/kube2consul/kube_work.go +++ /dev/null @@ -1,511 +0,0 @@ -/* -Copyright 2017 ZTE, Inc. and others. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -// kube_work.go -package main - -import ( - "encoding/json" - "kube2consul/util/restclient" - "log" - "strings" - "sync" - "time" - - kapi "k8s.io/kubernetes/pkg/api" - kclient "k8s.io/kubernetes/pkg/client/unversioned" - kselector "k8s.io/kubernetes/pkg/fields" - klabels "k8s.io/kubernetes/pkg/labels" -) - -type KubeBookKeeper interface { - AddNode(*kapi.Node) - RemoveNode(*kapi.Node) - UpdateNode(*kapi.Node) - AddService(*kapi.Service) - RemoveService(*kapi.Service) - UpdateService(*kapi.Service) - AddPod(*kapi.Pod) - RemovePod(*kapi.Pod) - UpdatePod(*kapi.Pod) -} - -type ClientBookKeeper struct { - sync.Mutex - KubeBookKeeper - client *kclient.Client - nodes map[string]*KubeNode - services map[string]*kapi.Service - pods map[string]*kapi.Pod - consulQueue chan<- ConsulWork -} - -type KubeNode struct { - name string - readyStatus bool - serviceIDS map[string]string - address string -} - -func buildServiceBaseID(nodeName string, service *kapi.Service) string { - return nodeName + "-" + service.Name -} - -func nodeReady(node *kapi.Node) bool { - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == kapi.NodeReady { - return node.Status.Conditions[i].Status == kapi.ConditionTrue - } - } - - log.Println("NodeReady condition is missing from node: ", node.Name) - return false -} - -func newKubeNode() *KubeNode { - return &KubeNode{ - name: "", - readyStatus: false, - serviceIDS: make(map[string]string), - } -} - -func newClientBookKeeper(client *kclient.Client) *ClientBookKeeper { - return &ClientBookKeeper{ - client: client, - nodes: make(map[string]*KubeNode), - services: make(map[string]*kapi.Service), - pods: make(map[string]*kapi.Pod), - } -} - -func (client *ClientBookKeeper) AddNode(node *kapi.Node) { - client.Lock() - defer client.Unlock() - if _, ok := client.nodes[node.Name]; ok { - log.Printf("Node:%s already exist. skip this ADD notification.", node.Name) - return - } - //Add to Node Collection - kubeNode := newKubeNode() - kubeNode.readyStatus = nodeReady(node) - kubeNode.name = node.Name - kubeNode.address = node.Status.Addresses[0].Address - - //Send request for Service Addition for node and all serviceIDS (Create Service ID here) - if kubeNode.readyStatus { - client.AddAllServicesToNode(kubeNode) - } - - client.nodes[node.Name] = kubeNode - log.Println("Added Node: ", node.Name) -} - -func (client *ClientBookKeeper) AddAllServicesToNode(node *KubeNode) { - for _, service := range client.services { - if service.Spec.Type == kapi.ServiceTypeClusterIP { - log.Println("ClusterIP service ignored for node binding:", service.Name) - } else { - client.AttachServiceToNode(node, service) - } - } -} - -func (client *ClientBookKeeper) AttachServiceToNode(node *KubeNode, service *kapi.Service) { - baseID := buildServiceBaseID(node.name, service) - client.consulQueue <- ConsulWork{ - Action: ConsulWorkAddService, - Service: service, - Config: DnsInfo{ - BaseID: baseID, - IPAddress: node.address, - ServiceType: service.Spec.Type, - }, - } - log.Println("Requesting Addition of services with Base ID: ", baseID) - node.serviceIDS[service.Name] = baseID -} - -func (client *ClientBookKeeper) RemoveNode(name string) { - client.Lock() - defer client.Unlock() - if kubeNode, ok := client.nodes[name]; ok { - //Remove All DNS for node - client.RemoveAllServicesFromNode(kubeNode) - //Remove Node from Collection - delete(client.nodes, name) - } else { - log.Println("Attmepted to remove missing node: ", name) - } - - log.Println("Queued Node to be removed: ", name) -} - -func (client *ClientBookKeeper) RemoveAllServicesFromNode(node *KubeNode) { - for _, service := range client.services { - if service.Spec.Type == kapi.ServiceTypeClusterIP { - log.Println("ClusterIP service ignored for node unbinding:", service.Name) - } else { - client.DetachServiceFromNode(node, service) - } - } -} - -func (client *ClientBookKeeper) DetachServiceFromNode(node *KubeNode, service *kapi.Service) { - if baseID, ok := node.serviceIDS[service.Name]; ok { - client.consulQueue <- ConsulWork{ - Action: ConsulWorkRemoveService, - Config: DnsInfo{ - BaseID: baseID, - }, - } - - log.Println("Requesting Removal of services with Base ID: ", baseID) - delete(node.serviceIDS, service.Name) - } -} - -func (client *ClientBookKeeper) UpdateNode(node *kapi.Node) { - if nodeReady(node) { - // notReady nodes also stored in client.nodes - client.AddAllServicesToNode(client.nodes[node.Name]) - } else { - client.RemoveAllServicesFromNode(client.nodes[node.Name]) - } -} - -func (client *ClientBookKeeper) AddService(svc *kapi.Service) { - client.Lock() - defer client.Unlock() - if !isUserDefinedService(svc) { - log.Println("Not the target, skip this ADD notification for service:", svc.Name) - return - } - - if _, ok := client.services[svc.Name]; ok { - log.Printf("service:%s already exist. skip this ADD notification.", svc.Name) - return - } - - if kapi.IsServiceIPSet(svc) { - if svc.Spec.Type == kapi.ServiceTypeClusterIP { - log.Println("Adding ClusterIP service:", svc.Name) - client.consulQueue <- ConsulWork{ - Action: ConsulWorkAddService, - Service: svc, - Config: DnsInfo{ - BaseID: svc.Name, - IPAddress: svc.Spec.ClusterIP, - ServiceType: svc.Spec.Type, - }, - } - } else { - //Check and use local node list first - if len(client.nodes) > 0 { - for _, kubeNode := range client.nodes { - if kubeNode.readyStatus { - client.AttachServiceToNode(kubeNode, svc) - } - } - } else { - log.Println("local node list is empty, retrieve it from the api server.") - nodes := client.client.Nodes() - if nodeList, err := nodes.List(kapi.ListOptions{ - LabelSelector: klabels.Everything(), - FieldSelector: kselector.Everything(), - }); err == nil { - for _, node := range nodeList.Items { - if nodeReady(&node) { - kubeNode := newKubeNode() - kubeNode.readyStatus = nodeReady(&node) - kubeNode.name = node.Name - kubeNode.address = node.Status.Addresses[0].Address - client.AttachServiceToNode(kubeNode, svc) - } - } - } - } - } - client.services[svc.Name] = svc - log.Println("Queued Service to be added: ", svc.Name) - } else { - // if ClusterIP is not set, do not create a DNS records - log.Printf("Skipping dns record for headless service: %s\n", svc.Name) - } -} - -func (client *ClientBookKeeper) RemoveService(svc *kapi.Service) { - client.Lock() - defer client.Unlock() - if !isUserDefinedService(svc) { - log.Println("Not the target, skip this Remove notification for service:", svc.Name) - return - } - - if _, ok := client.services[svc.Name]; !ok { - log.Printf("Service:%s not exist. skip this REMOVE notification.", svc.Name) - return - } - - if svc.Spec.Type == kapi.ServiceTypeClusterIP { - log.Println("Removing ClusterIP service:", svc.Name) - //Perform All DNS Removes - client.consulQueue <- ConsulWork{ - Action: ConsulWorkRemoveService, - Config: DnsInfo{ - BaseID: svc.Name, - }, - } - } else { - //Check and use local node list first - if len(client.nodes) > 0 { - for _, kubeNode := range client.nodes { - if kubeNode.readyStatus { - client.DetachServiceFromNode(kubeNode, svc) - } - } - } else { - log.Println("local node list is empty, retrieve it from the api server. sync it later.") - } - } - delete(client.services, svc.Name) - log.Println("Queued Service to be removed: ", svc.Name) -} - -func (client *ClientBookKeeper) UpdateService(svc *kapi.Service) { - if !isUserDefinedService(svc) { - log.Println("Not the target, skip this Update notification for service:", svc.Name) - return - } - - client.RemoveService(svc) - client.AddService(svc) -} - -func (client *ClientBookKeeper) AddPod(pod *kapi.Pod) { - client.Lock() - defer client.Unlock() - if !isUserDefinedPod(pod) { - log.Println("Not the target, skip this ADD notification for pod:", pod.Name) - return - } - - if _, ok := client.pods[pod.Name]; ok { - log.Printf("Pod:%s already exist. skip this ADD notification.", pod.Name) - return - } - - //newly added Pod - if pod.Name == "" || pod.Status.PodIP == "" { - log.Printf("Pod:%s has neither name nor pod ip. skip this ADD notification.", pod.Name) - addMap[pod.Name] = pod - return - } - //existing Pod - if kapi.IsPodReady(pod) { - if *argPdmControllerUrl != "" { - fillPdmPodIPsMap(pod) - } - } - //Perform All DNS Adds - client.consulQueue <- ConsulWork{ - Action: ConsulWorkAddPod, - Pod: pod, - Config: DnsInfo{ - BaseID: pod.Name, - IPAddress: pod.Status.PodIP, - }, - } - client.pods[pod.Name] = pod - log.Println("Queued Pod to be added: ", pod.Name) -} - -func (client *ClientBookKeeper) RemovePod(pod *kapi.Pod) { - client.Lock() - defer client.Unlock() - if !isUserDefinedPod(pod) { - log.Println("Not the target, skip this Remove notification for pod:", pod.Name) - return - } - - if _, ok := client.pods[pod.Name]; !ok { - log.Printf("Pod:%s not exist. skip this REMOVE notification.", pod.Name) - return - } - //Perform All DNS Removes - client.consulQueue <- ConsulWork{ - Action: ConsulWorkRemovePod, - Config: DnsInfo{ - BaseID: pod.Name, - }, - } - delete(client.pods, pod.Name) - log.Println("Queued Pod to be removed: ", pod.Name) -} - -func (client *ClientBookKeeper) UpdatePod(pod *kapi.Pod) { - if !isUserDefinedPod(pod) { - log.Println("Not the target, skip this Update notification for pod:", pod.Name) - return - } - - if *argPdmControllerUrl != "" { - fillPdmPodIPsMap(pod) - } - client.RemovePod(pod) - client.AddPod(pod) -} - -func (client *ClientBookKeeper) Sync() { - nodes := client.client.Nodes() - if nodeList, err := nodes.List(kapi.ListOptions{ - LabelSelector: klabels.Everything(), - FieldSelector: kselector.Everything(), - }); err == nil { - for name, _ := range client.nodes { - if !containsNodeName(name, nodeList) { - log.Printf("Bookkeeper has node: %s that does not exist in api server", name) - client.RemoveNode(name) - } - } - } - - svcs := client.client.Services(kapi.NamespaceAll) - if svcList, err := svcs.List(kapi.ListOptions{ - LabelSelector: klabels.Everything(), - FieldSelector: kselector.Everything(), - }); err == nil { - for name, svc := range client.services { - if !containsSvcName(name, svcList) { - log.Printf("Bookkeeper has service: %s that does not exist in api server", name) - client.RemoveService(svc) - } - } - } - - pods := client.client.Pods(kapi.NamespaceAll) - if podList, err := pods.List(kapi.ListOptions{ - LabelSelector: klabels.Everything(), - FieldSelector: kselector.Everything(), - }); err == nil { - for name, pod := range client.pods { - if !containsPodName(name, podList) { - log.Printf("Bookkeeper has pod: %s that does not exist in api server", name) - if _, ok := deleteMap[pod.Name]; ok { - log.Println("Missing remove notification for Pod: ", pod.Name) - delete(deleteMap, pod.Name) - } - client.RemovePod(pod) - } - } - //It was observed that the kube notification may lost after the restarting of kube master. - //Currently this is only done for Pod. same for Node and Service will be done when required. - for _, pod := range podList.Items { - if _, ok := client.pods[pod.ObjectMeta.Name]; !ok { - if kapi.IsPodReady(&pod) { - log.Println("Missing add/update notification for Pod: ", pod.ObjectMeta.Name) - client.AddPod(&pod) - } - } - } - } -} - -func containsPodName(name string, pods *kapi.PodList) bool { - for _, pod := range pods.Items { - if pod.ObjectMeta.Name == name { - return true - } - } - return false -} - -func containsSvcName(name string, svcs *kapi.ServiceList) bool { - for _, svc := range svcs.Items { - if svc.ObjectMeta.Name == name { - return true - } - } - return false -} - -func containsNodeName(name string, nodes *kapi.NodeList) bool { - for _, node := range nodes.Items { - if node.ObjectMeta.Name == name { - return true - } - } - return false -} - -func isUserDefinedPod(pod *kapi.Pod) bool { - for _, c := range pod.Spec.Containers { - for _, e := range c.Env { - if strings.HasPrefix(e.Name, "SERVICE_") { - if strings.Contains(e.Name, "_NAME") { - return true - } - } - } - } - return false -} - -func isUserDefinedService(svc *kapi.Service) bool { - for name, _ := range svc.ObjectMeta.Annotations { - if strings.HasPrefix(name, "SERVICE_") { - if strings.Contains(name, "_NAME") { - return true - } - } - } - return false -} - -func fillPdmPodIPsMap(pod *kapi.Pod) { - base := *argPdmControllerUrl - resUrl := podUrl + "/" + pod.Namespace + "/pods" - rclient := restclient.NewRESTClient(base, resUrl, pod.Name) - //try 10 times at maximum - for i := 0; i < 10; i++ { - log.Printf("try REST get the PDM PodIP for pod:%s at %d time..", pod.Name, i) - buf, err := rclient.Get() - if err != nil { - log.Printf("request PDM PodIP info for Pod:%s with error:%v", pod.Name, err) - time.Sleep(6 * 1e9) //sleep for 6 secs - continue - } - podIPInfo := PdmPodIPInfo{} - json.Unmarshal(buf, &podIPInfo) - IPInfo := podIPInfo.Pod - IPs := IPInfo.IPs - if len(IPs) == 0 { - log.Printf("request PDM PodIP info for Pod:%s return empty ip list.", pod.Name) - return - } else { - for _, ip := range IPs { - if ip.IPAddress == "" { - log.Printf("request PDM PodIP info for Pod:%s return empty ip.", pod.Name) - return - } - } - pdmPodIPsMap[pod.Name] = IPs - } - log.Println("successfully REST get the PDM PodIP for pod:", pod.Name) - return - } - - log.Println("failed to REST get the PDM PodIP for pod:", pod.Name) -} diff --git a/kube2consul/src/kube2consul/pod_service.go b/kube2consul/src/kube2consul/pod_service.go deleted file mode 100644 index c6294cb..0000000 --- a/kube2consul/src/kube2consul/pod_service.go +++ /dev/null @@ -1,677 +0,0 @@ -/* -Copyright 2017 ZTE, Inc. and others. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -// pod_service.go -package main - -import ( - "fmt" - "log" - "strconv" - "strings" - - consulapi "github.com/hashicorp/consul/api" - kapi "k8s.io/kubernetes/pkg/api" -) - -type PodServiceAction interface { - BuildServiceInfoMap(*kapi.ContainerPort, *kapi.Container) - BuildAgentService(*ServiceInfo, DnsInfo, *kapi.ContainerPort, string) *consulapi.AgentServiceRegistration -} - -type PodService struct { - PodServiceAction - sInfos map[string]*ServiceInfo -} - -func newPodService() *PodService { - return &PodService{ - sInfos: make(map[string]*ServiceInfo), - } -} - -func (pod *PodService) BuildServiceInfoMap(cport *kapi.ContainerPort, container *kapi.Container) { - portstr := strconv.Itoa(int(cport.ContainerPort)) - fullname := "SERVICE_" + portstr + "_NAME" - nameprefix := "SERVICE_" + portstr + "_NAME_" - tagprefix := "SERVICE_" + portstr + "_TAGS" - labelprefix := "SERVICE_" + portstr + "_ROUTE_LABELS" - mdataprefix := "SERVICE_" + portstr + "_META_DATA" - nsprefix := "SERVICE_" + portstr + "_NAMESPACE" - ttlprefix := "SERVICE_" + portstr + "_CHECK_TTL" - httpprefix := "SERVICE_" + portstr + "_CHECK_HTTP" - tcpprefix := "SERVICE_" + portstr + "_CHECK_TCP" - cmdprefix := "SERVICE_" + portstr + "_CHECK_CMD" - scriptprefix := "SERVICE_" + portstr + "_CHECK_SCRIPT" - timeoutprefix := "SERVICE_" + portstr + "_CHECK_TIMEOUT" - intervalprefix := "SERVICE_" + portstr + "_CHECK_INTERVAL" - for _, env := range container.Env { - if env.Name == fullname || strings.HasPrefix(env.Name, nameprefix) { - addToServiceInfoMap(env.Name, env.Value, Service_NAME, &pod.sInfos) - } else if strings.HasPrefix(env.Name, tagprefix) { - addToServiceInfoMap(env.Name, env.Value, Service_TAGS, &pod.sInfos) - } else if strings.HasPrefix(env.Name, labelprefix) { - addToServiceInfoMap(env.Name, env.Value, Service_LABELS, &pod.sInfos) - } else if strings.HasPrefix(env.Name, mdataprefix) { - addToServiceInfoMap(env.Name, env.Value, Service_MDATA, &pod.sInfos) - } else if strings.HasPrefix(env.Name, nsprefix) { - addToServiceInfoMap(env.Name, env.Value, Service_NS, &pod.sInfos) - } else if strings.HasPrefix(env.Name, ttlprefix) && env.Value != "" { - addToServiceInfoMap(env.Name, env.Value, Service_TTL, &pod.sInfos) - } else if strings.HasPrefix(env.Name, httpprefix) && env.Value != "" { - addToServiceInfoMap(env.Name, env.Value, Service_HTTP, &pod.sInfos) - } else if strings.HasPrefix(env.Name, tcpprefix) && env.Value != "" { - addToServiceInfoMap(env.Name, env.Value, Service_TCP, &pod.sInfos) - } else if strings.HasPrefix(env.Name, cmdprefix) && env.Value != "" { - addToServiceInfoMap(env.Name, env.Value, Service_CMD, &pod.sInfos) - } else if strings.HasPrefix(env.Name, scriptprefix) && env.Value != "" { - addToServiceInfoMap(env.Name, env.Value, Service_SCRIPT, &pod.sInfos) - } else if strings.HasPrefix(env.Name, timeoutprefix) && env.Value != "" { - addToServiceInfoMap(env.Name, env.Value, Service_TIMEOUT, &pod.sInfos) - } else if strings.HasPrefix(env.Name, intervalprefix) && env.Value != "" { - addToServiceInfoMap(env.Name, env.Value, Service_INTERVAL, &pod.sInfos) - } - } -} - -func (pod *PodService) BuildAgentService(sInfo *ServiceInfo, config DnsInfo, cport *kapi.ContainerPort, cId string) []*consulapi.AgentServiceRegistration { - asrs := []*consulapi.AgentServiceRegistration{} - checks := []*consulapi.AgentServiceCheck{} - - //build Checks object and populate checks table - var checks_in_tag string - - if len(sInfo.TTL) > 0 { - check := new(consulapi.AgentServiceCheck) - check.TTL = sInfo.TTL - kv := "ttl:" + sInfo.TTL + "," - checks_in_tag += kv - checks = append(checks, check) - } - - if len(sInfo.HTTP) > 0 { - check := new(consulapi.AgentServiceCheck) - check.HTTP = fmt.Sprintf("http://%s:%d%s", config.IPAddress, cport.ContainerPort, sInfo.HTTP) - kv := "http:" + sInfo.HTTP + "," - checks_in_tag += kv - if len(sInfo.Interval) == 0 { - check.Interval = DefaultInterval - if strings.Contains(checks_in_tag, "interval:") { - //do nothing - } else { - kv = "interval:" + DefaultInterval + "," - checks_in_tag += kv - } - } else { - check.Interval = sInfo.Interval - if strings.Contains(checks_in_tag, "interval:") { - //do nothing - } else { - kv = "interval:" + sInfo.Interval + "," - checks_in_tag += kv - } - } - if len(sInfo.Timeout) > 0 { - check.Timeout = sInfo.Timeout - if strings.Contains(checks_in_tag, "timeout:") { - //do nothing - } else { - kv = "timeout:" + sInfo.Timeout + "," - checks_in_tag += kv - } - } - checks = append(checks, check) - } - - if len(sInfo.TCP) > 0 { - check := new(consulapi.AgentServiceCheck) - check.TCP = fmt.Sprintf("%s:%d", config.IPAddress, cport.ContainerPort) - kv := "tcp:ok," - checks_in_tag += kv - if len(sInfo.Interval) == 0 { - check.Interval = DefaultInterval - if strings.Contains(checks_in_tag, "interval:") { - //do nothing - } else { - kv = "interval:" + DefaultInterval + "," - checks_in_tag += kv - } - } else { - check.Interval = sInfo.Interval - if strings.Contains(checks_in_tag, "interval:") { - //do nothing - } else { - kv = "interval:" + sInfo.Interval + "," - checks_in_tag += kv - } - } - if len(sInfo.Timeout) > 0 { - check.Timeout = sInfo.Timeout - if strings.Contains(checks_in_tag, "timeout:") { - //do nothing - } else { - kv = "timeout:" + sInfo.Timeout + "," - checks_in_tag += kv - } - } - checks = append(checks, check) - } - - if len(sInfo.CMD) > 0 { - check := new(consulapi.AgentServiceCheck) - check.Script = fmt.Sprintf("check-cmd %s %s %s", cId, strconv.Itoa(int(cport.ContainerPort)), sInfo.CMD) - kv := "script:" + sInfo.CMD + "," - checks_in_tag += kv - if len(sInfo.Interval) == 0 { - check.Interval = DefaultInterval - if strings.Contains(checks_in_tag, "interval:") { - //do nothing - } else { - kv = "interval:" + DefaultInterval + "," - checks_in_tag += kv - } - } else { - check.Interval = sInfo.Interval - if strings.Contains(checks_in_tag, "interval:") { - //do nothing - } else { - kv = "interval:" + sInfo.Interval + "," - checks_in_tag += kv - } - } - checks = append(checks, check) - } - - if len(sInfo.Script) > 0 { - check := new(consulapi.AgentServiceCheck) - withIp := strings.Replace(sInfo.Script, "$SERVICE_IP", config.IPAddress, -1) - check.Script = strings.Replace(withIp, "$SERVICE_PORT", strconv.Itoa(int(cport.ContainerPort)), -1) - kv := "script:" + sInfo.Script + "," - checks_in_tag += kv - if len(sInfo.Interval) == 0 { - check.Interval = DefaultInterval - if strings.Contains(checks_in_tag, "interval:") { - //do nothing - } else { - kv = "interval:" + DefaultInterval + "," - checks_in_tag += kv - } - } else { - check.Interval = sInfo.Interval - if strings.Contains(checks_in_tag, "interval:") { - //do nothing - } else { - kv = "interval:" + sInfo.Interval + "," - checks_in_tag += kv - } - } - if len(sInfo.Timeout) > 0 { - check.Timeout = sInfo.Timeout - if strings.Contains(checks_in_tag, "timeout:") { - //do nothing - } else { - kv = "timeout:" + sInfo.Timeout + "," - checks_in_tag += kv - } - } - checks = append(checks, check) - } - - //remove trailing "," - if len(checks_in_tag) != 0 { - checks_in_tag = strings.TrimRight(checks_in_tag, ",") - } - - //build other talbes in tags - var ( - base_in_tag string - lb_in_tag string - labels_in_tag = sInfo.Labels - metadata_in_tag = sInfo.MData - ns_in_tag = sInfo.NS - ) - - ts := strings.Split(sInfo.Tags, ",") - for _, elem := range ts { - if strings.Contains(elem, NETWORK_PLANE_TYPE) || strings.Contains(elem, VISUAL_RANGE) { - kv := "," + elem - labels_in_tag += kv - continue - } - - if strings.Contains(elem, LB_POLICY) || strings.Contains(elem, LB_SVR_PARAMS) { - kv := "," + elem - lb_in_tag += kv - continue - } - - kv := "," + elem - base_in_tag += kv - } - - //remove leading "," - if len(base_in_tag) != 0 { - base_in_tag = strings.TrimLeft(base_in_tag, ",") - } - - if len(lb_in_tag) != 0 { - lb_in_tag = strings.TrimLeft(lb_in_tag, ",") - } - - if len(labels_in_tag) != 0 { - labels_in_tag = strings.TrimLeft(labels_in_tag, ",") - } - - //build tables in tag - var tagslice []string - if len(base_in_tag) != 0 { - tagslice = append(tagslice, buildTableInJsonFormat(Table_BASE, base_in_tag)) - } - - if len(lb_in_tag) != 0 { - tagslice = append(tagslice, buildTableInJsonFormat(Table_LB, lb_in_tag)) - } - - if len(labels_in_tag) == 0 { - tagslice = append(tagslice, DefaultLabels) - } else { - if !strings.Contains(labels_in_tag, "visualRange") { - labels_in_tag += ",visualRange:1" - } - tagslice = append(tagslice, buildTableInJsonFormat(Table_LABELS, labels_in_tag)) - } - - if len(metadata_in_tag) != 0 { - tagslice = append(tagslice, buildTableInJsonFormat(Table_META_DATA, metadata_in_tag)) - } - - if len(ns_in_tag) != 0 { - tagslice = append(tagslice, "\"ns\":{\"namespace\":\"" + ns_in_tag + "\"}") - } - - if len(checks_in_tag) != 0 { - tagslice = append(tagslice, buildTableInJsonFormat(Table_CHECKS, checks_in_tag)) - } - - //append "-" - name := sInfo.Name - if len(ns_in_tag) != 0 { - name = name + "-" + ns_in_tag - } - - //handle IUI service -// for _, elem := range ts { -// var elemslice []string -// elemslice = strings.Split(elem, ":") -// if elemslice[0] == PROTOCOL { -// switch elemslice[1] { -// case PROTOCOL_UI: -// name = PREFIX_UI + name -// default: -// log.Println("regular protocol:", elemslice[1]) -// } -// break -// } -// } - - if len(sInfo.IPs) == 0 { - serviceID := config.BaseID + "-" + strconv.Itoa(int(cport.ContainerPort)) + "-" + name - asr := &consulapi.AgentServiceRegistration{ - ID: serviceID, - Name: name, - Address: config.IPAddress, - Port: int(cport.ContainerPort), - Tags: tagslice, - Checks: checks, - } - asrs = append(asrs, asr) - return asrs - } else { - for _, ip := range sInfo.IPs { - serviceID := config.BaseID + "-" + strconv.Itoa(int(cport.ContainerPort)) + "-" + name + "-" + ip.NetworkPlane - addr := ip.IPAddress - tagslice_with_single_network_plane_type := refillTagsliceWithSingleNetworkPlaneType(tagslice, labels_in_tag, ip.NetworkPlane) - asr := &consulapi.AgentServiceRegistration{ - ID: serviceID, - Name: name, - Address: addr, - Port: int(cport.ContainerPort), - Tags: tagslice_with_single_network_plane_type, - Checks: checks, - } - asrs = append(asrs, asr) - } - return asrs - } -} - -func addToServiceInfoMap(name, value, flag string, smap *map[string]*ServiceInfo) { - segs := strings.Split(name, "_") - switch flag { - case Service_NAME: - if strings.ContainsAny(value, ".") { - log.Printf("Warning:service name %s contains dot", value) - value = strings.Replace(value, ".", "-", -1) - log.Printf("Warning:service name has been modified to %s", value) - } - if len(segs) == 3 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].Name = value - } else { - sInfo := &ServiceInfo{ - Name: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 4 { - id := segs[3] - if _, ok := (*smap)[id]; ok { - (*smap)[id].Name = value - } else { - sInfo := &ServiceInfo{ - Name: value, - } - (*smap)[id] = sInfo - } - } - case Service_TAGS: - if len(segs) == 3 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].Tags = value - } else { - sInfo := &ServiceInfo{ - Tags: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 4 { - id := segs[3] - if _, ok := (*smap)[id]; ok { - (*smap)[id].Tags = value - } else { - sInfo := &ServiceInfo{ - Tags: value, - } - (*smap)[id] = sInfo - } - } - case Service_LABELS: - if len(segs) == 4 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].Labels = value - } else { - sInfo := &ServiceInfo{ - Labels: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 5 { - id := segs[4] - if _, ok := (*smap)[id]; ok { - (*smap)[id].Labels = value - } else { - sInfo := &ServiceInfo{ - Labels: value, - } - (*smap)[id] = sInfo - } - } - case Service_MDATA: - if len(segs) == 4 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].MData = value - } else { - sInfo := &ServiceInfo{ - MData: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 5 { - id := segs[4] - if _, ok := (*smap)[id]; ok { - (*smap)[id].MData = value - } else { - sInfo := &ServiceInfo{ - MData: value, - } - (*smap)[id] = sInfo - } - } - case Service_NS: - if len(segs) == 3 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].NS = value - } else { - sInfo := &ServiceInfo{ - NS: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 4 { - id := segs[3] - if _, ok := (*smap)[id]; ok { - (*smap)[id].NS = value - } else { - sInfo := &ServiceInfo{ - NS: value, - } - (*smap)[id] = sInfo - } - } - case Service_TTL: - if len(segs) == 4 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].TTL = value - } else { - sInfo := &ServiceInfo{ - TTL: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 5 { - id := segs[4] - if _, ok := (*smap)[id]; ok { - (*smap)[id].TTL = value - } else { - sInfo := &ServiceInfo{ - TTL: value, - } - (*smap)[id] = sInfo - } - } - case Service_HTTP: - if len(segs) == 4 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].HTTP = value - } else { - sInfo := &ServiceInfo{ - HTTP: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 5 { - id := segs[4] - if _, ok := (*smap)[id]; ok { - (*smap)[id].HTTP = value - } else { - sInfo := &ServiceInfo{ - HTTP: value, - } - (*smap)[id] = sInfo - } - } - case Service_TCP: - if len(segs) == 4 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].TCP = value - } else { - sInfo := &ServiceInfo{ - TCP: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 5 { - id := segs[4] - if _, ok := (*smap)[id]; ok { - (*smap)[id].TCP = value - } else { - sInfo := &ServiceInfo{ - TCP: value, - } - (*smap)[id] = sInfo - } - } - case Service_CMD: - if len(segs) == 4 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].CMD = value - } else { - sInfo := &ServiceInfo{ - CMD: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 5 { - id := segs[4] - if _, ok := (*smap)[id]; ok { - (*smap)[id].CMD = value - } else { - sInfo := &ServiceInfo{ - CMD: value, - } - (*smap)[id] = sInfo - } - } - case Service_SCRIPT: - if len(segs) == 4 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].Script = value - } else { - sInfo := &ServiceInfo{ - Script: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 5 { - id := segs[4] - if _, ok := (*smap)[id]; ok { - (*smap)[id].Script = value - } else { - sInfo := &ServiceInfo{ - Script: value, - } - (*smap)[id] = sInfo - } - } - case Service_TIMEOUT: - if len(segs) == 4 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].Timeout = value - } else { - sInfo := &ServiceInfo{ - Timeout: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 5 { - id := segs[4] - if _, ok := (*smap)[id]; ok { - (*smap)[id].Timeout = value - } else { - sInfo := &ServiceInfo{ - Timeout: value, - } - (*smap)[id] = sInfo - } - } - case Service_INTERVAL: - if len(segs) == 4 { - if _, ok := (*smap)["0"]; ok { - (*smap)["0"].Interval = value - } else { - sInfo := &ServiceInfo{ - Interval: value, - } - (*smap)["0"] = sInfo - } - } else if len(segs) == 5 { - id := segs[4] - if _, ok := (*smap)[id]; ok { - (*smap)[id].Interval = value - } else { - sInfo := &ServiceInfo{ - Interval: value, - } - (*smap)[id] = sInfo - } - } - default: - log.Println("Unsupported Service Attribute: ", flag) - } -} - -func buildTableInJsonFormat(table, input string) string { - head := "\"" + table + "\":{" - tail := "}" - body := "" - - s := strings.Split(input, ",") - slen := len(s) - for _, v := range s { - slen-- - s1 := strings.Split(v, ":") - mstr := "\"" + strings.TrimSpace(s1[0]) + "\":" - if strings.Contains(body, mstr) { - if slen == 0 { - body = strings.TrimRight(body, ",") - } - continue - } - if len(s1) == 2 { - kv := "\"" + strings.TrimSpace(s1[0]) + "\":\"" + strings.TrimSpace(s1[1]) + "\"" - if slen != 0 { - kv += "," - } - body += kv - } - } - - return head + body + tail -} - -func refillTagsliceWithSingleNetworkPlaneType(old []string, labels, nw_type string) []string { - var ret_tagslice []string - var new_labels string - - for _, elem := range old { - if strings.Contains(elem, NETWORK_PLANE_TYPE) { - continue - } - ret_tagslice = append(ret_tagslice, elem) - } - - sl := strings.Split(labels, NETWORK_PLANE_TYPE) - sl1 := strings.SplitN(sl[1], ",", 2) - nw_type_field := NETWORK_PLANE_TYPE + ":" + nw_type - - if len(sl1) == 2 { - new_labels = sl[0] + nw_type_field + "," + sl1[1] - } else { - new_labels = sl[0] + nw_type_field - } - ret_tagslice = append(ret_tagslice, buildTableInJsonFormat(Table_LABELS, new_labels)) - - return ret_tagslice -} \ No newline at end of file diff --git a/kube2consul/src/kube2consul/types.go b/kube2consul/src/kube2consul/types.go deleted file mode 100644 index 0105d98..0000000 --- a/kube2consul/src/kube2consul/types.go +++ /dev/null @@ -1,96 +0,0 @@ -/* -Copyright 2017 ZTE, Inc. and others. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -// types.go -package main - -import ( - kapi "k8s.io/kubernetes/pkg/api" -) - -type KubeWorkAction string - -const ( - KubeWorkAddNode KubeWorkAction = "AddNode" - KubeWorkRemoveNode KubeWorkAction = "RemoveNode" - KubeWorkUpdateNode KubeWorkAction = "UpdateNode" - KubeWorkAddService KubeWorkAction = "AddService" - KubeWorkRemoveService KubeWorkAction = "RemoveService" - KubeWorkUpdateService KubeWorkAction = "UpdateService" - KubeWorkAddPod KubeWorkAction = "AddPod" - KubeWorkRemovePod KubeWorkAction = "RemovePod" - KubeWorkUpdatePod KubeWorkAction = "UpdatePod" - KubeWorkSync KubeWorkAction = "Sync" -) - -type KubeWork struct { - Action KubeWorkAction - Node *kapi.Node - Service *kapi.Service - Pod *kapi.Pod -} - -type ConsulWorkAction string - -const ( - ConsulWorkAddService ConsulWorkAction = "AddService" - ConsulWorkRemoveService ConsulWorkAction = "RemoveService" - ConsulWorkAddPod ConsulWorkAction = "AddPod" - ConsulWorkRemovePod ConsulWorkAction = "RemovePod" - ConsulWorkSyncDNS ConsulWorkAction = "SyncDNS" -) - -type ConsulWork struct { - Action ConsulWorkAction - Service *kapi.Service - Pod *kapi.Pod - Config DnsInfo -} - -type DnsInfo struct { - BaseID string - IPAddress string - ServiceType kapi.ServiceType -} - -type ServiceInfo struct { - Name string - Tags string - Labels string - MData string - NS string - TTL string - HTTP string - TCP string - CMD string - Script string - Timeout string - Interval string - IPs []PodIP -} - -type PdmPodIPInfo struct { - Pod PodIPInfo `json:"pod"` -} - -type PodIPInfo struct { - Name string `json:"name"` - IPs []PodIP `json:"ips"` -} - -type PodIP struct { - NetworkPlane string `json:"network_plane_name"` - IPAddress string `json:"ip_address"` -} diff --git a/kube2consul/src/kube2consul/util/restclient/restclient.go b/kube2consul/src/kube2consul/util/restclient/restclient.go deleted file mode 100644 index e01ae0f..0000000 --- a/kube2consul/src/kube2consul/util/restclient/restclient.go +++ /dev/null @@ -1,42 +0,0 @@ -package restclient - -import ( - "fmt" - "io/ioutil" - "net/http" -) - -type RESTClient struct { - base string - resource string - param string -} - -func NewRESTClient(baseURL string, versionedAPIPath string, urlParameter string) *RESTClient { - return &RESTClient{ - base: baseURL, - resource: versionedAPIPath, - param: urlParameter, - } -} - -func (c *RESTClient) Get() (b []byte, err error) { - url := c.base + "/" + c.resource + "/" + c.param - res, err := http.Get(url) - if err != nil { - return nil, err - } - - if res.StatusCode != 200 { - return nil, fmt.Errorf(res.Status) - } - - buf, err := ioutil.ReadAll(res.Body) - res.Body.Close() - - if err != nil { - return nil, err - } - - return buf, nil -} diff --git a/kube2consul/src/main/blueprint/deploy.yml b/kube2consul/src/main/blueprint/deploy.yml deleted file mode 100644 index cc61076..0000000 --- a/kube2consul/src/main/blueprint/deploy.yml +++ /dev/null @@ -1,8 +0,0 @@ ---- -- remote_user: ubuntu - become: yes - become_method: sudo - vars_files: - - vars.yml - tasks: - - include: task.yml diff --git a/kube2consul/src/main/blueprint/list_of_servicelet.list b/kube2consul/src/main/blueprint/list_of_servicelet.list deleted file mode 100644 index 77a1d4f..0000000 --- a/kube2consul/src/main/blueprint/list_of_servicelet.list +++ /dev/null @@ -1,4 +0,0 @@ -{ - "servicelet_module":[ - ] -} \ No newline at end of file diff --git a/kube2consul/src/main/blueprint/task.yml b/kube2consul/src/main/blueprint/task.yml deleted file mode 100644 index 45cfcd9..0000000 --- a/kube2consul/src/main/blueprint/task.yml +++ /dev/null @@ -1,118 +0,0 @@ ---- -- name: remove kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - state: absent - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - log_driver: syslog - net: host - restart_policy: always - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - when: - - all_in_one == 'no' - - master_in_controller == 'no' - - cluster_type == 'k8s' - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - net: host - restart_policy: always - privileged: true - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - - "/root/.kube/config:/root/.kube/config:ro" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - CLUSTER_TYPE: "openshift" - when: - - all_in_one == 'no' - - master_in_controller == 'no' - - cluster_type == 'openshift' - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - log_driver: syslog - restart_policy: always - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - ALL_IN_ONE: "yes" - when: - - all_in_one == 'yes' - - cluster_type == 'k8s' - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - log_driver: syslog - restart_policy: always - privileged: true - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - - "/root/.kube/config:/root/.kube/config:ro" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - ALL_IN_ONE: "yes" - CLUSTER_TYPE: "openshift" - when: - - all_in_one == 'yes' - - cluster_type == 'openshift' - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - log_driver: syslog - restart_policy: always - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - ALL_IN_ONE: "yes" - when: - - master_in_controller == 'yes' - - cluster_type == 'k8s' - -- name: run kube2consul container - docker: - name: kube2consul - image: "{{kube2consul_image}}" - log_driver: syslog - restart_policy: always - privileged: true - volumes: - - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" - - "/root/.kube/config:/root/.kube/config:ro" - env: - KUBE_MASTER_IP: "{{kube_master_ip}}" - PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" - JOIN_IP: "{{consul_join_ip}}" - ALL_IN_ONE: "yes" - CLUSTER_TYPE: "openshift" - when: - - master_in_controller == 'yes' - - cluster_type == 'openshift' \ No newline at end of file diff --git a/kube2consul/src/main/blueprint/vars.yml b/kube2consul/src/main/blueprint/vars.yml deleted file mode 100644 index 971438d..0000000 --- a/kube2consul/src/main/blueprint/vars.yml +++ /dev/null @@ -1,14 +0,0 @@ ---- -- api_network_ip: -- man_network_ip: -- registry_url: -- cp_vertype: -- cp_type: -- cp_name: -- cp_version: -- kube2consul_image: "{{registry_url}}/{{cp_type}}/{{cp_name}}:{{cp_version}}" -- kube_master_ip: "{{ hostvars[inventory_hostname]['api_network_ip'] }}" -- pdm_controller_ip: "{{vp_ip}}" -- consul_join_ip: "{{zenap_msb_consul_server_ip}}" -- kube2consul_data_host: "/home/zenap-msb/consul_data/kube2consul_{{kube_master_ip}}" -- kube2consul_data_container: "/consul-works/data-dir" \ No newline at end of file diff --git a/kube2consul/src/main/docker/Dockerfile b/kube2consul/src/main/docker/Dockerfile deleted file mode 100644 index 278cac5..0000000 --- a/kube2consul/src/main/docker/Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -FROM alpine:3.3 -ENV CONSUL_VERSION 0.7.1 -ENV BASE / -ADD consul-linux_amd64.tar.gz / -RUN cd /usr/lib \ - && ln -s /consul/libglib-2.0.so.0.4400.0 libglib-2.0.so.0 \ - && ln -s /consul/libintl.so.8.1.3 libintl.so.8 -COPY kube2consul /bin/ -COPY start.sh / - -ENTRYPOINT exec /start.sh \ No newline at end of file diff --git a/kube2consul/src/main/docker/start.sh b/kube2consul/src/main/docker/start.sh deleted file mode 100644 index 033c50b..0000000 --- a/kube2consul/src/main/docker/start.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/sh -if [ -z "${KUBE_MASTER_IP}" ]; then - echo "kube master node ip is required." - exit 1 -fi - -if [ -n "${JOIN_IP}" ]; then - echo "### Starting consul client" - if [ -z "${ALL_IN_ONE}" ]; then - /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind ${KUBE_MASTER_IP} -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s & - else - /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind 0.0.0.0 -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s & - fi -fi - -if [ -z "${RUN_MODE}" ]; then - echo "non-HA scenario." -else - echo "\n\n### Starting consul agent" - cd ./consul - ./entry.sh & -fi - -kube_url="http://${KUBE_MASTER_IP}:8080" - -if [ "${CLUSTER_TYPE}" == "openshift" ]; then - kube_url="https://${KUBE_MASTER_IP}:8443" -fi - -echo "\n\n### Starting kube2consul" -if [ -z "${PDM_CONTROLLER_IP}" ]; then - /bin/kube2consul --kube_master_url ${kube_url} -else - echo "in Paas mode." - /bin/kube2consul --kube_master_url ${kube_url} --pdm_controller_url http://${PDM_CONTROLLER_IP}:9527 -fi \ No newline at end of file diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 75bbfd4..0000000 --- a/pom.xml +++ /dev/null @@ -1,159 +0,0 @@ - - - - 4.0.0 - - - org.onap.oparent - oparent - 1.0.0-SNAPSHOT - - - org.onap.oom.registrator - oom-registrator-parent - 1.0.0-SNAPSHOT - pom - onap/oom/registrator-parent - - - - - - UTF-8 - UTF-8 - - 1.8 - 0.8.4 - - windows_386 - windows_amd64 - linux_amd64 - - target/assembly/${classifier.linux64}/ - target/assembly/${classifier.win32}/ - target/assembly/${classifier.win64}/ - - src/main/docker - src/main/blueprint - bin - - - - - kube2consul - - - - - - - false - src/main/resources - - **/* - - - - true - src/main/filters - - **/* - - - - - - - - - maven-antrun-plugin - 1.8 - - - - maven-clean-plugin - 2.6.1 - - - maven-install-plugin - 2.5.2 - - - maven-deploy-plugin - 2.8.2 - - - - maven-resources-plugin - 2.7 - - - - org.codehaus.mojo - build-helper-maven-plugin - 1.9.1 - - - - com.igormaznitsa - mvn-golang-wrapper - 2.1.0 - true - - true - ${go.sdk.version} - true - - ${basedir} - 0 - - ${basedir}/src/${project.artifactId} - amd64 - linux - - - - default-build - - build - - - ${app.exec.name} - - - - default-mvninstall - - mvninstall - - - - - - - - - - - - - - -- cgit 1.2.3-korg