From 3736aafdb168d76483a42acb552098244ceee034 Mon Sep 17 00:00:00 2001 From: HuabingZhao Date: Fri, 11 Aug 2017 09:16:22 +0000 Subject: Initial commit for registrator source codes Change-Id: I0cdd285d6228f0a1b6a6b27787be09d2d7af3579 Issue-Id: OOM-61 Signed-off-by: HuabingZhao --- kube2consul/bin/Dockerfile | 11 + kube2consul/bin/blueprint/deploy.yml | 8 + kube2consul/bin/blueprint/list_of_servicelet.list | 4 + kube2consul/bin/blueprint/task.yml | 118 ++++ kube2consul/bin/blueprint/vars.yml | 14 + kube2consul/bin/start.sh | 36 ++ kube2consul/pom.xml | 109 ++++ kube2consul/src/kube2consul/Makefile | 11 + kube2consul/src/kube2consul/consul_work.go | 366 +++++++++++ kube2consul/src/kube2consul/kube2consul.go | 449 ++++++++++++++ kube2consul/src/kube2consul/kube_service.go | 100 +++ kube2consul/src/kube2consul/kube_work.go | 511 ++++++++++++++++ kube2consul/src/kube2consul/pod_service.go | 677 +++++++++++++++++++++ kube2consul/src/kube2consul/types.go | 96 +++ .../src/kube2consul/util/restclient/restclient.go | 42 ++ kube2consul/src/main/blueprint/deploy.yml | 8 + .../src/main/blueprint/list_of_servicelet.list | 4 + kube2consul/src/main/blueprint/task.yml | 118 ++++ kube2consul/src/main/blueprint/vars.yml | 14 + kube2consul/src/main/docker/Dockerfile | 11 + kube2consul/src/main/docker/start.sh | 36 ++ 21 files changed, 2743 insertions(+) create mode 100644 kube2consul/bin/Dockerfile create mode 100644 kube2consul/bin/blueprint/deploy.yml create mode 100644 kube2consul/bin/blueprint/list_of_servicelet.list create mode 100644 kube2consul/bin/blueprint/task.yml create mode 100644 kube2consul/bin/blueprint/vars.yml create mode 100644 kube2consul/bin/start.sh create mode 100644 kube2consul/pom.xml create mode 100644 kube2consul/src/kube2consul/Makefile create mode 100644 kube2consul/src/kube2consul/consul_work.go create mode 100644 kube2consul/src/kube2consul/kube2consul.go create mode 100644 kube2consul/src/kube2consul/kube_service.go create mode 100644 kube2consul/src/kube2consul/kube_work.go create mode 100644 kube2consul/src/kube2consul/pod_service.go create mode 100644 kube2consul/src/kube2consul/types.go create mode 100644 kube2consul/src/kube2consul/util/restclient/restclient.go create mode 100644 kube2consul/src/main/blueprint/deploy.yml create mode 100644 kube2consul/src/main/blueprint/list_of_servicelet.list create mode 100644 kube2consul/src/main/blueprint/task.yml create mode 100644 kube2consul/src/main/blueprint/vars.yml create mode 100644 kube2consul/src/main/docker/Dockerfile create mode 100644 kube2consul/src/main/docker/start.sh (limited to 'kube2consul') diff --git a/kube2consul/bin/Dockerfile b/kube2consul/bin/Dockerfile new file mode 100644 index 0000000..278cac5 --- /dev/null +++ b/kube2consul/bin/Dockerfile @@ -0,0 +1,11 @@ +FROM alpine:3.3 +ENV CONSUL_VERSION 0.7.1 +ENV BASE / +ADD consul-linux_amd64.tar.gz / +RUN cd /usr/lib \ + && ln -s /consul/libglib-2.0.so.0.4400.0 libglib-2.0.so.0 \ + && ln -s /consul/libintl.so.8.1.3 libintl.so.8 +COPY kube2consul /bin/ +COPY start.sh / + +ENTRYPOINT exec /start.sh \ No newline at end of file diff --git a/kube2consul/bin/blueprint/deploy.yml b/kube2consul/bin/blueprint/deploy.yml new file mode 100644 index 0000000..cc61076 --- /dev/null +++ b/kube2consul/bin/blueprint/deploy.yml @@ -0,0 +1,8 @@ +--- +- remote_user: ubuntu + become: yes + become_method: sudo + vars_files: + - vars.yml + tasks: + - include: task.yml diff --git a/kube2consul/bin/blueprint/list_of_servicelet.list b/kube2consul/bin/blueprint/list_of_servicelet.list new file mode 100644 index 0000000..77a1d4f --- /dev/null +++ b/kube2consul/bin/blueprint/list_of_servicelet.list @@ -0,0 +1,4 @@ +{ + "servicelet_module":[ + ] +} \ No newline at end of file diff --git a/kube2consul/bin/blueprint/task.yml b/kube2consul/bin/blueprint/task.yml new file mode 100644 index 0000000..45cfcd9 --- /dev/null +++ b/kube2consul/bin/blueprint/task.yml @@ -0,0 +1,118 @@ +--- +- name: remove kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + state: absent + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + log_driver: syslog + net: host + restart_policy: always + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + when: + - all_in_one == 'no' + - master_in_controller == 'no' + - cluster_type == 'k8s' + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + net: host + restart_policy: always + privileged: true + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + - "/root/.kube/config:/root/.kube/config:ro" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + CLUSTER_TYPE: "openshift" + when: + - all_in_one == 'no' + - master_in_controller == 'no' + - cluster_type == 'openshift' + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + log_driver: syslog + restart_policy: always + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + ALL_IN_ONE: "yes" + when: + - all_in_one == 'yes' + - cluster_type == 'k8s' + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + log_driver: syslog + restart_policy: always + privileged: true + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + - "/root/.kube/config:/root/.kube/config:ro" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + ALL_IN_ONE: "yes" + CLUSTER_TYPE: "openshift" + when: + - all_in_one == 'yes' + - cluster_type == 'openshift' + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + log_driver: syslog + restart_policy: always + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + ALL_IN_ONE: "yes" + when: + - master_in_controller == 'yes' + - cluster_type == 'k8s' + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + log_driver: syslog + restart_policy: always + privileged: true + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + - "/root/.kube/config:/root/.kube/config:ro" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + ALL_IN_ONE: "yes" + CLUSTER_TYPE: "openshift" + when: + - master_in_controller == 'yes' + - cluster_type == 'openshift' \ No newline at end of file diff --git a/kube2consul/bin/blueprint/vars.yml b/kube2consul/bin/blueprint/vars.yml new file mode 100644 index 0000000..971438d --- /dev/null +++ b/kube2consul/bin/blueprint/vars.yml @@ -0,0 +1,14 @@ +--- +- api_network_ip: +- man_network_ip: +- registry_url: +- cp_vertype: +- cp_type: +- cp_name: +- cp_version: +- kube2consul_image: "{{registry_url}}/{{cp_type}}/{{cp_name}}:{{cp_version}}" +- kube_master_ip: "{{ hostvars[inventory_hostname]['api_network_ip'] }}" +- pdm_controller_ip: "{{vp_ip}}" +- consul_join_ip: "{{zenap_msb_consul_server_ip}}" +- kube2consul_data_host: "/home/zenap-msb/consul_data/kube2consul_{{kube_master_ip}}" +- kube2consul_data_container: "/consul-works/data-dir" \ No newline at end of file diff --git a/kube2consul/bin/start.sh b/kube2consul/bin/start.sh new file mode 100644 index 0000000..033c50b --- /dev/null +++ b/kube2consul/bin/start.sh @@ -0,0 +1,36 @@ +#!/bin/sh +if [ -z "${KUBE_MASTER_IP}" ]; then + echo "kube master node ip is required." + exit 1 +fi + +if [ -n "${JOIN_IP}" ]; then + echo "### Starting consul client" + if [ -z "${ALL_IN_ONE}" ]; then + /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind ${KUBE_MASTER_IP} -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s & + else + /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind 0.0.0.0 -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s & + fi +fi + +if [ -z "${RUN_MODE}" ]; then + echo "non-HA scenario." +else + echo "\n\n### Starting consul agent" + cd ./consul + ./entry.sh & +fi + +kube_url="http://${KUBE_MASTER_IP}:8080" + +if [ "${CLUSTER_TYPE}" == "openshift" ]; then + kube_url="https://${KUBE_MASTER_IP}:8443" +fi + +echo "\n\n### Starting kube2consul" +if [ -z "${PDM_CONTROLLER_IP}" ]; then + /bin/kube2consul --kube_master_url ${kube_url} +else + echo "in Paas mode." + /bin/kube2consul --kube_master_url ${kube_url} --pdm_controller_url http://${PDM_CONTROLLER_IP}:9527 +fi \ No newline at end of file diff --git a/kube2consul/pom.xml b/kube2consul/pom.xml new file mode 100644 index 0000000..3ab8ce7 --- /dev/null +++ b/kube2consul/pom.xml @@ -0,0 +1,109 @@ + + + 4.0.0 + + + + org.onap.oom.registrator + oom-registrator-parent + 1.0.0-SNAPSHOT + + + + org.onap.oom.registrator + kube2consul + 1.0.0-SNAPSHOT + pom + + + kube2consul + + + + ${basedir}${file.separator}src + ${basedir}${file.separator}bin + + + com.igormaznitsa + mvn-golang-wrapper + + + maven-resources-plugin + + + copy-resources-dockerfile + process-resources + + copy-resources + + + ${version.output} + true + + + ${dockerFileDir} + false + + **/* + + + + true + + + + copy-resources-blueprint + process-resources + + copy-resources + + + ${version.output}/blueprint + true + + + ${blueprintFileDir} + false + + **/* + + + + true + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copytolinux64 + + copy + + prepare-package + + + + com.zte.ums.zenap.msb.components + consul + tar.gz + linux_amd64 + + + ${version.output} + false + true + true + true + + + + + + + + \ No newline at end of file diff --git a/kube2consul/src/kube2consul/Makefile b/kube2consul/src/kube2consul/Makefile new file mode 100644 index 0000000..141abb0 --- /dev/null +++ b/kube2consul/src/kube2consul/Makefile @@ -0,0 +1,11 @@ +.PHONY: kube2consul clean test + +kube2consul: kube2consul.go + CGO_ENABLED=0 go build --ldflags '-extldflags "-static"' + strip kube2consul + +clean: + rm -fr kube2consul + +test: clean + go test -v --vmodule=*=4 diff --git a/kube2consul/src/kube2consul/consul_work.go b/kube2consul/src/kube2consul/consul_work.go new file mode 100644 index 0000000..2e5927a --- /dev/null +++ b/kube2consul/src/kube2consul/consul_work.go @@ -0,0 +1,366 @@ +/* +Copyright 2017 ZTE, Inc. and others. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// consul_work.go +package main + +import ( + "log" + "strings" + "sync" + + consulapi "github.com/hashicorp/consul/api" + kapi "k8s.io/kubernetes/pkg/api" +) + +type ConsulWorker interface { + AddService(config DnsInfo, service *kapi.Service) + RemoveService(config DnsInfo) + AddPod(config DnsInfo, pod *kapi.Pod) + RemovePod(config DnsInfo) + SyncDNS() +} + +type ConsulAgentWorker struct { + sync.Mutex + ConsulWorker + ids map[string][]*consulapi.AgentServiceRegistration + agent *consulapi.Client +} + +func newConsulAgentWorker(client *consulapi.Client) *ConsulAgentWorker { + return &ConsulAgentWorker{ + agent: client, + ids: make(map[string][]*consulapi.AgentServiceRegistration), + } +} + +func (client *ConsulAgentWorker) AddService(config DnsInfo, service *kapi.Service) { + client.Lock() + defer client.Unlock() + log.Println("Starting Add Service for: ", config.BaseID) + + if config.IPAddress == "" || config.BaseID == "" { + log.Println("Service Info is not valid for AddService") + return + } + + annos := service.ObjectMeta.Annotations + //only register service with annotations + if len(annos) == 0 { + log.Println("no Annotation defined for this service:", service.Name) + return + } + + for _, port := range service.Spec.Ports { + kubeService := newKubeService() + kubeService.BuildServiceInfoMap(&port, &annos) + + if len(kubeService.sInfos) == 0 { + log.Println("no Service defined for this port:", port.Port) + continue + } + + //remove service with no name + for id, s := range kubeService.sInfos { + if len(s.Name) == 0 { + if id == "0" { + log.Printf("SERVICE_%d_NAME not set, ignore this service.", port.Port) + } else { + log.Printf("SERVICE_%d_NAME_%s not set, ignore this service.", port.Port, id) + } + delete(kubeService.sInfos, id) + } + } + + //test again + if len(kubeService.sInfos) == 0 { + log.Println("no Service defined for this port:", port.Port) + continue + } + + for _, s := range kubeService.sInfos { + asr := kubeService.BuildAgentService(s, config, &port) + if *argChecks && port.Protocol == "TCP" && config.ServiceType != kapi.ServiceTypeClusterIP { + //Create Check if neeeded + asr.Check = kubeService.BuildAgentServiceCheck(config, &port) + } + + if client.agent != nil { + //Registers with DNS + if err := client.agent.Agent().ServiceRegister(asr); err != nil { + log.Println("Error creating service record: ", asr.ID) + } + } + + //Add to IDS + client.ids[config.BaseID] = append(client.ids[config.BaseID], asr) + } + } + log.Println("Added Service: ", service.Name) +} + +func (client *ConsulAgentWorker) RemoveService(config DnsInfo) { + client.Lock() + defer client.Unlock() + if ids, ok := client.ids[config.BaseID]; ok { + for _, asr := range ids { + if client.agent != nil { + if err := client.agent.Agent().ServiceDeregister(asr.ID); err != nil { + log.Println("Error removing service: ", err) + } + } + } + delete(client.ids, config.BaseID) + } else { + log.Println("Requested to remove non-existant BaseID DNS of:", config.BaseID) + } +} + +func (client *ConsulAgentWorker) AddPod(config DnsInfo, pod *kapi.Pod) { + client.Lock() + defer client.Unlock() + log.Println("Starting Add Pod for: ", config.BaseID) + + //double check if have Pod IPs so far in Paas mode + if *argPdmControllerUrl != "" { + if _, ok := pdmPodIPsMap[pod.Name]; !ok { + log.Println("In Paas mode, We don't have Pod IPs to register with for Pod: ", pod.Name) + return + } + } + + containerIdMap := make(map[string]string) + buildContainerIdMap(pod, &containerIdMap) + + for _, c := range pod.Spec.Containers { + + for _, p := range c.Ports { + podService := newPodService() + podService.BuildServiceInfoMap(&p, &c) + if len(podService.sInfos) == 0 { + log.Println("no Service defined for this port:", p.ContainerPort) + continue + } + + //remove service with no name + for id, s := range podService.sInfos { + if len(s.Name) == 0 { + if id == "0" { + log.Printf("SERVICE_%d_NAME not set, ignore this service.", p.ContainerPort) + } else { + log.Printf("SERVICE_%d_NAME_%s not set, ignore this service.", p.ContainerPort, id) + } + delete(podService.sInfos, id) + } + } + + //remove service if same service exist with different protocol + for id, s := range podService.sInfos { + services := []*consulapi.CatalogService{} + sn := s.Name + //append namespace if specified + if len(s.NS) != 0 { + sn = sn + "-" + s.NS + } + var tags []string + var protocol string + tags = strings.Split(s.Tags, ",") + for _, v := range tags { + var elems []string + elems = strings.Split(v, ":") + if elems[0] == PROTOCOL { + protocol = elems[1] + break + } + } + //remove service with empty protocol + if protocol == "" { + delete(podService.sInfos, id) + continue + } + + protocol_field := "\"protocol\":\"" + protocol + "\"" + + //query consul + if client.agent != nil { + same_protocol := false + services, _, _ = client.agent.Catalog().Service(sn, "", nil) + if services == nil || len(services) == 0 { + continue + } + for _, v := range services[0].ServiceTags { + if strings.Contains(v, protocol_field) { + same_protocol = true + break + } + } + + if !same_protocol { + log.Printf("same service with different protocol already exist, ignore service:%s, protocol:%s", sn, protocol) + delete(podService.sInfos, id) + } + } + } + + //remove service with no network plane type in tags + if *argPdmControllerUrl != "" { + for id, s := range podService.sInfos { + if len(s.Tags) == 0 { + if id == "0" { + log.Printf("SERVICE_%d_TAGS not set, ignore this service in Paas mode.", p.ContainerPort) + } else { + log.Printf("SERVICE_%d_TAGS_%s not set, ignore this service in Paas mode.", p.ContainerPort, id) + } + delete(podService.sInfos, id) + } else { + var tags []string + tags = strings.Split(s.Tags, ",") + for _, v := range tags { + var elems []string + elems = strings.Split(v, ":") + //this is saved in Tags, later will be moved to Labels?? + if elems[0] == NETWORK_PLANE_TYPE { + types := strings.Split(elems[1], "|") + for _, t := range types { + ip := getIPFromPodIPsMap(t, pod.Name) + if ip == "" { + log.Printf("User defined network type:%s has no assigned ip for Pod:%s", t, pod.Name) + } else { + log.Printf("Found ip:%s for network type:%s", ip, t) + s.IPs = append(s.IPs, PodIP{ + NetworkPlane: t, + IPAddress: ip, + }) + } + } + } + } + + if len(s.IPs) == 0 { + log.Printf("In Paas mode, no IP assigned for Pod:ContainerPort->%s:%d", pod.Name, p.ContainerPort) + delete(podService.sInfos, id) + } + } + } + } + + //test again + if len(podService.sInfos) == 0 { + log.Println("no Service defined for this port:", p.ContainerPort) + continue + } + + for _, s := range podService.sInfos { + asrs := podService.BuildAgentService(s, config, &p, containerIdMap[c.Name]) + if client.agent != nil { + for _, asr := range asrs { + //Registers with DNS + if err := client.agent.Agent().ServiceRegister(asr); err != nil { + log.Println("Error creating service record: ", asr.ID) + } else { + log.Printf("Added service:instance ->%s:%s", asr.Name, asr.ID) + } + client.ids[config.BaseID] = append(client.ids[config.BaseID], asr) + } + } else { + log.Println("Consul client is not available.") + } + } + } + } + log.Println("Added Pod: ", pod.Name) +} + +func (client *ConsulAgentWorker) RemovePod(config DnsInfo) { + client.Lock() + defer client.Unlock() + if ids, ok := client.ids[config.BaseID]; ok { + for _, asr := range ids { + if client.agent != nil { + if err := client.agent.Agent().ServiceDeregister(asr.ID); err != nil { + log.Println("Error removing service: ", err) + } else { + log.Printf("removed service -> %s:%d", asr.Name, asr.Port) + } + } + } + delete(client.ids, config.BaseID) + log.Println("Removed Pod: ", config.BaseID) + } else { + log.Println("Requested to remove non-existant BaseID DNS of:", config.BaseID) + } +} + +func (client *ConsulAgentWorker) SyncDNS() { + if client.agent != nil { + if services, err := client.agent.Agent().Services(); err == nil { + for _, registered := range client.ids { + for _, service := range registered { + if !containsServiceId(service.ID, services) { + log.Println("Regregistering missing service ID: ", service.ID) + client.agent.Agent().ServiceRegister(service) + } + } + } + for _, service := range services { + found := false + for _, registered := range client.ids { + for _, svc := range registered { + if service.ID == svc.ID { + found = true + break + } + } + if found { + break + } + } + if !found { + log.Println("Degregistering dead service ID: ", service.ID) + client.agent.Agent().ServiceDeregister(service.ID) + } + } + } else { + log.Println("Error retreiving services from consul during sync: ", err) + } + } +} + +func buildContainerIdMap(pod *kapi.Pod, cmap *map[string]string) { + for _, s := range pod.Status.ContainerStatuses { + id := strings.TrimLeft(s.ContainerID, "docker://") + (*cmap)[s.Name] = id + } +} + +func containsServiceId(id string, services map[string]*consulapi.AgentService) bool { + for _, service := range services { + if service.ID == id { + return true + } + } + return false +} + +func getIPFromPodIPsMap(ptype string, podname string) string { + ips := pdmPodIPsMap[podname] + for _, v := range ips { + if v.NetworkPlane == ptype { + return v.IPAddress + } + } + return "" +} diff --git a/kube2consul/src/kube2consul/kube2consul.go b/kube2consul/src/kube2consul/kube2consul.go new file mode 100644 index 0000000..f1213f2 --- /dev/null +++ b/kube2consul/src/kube2consul/kube2consul.go @@ -0,0 +1,449 @@ +/* +Copyright 2017 ZTE, Inc. and others. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// kube2consul.go +package main + +import ( + "flag" + "fmt" + "log" + "net/url" + "os" + "reflect" + "time" + + consulapi "github.com/hashicorp/consul/api" + kapi "k8s.io/kubernetes/pkg/api" + kcache "k8s.io/kubernetes/pkg/client/cache" + kclient "k8s.io/kubernetes/pkg/client/unversioned" + kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + kframework "k8s.io/kubernetes/pkg/controller/framework" + kselector "k8s.io/kubernetes/pkg/fields" + klabels "k8s.io/kubernetes/pkg/labels" +) + +var ( + argConsulAgent = flag.String("consul-agent", "http://127.0.0.1:8500", "URL to consul agent") + argKubeMasterUrl = flag.String("kube_master_url", "", "Url to reach kubernetes master. Env variables in this flag will be expanded.") + argChecks = flag.Bool("checks", false, "Adds TCP service checks for each TCP Service") + argPdmControllerUrl = flag.String("pdm_controller_url", "", "URL to pdm controller") + addMap = make(map[string]*kapi.Pod) + deleteMap = make(map[string]*kapi.Pod) + pdmPodIPsMap = make(map[string][]PodIP) + nodeSelector = klabels.Everything() +) + +const ( + // Maximum number of attempts to connect to consul server. + maxConnectAttempts = 12 + // Resync period for the kube controller loop. + resyncPeriod = 5 * time.Second + // Default Health check interval + DefaultInterval = "10s" + // Resource url to query pod from PDM controller + podUrl = "nw/v1/tenants" +) + +const ( + Service_NAME = "NAME" + Service_TAGS = "TAGS" + Service_LABELS = "LABELS" + Service_MDATA = "MDATA" + Service_NS = "NAMESPACE" + Service_TTL = "TTL" + Service_HTTP = "HTTP" + Service_TCP = "TCP" + Service_CMD = "CMD" + Service_SCRIPT = "SCRIPT" + Service_TIMEOUT = "TIMEOUT" + Service_INTERVAL = "INTERVAL" +) + +const ( + Table_BASE = "base" + Table_LB = "lb" + Table_LABELS = "labels" + Table_META_DATA = "metadata" + Table_NS = "ns" + Table_CHECKS = "checks" +) + +const ( + PROTOCOL = "protocol" + PROTOCOL_UI = "UI" + PREFIX_UI = "IUI_" +) + +const ( + //filter out from the tags for compatibility + NETWORK_PLANE_TYPE = "network_plane_type" + VISUAL_RANGE = "visualRange" + LB_POLICY = "lb_policy" + LB_SVR_PARAMS = "lb_server_params" +) + +const DefaultLabels = "\"labels\":{\"visualRange\":\"1\"}" + +func getKubeMasterUrl() (string, error) { + if *argKubeMasterUrl == "" { + return "", fmt.Errorf("no --kube_master_url specified") + } + parsedUrl, err := url.Parse(os.ExpandEnv(*argKubeMasterUrl)) + if err != nil { + return "", fmt.Errorf("failed to parse --kube_master_url %s - %v", *argKubeMasterUrl, err) + } + if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" { + return "", fmt.Errorf("invalid --kube_master_url specified %s", *argKubeMasterUrl) + } + return parsedUrl.String(), nil +} + +func newConsulClient(consulAgent string) (*consulapi.Client, error) { + var ( + client *consulapi.Client + err error + ) + + consulConfig := consulapi.DefaultConfig() + consulAgentUrl, err := url.Parse(consulAgent) + if err != nil { + log.Println("Error parsing Consul url") + return nil, err + } + + if consulAgentUrl.Host != "" { + consulConfig.Address = consulAgentUrl.Host + } + + if consulAgentUrl.Scheme != "" { + consulConfig.Scheme = consulAgentUrl.Scheme + } + + client, err = consulapi.NewClient(consulConfig) + if err != nil { + log.Println("Error creating Consul client") + return nil, err + } + + for attempt := 1; attempt <= maxConnectAttempts; attempt++ { + if _, err = client.Agent().Self(); err == nil { + break + } + + if attempt == maxConnectAttempts { + break + } + + log.Printf("[Attempt: %d] Attempting access to Consul after 5 second sleep", attempt) + time.Sleep(5 * time.Second) + } + + if err != nil { + return nil, fmt.Errorf("failed to connect to Consul agent: %v, error: %v", consulAgent, err) + } + log.Printf("Consul agent found: %v", consulAgent) + + return client, nil +} + +func newKubeClient() (*kclient.Client, error) { + masterUrl, err := getKubeMasterUrl() + if err != nil { + return nil, err + } + overrides := &kclientcmd.ConfigOverrides{} + overrides.ClusterInfo.Server = masterUrl + + rules := kclientcmd.NewDefaultClientConfigLoadingRules() + kubeConfig, err := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig() + + if err != nil { + log.Println("Error creating Kube Config", err) + return nil, err + } + kubeConfig.Host = masterUrl + + log.Printf("Using %s for kubernetes master", kubeConfig.Host) + return kclient.New(kubeConfig) +} + +func createNodeLW(kubeClient *kclient.Client) *kcache.ListWatch { + return kcache.NewListWatchFromClient(kubeClient, "nodes", kapi.NamespaceAll, kselector.Everything()) +} + +func sendNodeWork(action KubeWorkAction, queue chan<- KubeWork, oldObject, newObject interface{}) { + if node, ok := newObject.(*kapi.Node); ok { + if nodeSelector.Matches(klabels.Set(node.Labels)) == false { + log.Printf("Ignoring node %s due to label selectors", node.Name) + return + } + + log.Println("Node Action: ", action, " for node ", node.Name) + work := KubeWork{ + Action: action, + Node: node, + } + + if action == KubeWorkUpdateNode { + if oldNode, ok := oldObject.(*kapi.Node); ok { + if nodeReady(node) != nodeReady(oldNode) { + log.Println("Ready state change. Old:", nodeReady(oldNode), " New: ", nodeReady(node)) + queue <- work + } + } + } else { + queue <- work + } + } +} + +func watchForNodes(kubeClient *kclient.Client, queue chan<- KubeWork) { + _, nodeController := kframework.NewInformer( + createNodeLW(kubeClient), + &kapi.Node{}, + resyncPeriod, + kframework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + sendNodeWork(KubeWorkAddNode, queue, nil, obj) + }, + DeleteFunc: func(obj interface{}) { + sendNodeWork(KubeWorkRemoveNode, queue, nil, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + sendNodeWork(KubeWorkUpdateNode, queue, oldObj, newObj) + }, + }, + ) + stop := make(chan struct{}) + go nodeController.Run(stop) +} + +// Returns a cache.ListWatch that gets all changes to services. +func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch { + return kcache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kselector.Everything()) +} + +func sendServiceWork(action KubeWorkAction, queue chan<- KubeWork, serviceObj interface{}) { + if service, ok := serviceObj.(*kapi.Service); ok { + log.Println("Service Action: ", action, " for service ", service.Name) + queue <- KubeWork{ + Action: action, + Service: service, + } + } +} + +func watchForServices(kubeClient *kclient.Client, queue chan<- KubeWork) { + _, svcController := kframework.NewInformer( + createServiceLW(kubeClient), + &kapi.Service{}, + resyncPeriod, + kframework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + sendServiceWork(KubeWorkAddService, queue, obj) + }, + DeleteFunc: func(obj interface{}) { + sendServiceWork(KubeWorkRemoveService, queue, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + if reflect.DeepEqual(newObj, oldObj) == false { + sendServiceWork(KubeWorkUpdateService, queue, newObj) + } + }, + }, + ) + stop := make(chan struct{}) + go svcController.Run(stop) +} + +// Returns a cache.ListWatch that gets all changes to Pods. +func createPodLW(kubeClient *kclient.Client) *kcache.ListWatch { + return kcache.NewListWatchFromClient(kubeClient, "pods", kapi.NamespaceAll, kselector.Everything()) +} + +// Dispatch the notifications for Pods by type to the worker +func sendPodWork(action KubeWorkAction, queue chan<- KubeWork, podObj interface{}) { + if pod, ok := podObj.(*kapi.Pod); ok { + log.Println("Pod Action: ", action, " for Pod:", pod.Name) + queue <- KubeWork{ + Action: action, + Pod: pod, + } + } +} + +// Launch the go routine to watch notifications for Pods. +func watchForPods(kubeClient *kclient.Client, queue chan<- KubeWork) { + var podController *kframework.Controller + _, podController = kframework.NewInformer( + createPodLW(kubeClient), + &kapi.Pod{}, + resyncPeriod, + kframework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + sendPodWork(KubeWorkAddPod, queue, obj) + }, + DeleteFunc: func(obj interface{}) { + if o, ok := obj.(*kapi.Pod); ok { + if _, ok := deleteMap[o.Name]; ok { + delete(deleteMap, o.Name) + } + } + sendPodWork(KubeWorkRemovePod, queue, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + o, n := oldObj.(*kapi.Pod), newObj.(*kapi.Pod) + if reflect.DeepEqual(oldObj, newObj) == false { + //Adding Pod + if _, ok := addMap[n.Name]; ok { + if kapi.IsPodReady(n) { + delete(addMap, n.Name) + sendPodWork(KubeWorkUpdatePod, queue, newObj) + } + return + } + //Deleting Pod + if _, ok := deleteMap[n.Name]; ok { + return + } else { + if o.ObjectMeta.DeletionTimestamp == nil && + n.ObjectMeta.DeletionTimestamp != nil { + deleteMap[n.Name] = n + return + } + //Updating Pod + sendPodWork(KubeWorkUpdatePod, queue, newObj) + } + } + }, + }, + ) + stop := make(chan struct{}) + go podController.Run(stop) +} + +func runBookKeeper(workQue <-chan KubeWork, consulQueue chan<- ConsulWork, apiClient *kclient.Client) { + + client := newClientBookKeeper(apiClient) + client.consulQueue = consulQueue + + for work := range workQue { + switch work.Action { + case KubeWorkAddNode: + client.AddNode(work.Node) + case KubeWorkRemoveNode: + client.RemoveNode(work.Node.Name) + case KubeWorkUpdateNode: + client.UpdateNode(work.Node) + case KubeWorkAddService: + client.AddService(work.Service) + case KubeWorkRemoveService: + client.RemoveService(work.Service) + case KubeWorkUpdateService: + client.UpdateService(work.Service) + case KubeWorkAddPod: + client.AddPod(work.Pod) + case KubeWorkRemovePod: + client.RemovePod(work.Pod) + case KubeWorkUpdatePod: + client.UpdatePod(work.Pod) + case KubeWorkSync: + client.Sync() + default: + log.Println("Unsupported work action: ", work.Action) + } + } + log.Println("Completed all work") +} + +func runConsulWorker(queue <-chan ConsulWork, client *consulapi.Client) { + worker := newConsulAgentWorker(client) + + for work := range queue { + log.Println("Consul Work Action: ", work.Action, " BaseID:", work.Config.BaseID) + + switch work.Action { + case ConsulWorkAddService: + worker.AddService(work.Config, work.Service) + case ConsulWorkRemoveService: + worker.RemoveService(work.Config) + case ConsulWorkAddPod: + worker.AddPod(work.Config, work.Pod) + case ConsulWorkRemovePod: + worker.RemovePod(work.Config) + case ConsulWorkSyncDNS: + worker.SyncDNS() + default: + log.Println("Unsupported Action of: ", work.Action) + } + + } +} + +func main() { + flag.Parse() + // TODO: Validate input flags. + var err error + var consulClient *consulapi.Client + + if consulClient, err = newConsulClient(*argConsulAgent); err != nil { + log.Fatalf("Failed to create Consul client - %v", err) + } + + kubeClient, err := newKubeClient() + if err != nil { + log.Fatalf("Failed to create a kubernetes client: %v", err) + } + + if _, err := kubeClient.ServerVersion(); err != nil { + log.Fatal("Could not connect to Kube Master", err) + } else { + log.Println("Connected to K8S API Server") + } + + kubeWorkQueue := make(chan KubeWork) + consulWorkQueue := make(chan ConsulWork) + go runBookKeeper(kubeWorkQueue, consulWorkQueue, kubeClient) + watchForNodes(kubeClient, kubeWorkQueue) + watchForServices(kubeClient, kubeWorkQueue) + watchForPods(kubeClient, kubeWorkQueue) + go runConsulWorker(consulWorkQueue, consulClient) + + log.Println("Running Consul Sync loop every: 60 seconds") + csyncer := time.NewTicker(time.Second * time.Duration(60)) + go func() { + for t := range csyncer.C { + log.Println("Consul Sync request at:", t) + consulWorkQueue <- ConsulWork{ + Action: ConsulWorkSyncDNS, + } + } + }() + + log.Println("Running Kube Sync loop every: 60 seconds") + ksyncer := time.NewTicker(time.Second * time.Duration(60)) + go func() { + for t := range ksyncer.C { + log.Println("Kube Sync request at:", t) + kubeWorkQueue <- KubeWork{ + Action: KubeWorkSync, + } + } + }() + + // Prevent exit + select {} +} diff --git a/kube2consul/src/kube2consul/kube_service.go b/kube2consul/src/kube2consul/kube_service.go new file mode 100644 index 0000000..887f6c8 --- /dev/null +++ b/kube2consul/src/kube2consul/kube_service.go @@ -0,0 +1,100 @@ +/* +Copyright 2017 ZTE, Inc. and others. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// kube_service.go +package main + +import ( + "log" + "strconv" + "strings" + + consulapi "github.com/hashicorp/consul/api" + kapi "k8s.io/kubernetes/pkg/api" +) + +type KubeServiceAction interface { + BuildServiceInfoMap(*kapi.ServicePort, *map[string]string) + BuildAgentService(*ServiceInfo, DnsInfo, *kapi.ServicePort) *consulapi.AgentServiceRegistration + BuildAgentServiceCheck(DnsInfo, *kapi.ServicePort) *consulapi.AgentServiceCheck +} + +type KubeService struct { + KubeServiceAction + sInfos map[string]*ServiceInfo +} + +func newKubeService() *KubeService { + return &KubeService{ + sInfos: make(map[string]*ServiceInfo), + } +} + +func (kube *KubeService) BuildServiceInfoMap(sport *kapi.ServicePort, annos *map[string]string) { + portstr := strconv.Itoa(int(sport.Port)) + nameprefix := "SERVICE_" + portstr + "_NAME" + tagprefix := "SERVICE_" + portstr + "_TAGS" + + for name, value := range *annos { + if strings.HasPrefix(name, nameprefix) { + addToServiceInfoMap(name, value, Service_NAME, &kube.sInfos) + } else if strings.HasPrefix(name, tagprefix) { + addToServiceInfoMap(name, value, Service_TAGS, &kube.sInfos) + } + } +} + +func (kube *KubeService) BuildAgentService(sInfo *ServiceInfo, config DnsInfo, sport *kapi.ServicePort) *consulapi.AgentServiceRegistration { + name := sInfo.Name + var tagslice []string + tagslice = strings.Split(sInfo.Tags, ",") + + for _, elem := range tagslice { + var elemslice []string + elemslice = strings.Split(elem, ":") + if elemslice[0] == PROTOCOL { + switch elemslice[1] { + case PROTOCOL_UI: + name = PREFIX_UI + name + default: + log.Println("regular protocol:", elemslice[1]) + } + break + } + } + + asrID := config.BaseID + "-" + strconv.Itoa(int(sport.Port)) + "-" + name + + regPort := sport.NodePort + if config.ServiceType == kapi.ServiceTypeClusterIP { + regPort = sport.Port + } + + return &consulapi.AgentServiceRegistration{ + ID: asrID, + Name: name, + Address: config.IPAddress, + Port: int(regPort), + Tags: tagslice, + } +} + +func (kube *KubeService) BuildAgentServiceCheck(config DnsInfo, sport *kapi.ServicePort) *consulapi.AgentServiceCheck { + log.Println("Creating service check for: ", config.IPAddress, " on Port: ", sport.NodePort) + return &consulapi.AgentServiceCheck{ + TCP: config.IPAddress + ":" + strconv.Itoa(int(sport.NodePort)), + Interval: "60s", + } +} diff --git a/kube2consul/src/kube2consul/kube_work.go b/kube2consul/src/kube2consul/kube_work.go new file mode 100644 index 0000000..4695a76 --- /dev/null +++ b/kube2consul/src/kube2consul/kube_work.go @@ -0,0 +1,511 @@ +/* +Copyright 2017 ZTE, Inc. and others. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// kube_work.go +package main + +import ( + "encoding/json" + "kube2consul/util/restclient" + "log" + "strings" + "sync" + "time" + + kapi "k8s.io/kubernetes/pkg/api" + kclient "k8s.io/kubernetes/pkg/client/unversioned" + kselector "k8s.io/kubernetes/pkg/fields" + klabels "k8s.io/kubernetes/pkg/labels" +) + +type KubeBookKeeper interface { + AddNode(*kapi.Node) + RemoveNode(*kapi.Node) + UpdateNode(*kapi.Node) + AddService(*kapi.Service) + RemoveService(*kapi.Service) + UpdateService(*kapi.Service) + AddPod(*kapi.Pod) + RemovePod(*kapi.Pod) + UpdatePod(*kapi.Pod) +} + +type ClientBookKeeper struct { + sync.Mutex + KubeBookKeeper + client *kclient.Client + nodes map[string]*KubeNode + services map[string]*kapi.Service + pods map[string]*kapi.Pod + consulQueue chan<- ConsulWork +} + +type KubeNode struct { + name string + readyStatus bool + serviceIDS map[string]string + address string +} + +func buildServiceBaseID(nodeName string, service *kapi.Service) string { + return nodeName + "-" + service.Name +} + +func nodeReady(node *kapi.Node) bool { + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == kapi.NodeReady { + return node.Status.Conditions[i].Status == kapi.ConditionTrue + } + } + + log.Println("NodeReady condition is missing from node: ", node.Name) + return false +} + +func newKubeNode() *KubeNode { + return &KubeNode{ + name: "", + readyStatus: false, + serviceIDS: make(map[string]string), + } +} + +func newClientBookKeeper(client *kclient.Client) *ClientBookKeeper { + return &ClientBookKeeper{ + client: client, + nodes: make(map[string]*KubeNode), + services: make(map[string]*kapi.Service), + pods: make(map[string]*kapi.Pod), + } +} + +func (client *ClientBookKeeper) AddNode(node *kapi.Node) { + client.Lock() + defer client.Unlock() + if _, ok := client.nodes[node.Name]; ok { + log.Printf("Node:%s already exist. skip this ADD notification.", node.Name) + return + } + //Add to Node Collection + kubeNode := newKubeNode() + kubeNode.readyStatus = nodeReady(node) + kubeNode.name = node.Name + kubeNode.address = node.Status.Addresses[0].Address + + //Send request for Service Addition for node and all serviceIDS (Create Service ID here) + if kubeNode.readyStatus { + client.AddAllServicesToNode(kubeNode) + } + + client.nodes[node.Name] = kubeNode + log.Println("Added Node: ", node.Name) +} + +func (client *ClientBookKeeper) AddAllServicesToNode(node *KubeNode) { + for _, service := range client.services { + if service.Spec.Type == kapi.ServiceTypeClusterIP { + log.Println("ClusterIP service ignored for node binding:", service.Name) + } else { + client.AttachServiceToNode(node, service) + } + } +} + +func (client *ClientBookKeeper) AttachServiceToNode(node *KubeNode, service *kapi.Service) { + baseID := buildServiceBaseID(node.name, service) + client.consulQueue <- ConsulWork{ + Action: ConsulWorkAddService, + Service: service, + Config: DnsInfo{ + BaseID: baseID, + IPAddress: node.address, + ServiceType: service.Spec.Type, + }, + } + log.Println("Requesting Addition of services with Base ID: ", baseID) + node.serviceIDS[service.Name] = baseID +} + +func (client *ClientBookKeeper) RemoveNode(name string) { + client.Lock() + defer client.Unlock() + if kubeNode, ok := client.nodes[name]; ok { + //Remove All DNS for node + client.RemoveAllServicesFromNode(kubeNode) + //Remove Node from Collection + delete(client.nodes, name) + } else { + log.Println("Attmepted to remove missing node: ", name) + } + + log.Println("Queued Node to be removed: ", name) +} + +func (client *ClientBookKeeper) RemoveAllServicesFromNode(node *KubeNode) { + for _, service := range client.services { + if service.Spec.Type == kapi.ServiceTypeClusterIP { + log.Println("ClusterIP service ignored for node unbinding:", service.Name) + } else { + client.DetachServiceFromNode(node, service) + } + } +} + +func (client *ClientBookKeeper) DetachServiceFromNode(node *KubeNode, service *kapi.Service) { + if baseID, ok := node.serviceIDS[service.Name]; ok { + client.consulQueue <- ConsulWork{ + Action: ConsulWorkRemoveService, + Config: DnsInfo{ + BaseID: baseID, + }, + } + + log.Println("Requesting Removal of services with Base ID: ", baseID) + delete(node.serviceIDS, service.Name) + } +} + +func (client *ClientBookKeeper) UpdateNode(node *kapi.Node) { + if nodeReady(node) { + // notReady nodes also stored in client.nodes + client.AddAllServicesToNode(client.nodes[node.Name]) + } else { + client.RemoveAllServicesFromNode(client.nodes[node.Name]) + } +} + +func (client *ClientBookKeeper) AddService(svc *kapi.Service) { + client.Lock() + defer client.Unlock() + if !isUserDefinedService(svc) { + log.Println("Not the target, skip this ADD notification for service:", svc.Name) + return + } + + if _, ok := client.services[svc.Name]; ok { + log.Printf("service:%s already exist. skip this ADD notification.", svc.Name) + return + } + + if kapi.IsServiceIPSet(svc) { + if svc.Spec.Type == kapi.ServiceTypeClusterIP { + log.Println("Adding ClusterIP service:", svc.Name) + client.consulQueue <- ConsulWork{ + Action: ConsulWorkAddService, + Service: svc, + Config: DnsInfo{ + BaseID: svc.Name, + IPAddress: svc.Spec.ClusterIP, + ServiceType: svc.Spec.Type, + }, + } + } else { + //Check and use local node list first + if len(client.nodes) > 0 { + for _, kubeNode := range client.nodes { + if kubeNode.readyStatus { + client.AttachServiceToNode(kubeNode, svc) + } + } + } else { + log.Println("local node list is empty, retrieve it from the api server.") + nodes := client.client.Nodes() + if nodeList, err := nodes.List(kapi.ListOptions{ + LabelSelector: klabels.Everything(), + FieldSelector: kselector.Everything(), + }); err == nil { + for _, node := range nodeList.Items { + if nodeReady(&node) { + kubeNode := newKubeNode() + kubeNode.readyStatus = nodeReady(&node) + kubeNode.name = node.Name + kubeNode.address = node.Status.Addresses[0].Address + client.AttachServiceToNode(kubeNode, svc) + } + } + } + } + } + client.services[svc.Name] = svc + log.Println("Queued Service to be added: ", svc.Name) + } else { + // if ClusterIP is not set, do not create a DNS records + log.Printf("Skipping dns record for headless service: %s\n", svc.Name) + } +} + +func (client *ClientBookKeeper) RemoveService(svc *kapi.Service) { + client.Lock() + defer client.Unlock() + if !isUserDefinedService(svc) { + log.Println("Not the target, skip this Remove notification for service:", svc.Name) + return + } + + if _, ok := client.services[svc.Name]; !ok { + log.Printf("Service:%s not exist. skip this REMOVE notification.", svc.Name) + return + } + + if svc.Spec.Type == kapi.ServiceTypeClusterIP { + log.Println("Removing ClusterIP service:", svc.Name) + //Perform All DNS Removes + client.consulQueue <- ConsulWork{ + Action: ConsulWorkRemoveService, + Config: DnsInfo{ + BaseID: svc.Name, + }, + } + } else { + //Check and use local node list first + if len(client.nodes) > 0 { + for _, kubeNode := range client.nodes { + if kubeNode.readyStatus { + client.DetachServiceFromNode(kubeNode, svc) + } + } + } else { + log.Println("local node list is empty, retrieve it from the api server. sync it later.") + } + } + delete(client.services, svc.Name) + log.Println("Queued Service to be removed: ", svc.Name) +} + +func (client *ClientBookKeeper) UpdateService(svc *kapi.Service) { + if !isUserDefinedService(svc) { + log.Println("Not the target, skip this Update notification for service:", svc.Name) + return + } + + client.RemoveService(svc) + client.AddService(svc) +} + +func (client *ClientBookKeeper) AddPod(pod *kapi.Pod) { + client.Lock() + defer client.Unlock() + if !isUserDefinedPod(pod) { + log.Println("Not the target, skip this ADD notification for pod:", pod.Name) + return + } + + if _, ok := client.pods[pod.Name]; ok { + log.Printf("Pod:%s already exist. skip this ADD notification.", pod.Name) + return + } + + //newly added Pod + if pod.Name == "" || pod.Status.PodIP == "" { + log.Printf("Pod:%s has neither name nor pod ip. skip this ADD notification.", pod.Name) + addMap[pod.Name] = pod + return + } + //existing Pod + if kapi.IsPodReady(pod) { + if *argPdmControllerUrl != "" { + fillPdmPodIPsMap(pod) + } + } + //Perform All DNS Adds + client.consulQueue <- ConsulWork{ + Action: ConsulWorkAddPod, + Pod: pod, + Config: DnsInfo{ + BaseID: pod.Name, + IPAddress: pod.Status.PodIP, + }, + } + client.pods[pod.Name] = pod + log.Println("Queued Pod to be added: ", pod.Name) +} + +func (client *ClientBookKeeper) RemovePod(pod *kapi.Pod) { + client.Lock() + defer client.Unlock() + if !isUserDefinedPod(pod) { + log.Println("Not the target, skip this Remove notification for pod:", pod.Name) + return + } + + if _, ok := client.pods[pod.Name]; !ok { + log.Printf("Pod:%s not exist. skip this REMOVE notification.", pod.Name) + return + } + //Perform All DNS Removes + client.consulQueue <- ConsulWork{ + Action: ConsulWorkRemovePod, + Config: DnsInfo{ + BaseID: pod.Name, + }, + } + delete(client.pods, pod.Name) + log.Println("Queued Pod to be removed: ", pod.Name) +} + +func (client *ClientBookKeeper) UpdatePod(pod *kapi.Pod) { + if !isUserDefinedPod(pod) { + log.Println("Not the target, skip this Update notification for pod:", pod.Name) + return + } + + if *argPdmControllerUrl != "" { + fillPdmPodIPsMap(pod) + } + client.RemovePod(pod) + client.AddPod(pod) +} + +func (client *ClientBookKeeper) Sync() { + nodes := client.client.Nodes() + if nodeList, err := nodes.List(kapi.ListOptions{ + LabelSelector: klabels.Everything(), + FieldSelector: kselector.Everything(), + }); err == nil { + for name, _ := range client.nodes { + if !containsNodeName(name, nodeList) { + log.Printf("Bookkeeper has node: %s that does not exist in api server", name) + client.RemoveNode(name) + } + } + } + + svcs := client.client.Services(kapi.NamespaceAll) + if svcList, err := svcs.List(kapi.ListOptions{ + LabelSelector: klabels.Everything(), + FieldSelector: kselector.Everything(), + }); err == nil { + for name, svc := range client.services { + if !containsSvcName(name, svcList) { + log.Printf("Bookkeeper has service: %s that does not exist in api server", name) + client.RemoveService(svc) + } + } + } + + pods := client.client.Pods(kapi.NamespaceAll) + if podList, err := pods.List(kapi.ListOptions{ + LabelSelector: klabels.Everything(), + FieldSelector: kselector.Everything(), + }); err == nil { + for name, pod := range client.pods { + if !containsPodName(name, podList) { + log.Printf("Bookkeeper has pod: %s that does not exist in api server", name) + if _, ok := deleteMap[pod.Name]; ok { + log.Println("Missing remove notification for Pod: ", pod.Name) + delete(deleteMap, pod.Name) + } + client.RemovePod(pod) + } + } + //It was observed that the kube notification may lost after the restarting of kube master. + //Currently this is only done for Pod. same for Node and Service will be done when required. + for _, pod := range podList.Items { + if _, ok := client.pods[pod.ObjectMeta.Name]; !ok { + if kapi.IsPodReady(&pod) { + log.Println("Missing add/update notification for Pod: ", pod.ObjectMeta.Name) + client.AddPod(&pod) + } + } + } + } +} + +func containsPodName(name string, pods *kapi.PodList) bool { + for _, pod := range pods.Items { + if pod.ObjectMeta.Name == name { + return true + } + } + return false +} + +func containsSvcName(name string, svcs *kapi.ServiceList) bool { + for _, svc := range svcs.Items { + if svc.ObjectMeta.Name == name { + return true + } + } + return false +} + +func containsNodeName(name string, nodes *kapi.NodeList) bool { + for _, node := range nodes.Items { + if node.ObjectMeta.Name == name { + return true + } + } + return false +} + +func isUserDefinedPod(pod *kapi.Pod) bool { + for _, c := range pod.Spec.Containers { + for _, e := range c.Env { + if strings.HasPrefix(e.Name, "SERVICE_") { + if strings.Contains(e.Name, "_NAME") { + return true + } + } + } + } + return false +} + +func isUserDefinedService(svc *kapi.Service) bool { + for name, _ := range svc.ObjectMeta.Annotations { + if strings.HasPrefix(name, "SERVICE_") { + if strings.Contains(name, "_NAME") { + return true + } + } + } + return false +} + +func fillPdmPodIPsMap(pod *kapi.Pod) { + base := *argPdmControllerUrl + resUrl := podUrl + "/" + pod.Namespace + "/pods" + rclient := restclient.NewRESTClient(base, resUrl, pod.Name) + //try 10 times at maximum + for i := 0; i < 10; i++ { + log.Printf("try REST get the PDM PodIP for pod:%s at %d time..", pod.Name, i) + buf, err := rclient.Get() + if err != nil { + log.Printf("request PDM PodIP info for Pod:%s with error:%v", pod.Name, err) + time.Sleep(6 * 1e9) //sleep for 6 secs + continue + } + podIPInfo := PdmPodIPInfo{} + json.Unmarshal(buf, &podIPInfo) + IPInfo := podIPInfo.Pod + IPs := IPInfo.IPs + if len(IPs) == 0 { + log.Printf("request PDM PodIP info for Pod:%s return empty ip list.", pod.Name) + return + } else { + for _, ip := range IPs { + if ip.IPAddress == "" { + log.Printf("request PDM PodIP info for Pod:%s return empty ip.", pod.Name) + return + } + } + pdmPodIPsMap[pod.Name] = IPs + } + log.Println("successfully REST get the PDM PodIP for pod:", pod.Name) + return + } + + log.Println("failed to REST get the PDM PodIP for pod:", pod.Name) +} diff --git a/kube2consul/src/kube2consul/pod_service.go b/kube2consul/src/kube2consul/pod_service.go new file mode 100644 index 0000000..c6294cb --- /dev/null +++ b/kube2consul/src/kube2consul/pod_service.go @@ -0,0 +1,677 @@ +/* +Copyright 2017 ZTE, Inc. and others. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// pod_service.go +package main + +import ( + "fmt" + "log" + "strconv" + "strings" + + consulapi "github.com/hashicorp/consul/api" + kapi "k8s.io/kubernetes/pkg/api" +) + +type PodServiceAction interface { + BuildServiceInfoMap(*kapi.ContainerPort, *kapi.Container) + BuildAgentService(*ServiceInfo, DnsInfo, *kapi.ContainerPort, string) *consulapi.AgentServiceRegistration +} + +type PodService struct { + PodServiceAction + sInfos map[string]*ServiceInfo +} + +func newPodService() *PodService { + return &PodService{ + sInfos: make(map[string]*ServiceInfo), + } +} + +func (pod *PodService) BuildServiceInfoMap(cport *kapi.ContainerPort, container *kapi.Container) { + portstr := strconv.Itoa(int(cport.ContainerPort)) + fullname := "SERVICE_" + portstr + "_NAME" + nameprefix := "SERVICE_" + portstr + "_NAME_" + tagprefix := "SERVICE_" + portstr + "_TAGS" + labelprefix := "SERVICE_" + portstr + "_ROUTE_LABELS" + mdataprefix := "SERVICE_" + portstr + "_META_DATA" + nsprefix := "SERVICE_" + portstr + "_NAMESPACE" + ttlprefix := "SERVICE_" + portstr + "_CHECK_TTL" + httpprefix := "SERVICE_" + portstr + "_CHECK_HTTP" + tcpprefix := "SERVICE_" + portstr + "_CHECK_TCP" + cmdprefix := "SERVICE_" + portstr + "_CHECK_CMD" + scriptprefix := "SERVICE_" + portstr + "_CHECK_SCRIPT" + timeoutprefix := "SERVICE_" + portstr + "_CHECK_TIMEOUT" + intervalprefix := "SERVICE_" + portstr + "_CHECK_INTERVAL" + for _, env := range container.Env { + if env.Name == fullname || strings.HasPrefix(env.Name, nameprefix) { + addToServiceInfoMap(env.Name, env.Value, Service_NAME, &pod.sInfos) + } else if strings.HasPrefix(env.Name, tagprefix) { + addToServiceInfoMap(env.Name, env.Value, Service_TAGS, &pod.sInfos) + } else if strings.HasPrefix(env.Name, labelprefix) { + addToServiceInfoMap(env.Name, env.Value, Service_LABELS, &pod.sInfos) + } else if strings.HasPrefix(env.Name, mdataprefix) { + addToServiceInfoMap(env.Name, env.Value, Service_MDATA, &pod.sInfos) + } else if strings.HasPrefix(env.Name, nsprefix) { + addToServiceInfoMap(env.Name, env.Value, Service_NS, &pod.sInfos) + } else if strings.HasPrefix(env.Name, ttlprefix) && env.Value != "" { + addToServiceInfoMap(env.Name, env.Value, Service_TTL, &pod.sInfos) + } else if strings.HasPrefix(env.Name, httpprefix) && env.Value != "" { + addToServiceInfoMap(env.Name, env.Value, Service_HTTP, &pod.sInfos) + } else if strings.HasPrefix(env.Name, tcpprefix) && env.Value != "" { + addToServiceInfoMap(env.Name, env.Value, Service_TCP, &pod.sInfos) + } else if strings.HasPrefix(env.Name, cmdprefix) && env.Value != "" { + addToServiceInfoMap(env.Name, env.Value, Service_CMD, &pod.sInfos) + } else if strings.HasPrefix(env.Name, scriptprefix) && env.Value != "" { + addToServiceInfoMap(env.Name, env.Value, Service_SCRIPT, &pod.sInfos) + } else if strings.HasPrefix(env.Name, timeoutprefix) && env.Value != "" { + addToServiceInfoMap(env.Name, env.Value, Service_TIMEOUT, &pod.sInfos) + } else if strings.HasPrefix(env.Name, intervalprefix) && env.Value != "" { + addToServiceInfoMap(env.Name, env.Value, Service_INTERVAL, &pod.sInfos) + } + } +} + +func (pod *PodService) BuildAgentService(sInfo *ServiceInfo, config DnsInfo, cport *kapi.ContainerPort, cId string) []*consulapi.AgentServiceRegistration { + asrs := []*consulapi.AgentServiceRegistration{} + checks := []*consulapi.AgentServiceCheck{} + + //build Checks object and populate checks table + var checks_in_tag string + + if len(sInfo.TTL) > 0 { + check := new(consulapi.AgentServiceCheck) + check.TTL = sInfo.TTL + kv := "ttl:" + sInfo.TTL + "," + checks_in_tag += kv + checks = append(checks, check) + } + + if len(sInfo.HTTP) > 0 { + check := new(consulapi.AgentServiceCheck) + check.HTTP = fmt.Sprintf("http://%s:%d%s", config.IPAddress, cport.ContainerPort, sInfo.HTTP) + kv := "http:" + sInfo.HTTP + "," + checks_in_tag += kv + if len(sInfo.Interval) == 0 { + check.Interval = DefaultInterval + if strings.Contains(checks_in_tag, "interval:") { + //do nothing + } else { + kv = "interval:" + DefaultInterval + "," + checks_in_tag += kv + } + } else { + check.Interval = sInfo.Interval + if strings.Contains(checks_in_tag, "interval:") { + //do nothing + } else { + kv = "interval:" + sInfo.Interval + "," + checks_in_tag += kv + } + } + if len(sInfo.Timeout) > 0 { + check.Timeout = sInfo.Timeout + if strings.Contains(checks_in_tag, "timeout:") { + //do nothing + } else { + kv = "timeout:" + sInfo.Timeout + "," + checks_in_tag += kv + } + } + checks = append(checks, check) + } + + if len(sInfo.TCP) > 0 { + check := new(consulapi.AgentServiceCheck) + check.TCP = fmt.Sprintf("%s:%d", config.IPAddress, cport.ContainerPort) + kv := "tcp:ok," + checks_in_tag += kv + if len(sInfo.Interval) == 0 { + check.Interval = DefaultInterval + if strings.Contains(checks_in_tag, "interval:") { + //do nothing + } else { + kv = "interval:" + DefaultInterval + "," + checks_in_tag += kv + } + } else { + check.Interval = sInfo.Interval + if strings.Contains(checks_in_tag, "interval:") { + //do nothing + } else { + kv = "interval:" + sInfo.Interval + "," + checks_in_tag += kv + } + } + if len(sInfo.Timeout) > 0 { + check.Timeout = sInfo.Timeout + if strings.Contains(checks_in_tag, "timeout:") { + //do nothing + } else { + kv = "timeout:" + sInfo.Timeout + "," + checks_in_tag += kv + } + } + checks = append(checks, check) + } + + if len(sInfo.CMD) > 0 { + check := new(consulapi.AgentServiceCheck) + check.Script = fmt.Sprintf("check-cmd %s %s %s", cId, strconv.Itoa(int(cport.ContainerPort)), sInfo.CMD) + kv := "script:" + sInfo.CMD + "," + checks_in_tag += kv + if len(sInfo.Interval) == 0 { + check.Interval = DefaultInterval + if strings.Contains(checks_in_tag, "interval:") { + //do nothing + } else { + kv = "interval:" + DefaultInterval + "," + checks_in_tag += kv + } + } else { + check.Interval = sInfo.Interval + if strings.Contains(checks_in_tag, "interval:") { + //do nothing + } else { + kv = "interval:" + sInfo.Interval + "," + checks_in_tag += kv + } + } + checks = append(checks, check) + } + + if len(sInfo.Script) > 0 { + check := new(consulapi.AgentServiceCheck) + withIp := strings.Replace(sInfo.Script, "$SERVICE_IP", config.IPAddress, -1) + check.Script = strings.Replace(withIp, "$SERVICE_PORT", strconv.Itoa(int(cport.ContainerPort)), -1) + kv := "script:" + sInfo.Script + "," + checks_in_tag += kv + if len(sInfo.Interval) == 0 { + check.Interval = DefaultInterval + if strings.Contains(checks_in_tag, "interval:") { + //do nothing + } else { + kv = "interval:" + DefaultInterval + "," + checks_in_tag += kv + } + } else { + check.Interval = sInfo.Interval + if strings.Contains(checks_in_tag, "interval:") { + //do nothing + } else { + kv = "interval:" + sInfo.Interval + "," + checks_in_tag += kv + } + } + if len(sInfo.Timeout) > 0 { + check.Timeout = sInfo.Timeout + if strings.Contains(checks_in_tag, "timeout:") { + //do nothing + } else { + kv = "timeout:" + sInfo.Timeout + "," + checks_in_tag += kv + } + } + checks = append(checks, check) + } + + //remove trailing "," + if len(checks_in_tag) != 0 { + checks_in_tag = strings.TrimRight(checks_in_tag, ",") + } + + //build other talbes in tags + var ( + base_in_tag string + lb_in_tag string + labels_in_tag = sInfo.Labels + metadata_in_tag = sInfo.MData + ns_in_tag = sInfo.NS + ) + + ts := strings.Split(sInfo.Tags, ",") + for _, elem := range ts { + if strings.Contains(elem, NETWORK_PLANE_TYPE) || strings.Contains(elem, VISUAL_RANGE) { + kv := "," + elem + labels_in_tag += kv + continue + } + + if strings.Contains(elem, LB_POLICY) || strings.Contains(elem, LB_SVR_PARAMS) { + kv := "," + elem + lb_in_tag += kv + continue + } + + kv := "," + elem + base_in_tag += kv + } + + //remove leading "," + if len(base_in_tag) != 0 { + base_in_tag = strings.TrimLeft(base_in_tag, ",") + } + + if len(lb_in_tag) != 0 { + lb_in_tag = strings.TrimLeft(lb_in_tag, ",") + } + + if len(labels_in_tag) != 0 { + labels_in_tag = strings.TrimLeft(labels_in_tag, ",") + } + + //build tables in tag + var tagslice []string + if len(base_in_tag) != 0 { + tagslice = append(tagslice, buildTableInJsonFormat(Table_BASE, base_in_tag)) + } + + if len(lb_in_tag) != 0 { + tagslice = append(tagslice, buildTableInJsonFormat(Table_LB, lb_in_tag)) + } + + if len(labels_in_tag) == 0 { + tagslice = append(tagslice, DefaultLabels) + } else { + if !strings.Contains(labels_in_tag, "visualRange") { + labels_in_tag += ",visualRange:1" + } + tagslice = append(tagslice, buildTableInJsonFormat(Table_LABELS, labels_in_tag)) + } + + if len(metadata_in_tag) != 0 { + tagslice = append(tagslice, buildTableInJsonFormat(Table_META_DATA, metadata_in_tag)) + } + + if len(ns_in_tag) != 0 { + tagslice = append(tagslice, "\"ns\":{\"namespace\":\"" + ns_in_tag + "\"}") + } + + if len(checks_in_tag) != 0 { + tagslice = append(tagslice, buildTableInJsonFormat(Table_CHECKS, checks_in_tag)) + } + + //append "-" + name := sInfo.Name + if len(ns_in_tag) != 0 { + name = name + "-" + ns_in_tag + } + + //handle IUI service +// for _, elem := range ts { +// var elemslice []string +// elemslice = strings.Split(elem, ":") +// if elemslice[0] == PROTOCOL { +// switch elemslice[1] { +// case PROTOCOL_UI: +// name = PREFIX_UI + name +// default: +// log.Println("regular protocol:", elemslice[1]) +// } +// break +// } +// } + + if len(sInfo.IPs) == 0 { + serviceID := config.BaseID + "-" + strconv.Itoa(int(cport.ContainerPort)) + "-" + name + asr := &consulapi.AgentServiceRegistration{ + ID: serviceID, + Name: name, + Address: config.IPAddress, + Port: int(cport.ContainerPort), + Tags: tagslice, + Checks: checks, + } + asrs = append(asrs, asr) + return asrs + } else { + for _, ip := range sInfo.IPs { + serviceID := config.BaseID + "-" + strconv.Itoa(int(cport.ContainerPort)) + "-" + name + "-" + ip.NetworkPlane + addr := ip.IPAddress + tagslice_with_single_network_plane_type := refillTagsliceWithSingleNetworkPlaneType(tagslice, labels_in_tag, ip.NetworkPlane) + asr := &consulapi.AgentServiceRegistration{ + ID: serviceID, + Name: name, + Address: addr, + Port: int(cport.ContainerPort), + Tags: tagslice_with_single_network_plane_type, + Checks: checks, + } + asrs = append(asrs, asr) + } + return asrs + } +} + +func addToServiceInfoMap(name, value, flag string, smap *map[string]*ServiceInfo) { + segs := strings.Split(name, "_") + switch flag { + case Service_NAME: + if strings.ContainsAny(value, ".") { + log.Printf("Warning:service name %s contains dot", value) + value = strings.Replace(value, ".", "-", -1) + log.Printf("Warning:service name has been modified to %s", value) + } + if len(segs) == 3 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].Name = value + } else { + sInfo := &ServiceInfo{ + Name: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 4 { + id := segs[3] + if _, ok := (*smap)[id]; ok { + (*smap)[id].Name = value + } else { + sInfo := &ServiceInfo{ + Name: value, + } + (*smap)[id] = sInfo + } + } + case Service_TAGS: + if len(segs) == 3 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].Tags = value + } else { + sInfo := &ServiceInfo{ + Tags: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 4 { + id := segs[3] + if _, ok := (*smap)[id]; ok { + (*smap)[id].Tags = value + } else { + sInfo := &ServiceInfo{ + Tags: value, + } + (*smap)[id] = sInfo + } + } + case Service_LABELS: + if len(segs) == 4 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].Labels = value + } else { + sInfo := &ServiceInfo{ + Labels: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 5 { + id := segs[4] + if _, ok := (*smap)[id]; ok { + (*smap)[id].Labels = value + } else { + sInfo := &ServiceInfo{ + Labels: value, + } + (*smap)[id] = sInfo + } + } + case Service_MDATA: + if len(segs) == 4 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].MData = value + } else { + sInfo := &ServiceInfo{ + MData: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 5 { + id := segs[4] + if _, ok := (*smap)[id]; ok { + (*smap)[id].MData = value + } else { + sInfo := &ServiceInfo{ + MData: value, + } + (*smap)[id] = sInfo + } + } + case Service_NS: + if len(segs) == 3 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].NS = value + } else { + sInfo := &ServiceInfo{ + NS: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 4 { + id := segs[3] + if _, ok := (*smap)[id]; ok { + (*smap)[id].NS = value + } else { + sInfo := &ServiceInfo{ + NS: value, + } + (*smap)[id] = sInfo + } + } + case Service_TTL: + if len(segs) == 4 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].TTL = value + } else { + sInfo := &ServiceInfo{ + TTL: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 5 { + id := segs[4] + if _, ok := (*smap)[id]; ok { + (*smap)[id].TTL = value + } else { + sInfo := &ServiceInfo{ + TTL: value, + } + (*smap)[id] = sInfo + } + } + case Service_HTTP: + if len(segs) == 4 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].HTTP = value + } else { + sInfo := &ServiceInfo{ + HTTP: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 5 { + id := segs[4] + if _, ok := (*smap)[id]; ok { + (*smap)[id].HTTP = value + } else { + sInfo := &ServiceInfo{ + HTTP: value, + } + (*smap)[id] = sInfo + } + } + case Service_TCP: + if len(segs) == 4 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].TCP = value + } else { + sInfo := &ServiceInfo{ + TCP: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 5 { + id := segs[4] + if _, ok := (*smap)[id]; ok { + (*smap)[id].TCP = value + } else { + sInfo := &ServiceInfo{ + TCP: value, + } + (*smap)[id] = sInfo + } + } + case Service_CMD: + if len(segs) == 4 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].CMD = value + } else { + sInfo := &ServiceInfo{ + CMD: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 5 { + id := segs[4] + if _, ok := (*smap)[id]; ok { + (*smap)[id].CMD = value + } else { + sInfo := &ServiceInfo{ + CMD: value, + } + (*smap)[id] = sInfo + } + } + case Service_SCRIPT: + if len(segs) == 4 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].Script = value + } else { + sInfo := &ServiceInfo{ + Script: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 5 { + id := segs[4] + if _, ok := (*smap)[id]; ok { + (*smap)[id].Script = value + } else { + sInfo := &ServiceInfo{ + Script: value, + } + (*smap)[id] = sInfo + } + } + case Service_TIMEOUT: + if len(segs) == 4 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].Timeout = value + } else { + sInfo := &ServiceInfo{ + Timeout: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 5 { + id := segs[4] + if _, ok := (*smap)[id]; ok { + (*smap)[id].Timeout = value + } else { + sInfo := &ServiceInfo{ + Timeout: value, + } + (*smap)[id] = sInfo + } + } + case Service_INTERVAL: + if len(segs) == 4 { + if _, ok := (*smap)["0"]; ok { + (*smap)["0"].Interval = value + } else { + sInfo := &ServiceInfo{ + Interval: value, + } + (*smap)["0"] = sInfo + } + } else if len(segs) == 5 { + id := segs[4] + if _, ok := (*smap)[id]; ok { + (*smap)[id].Interval = value + } else { + sInfo := &ServiceInfo{ + Interval: value, + } + (*smap)[id] = sInfo + } + } + default: + log.Println("Unsupported Service Attribute: ", flag) + } +} + +func buildTableInJsonFormat(table, input string) string { + head := "\"" + table + "\":{" + tail := "}" + body := "" + + s := strings.Split(input, ",") + slen := len(s) + for _, v := range s { + slen-- + s1 := strings.Split(v, ":") + mstr := "\"" + strings.TrimSpace(s1[0]) + "\":" + if strings.Contains(body, mstr) { + if slen == 0 { + body = strings.TrimRight(body, ",") + } + continue + } + if len(s1) == 2 { + kv := "\"" + strings.TrimSpace(s1[0]) + "\":\"" + strings.TrimSpace(s1[1]) + "\"" + if slen != 0 { + kv += "," + } + body += kv + } + } + + return head + body + tail +} + +func refillTagsliceWithSingleNetworkPlaneType(old []string, labels, nw_type string) []string { + var ret_tagslice []string + var new_labels string + + for _, elem := range old { + if strings.Contains(elem, NETWORK_PLANE_TYPE) { + continue + } + ret_tagslice = append(ret_tagslice, elem) + } + + sl := strings.Split(labels, NETWORK_PLANE_TYPE) + sl1 := strings.SplitN(sl[1], ",", 2) + nw_type_field := NETWORK_PLANE_TYPE + ":" + nw_type + + if len(sl1) == 2 { + new_labels = sl[0] + nw_type_field + "," + sl1[1] + } else { + new_labels = sl[0] + nw_type_field + } + ret_tagslice = append(ret_tagslice, buildTableInJsonFormat(Table_LABELS, new_labels)) + + return ret_tagslice +} \ No newline at end of file diff --git a/kube2consul/src/kube2consul/types.go b/kube2consul/src/kube2consul/types.go new file mode 100644 index 0000000..0105d98 --- /dev/null +++ b/kube2consul/src/kube2consul/types.go @@ -0,0 +1,96 @@ +/* +Copyright 2017 ZTE, Inc. and others. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// types.go +package main + +import ( + kapi "k8s.io/kubernetes/pkg/api" +) + +type KubeWorkAction string + +const ( + KubeWorkAddNode KubeWorkAction = "AddNode" + KubeWorkRemoveNode KubeWorkAction = "RemoveNode" + KubeWorkUpdateNode KubeWorkAction = "UpdateNode" + KubeWorkAddService KubeWorkAction = "AddService" + KubeWorkRemoveService KubeWorkAction = "RemoveService" + KubeWorkUpdateService KubeWorkAction = "UpdateService" + KubeWorkAddPod KubeWorkAction = "AddPod" + KubeWorkRemovePod KubeWorkAction = "RemovePod" + KubeWorkUpdatePod KubeWorkAction = "UpdatePod" + KubeWorkSync KubeWorkAction = "Sync" +) + +type KubeWork struct { + Action KubeWorkAction + Node *kapi.Node + Service *kapi.Service + Pod *kapi.Pod +} + +type ConsulWorkAction string + +const ( + ConsulWorkAddService ConsulWorkAction = "AddService" + ConsulWorkRemoveService ConsulWorkAction = "RemoveService" + ConsulWorkAddPod ConsulWorkAction = "AddPod" + ConsulWorkRemovePod ConsulWorkAction = "RemovePod" + ConsulWorkSyncDNS ConsulWorkAction = "SyncDNS" +) + +type ConsulWork struct { + Action ConsulWorkAction + Service *kapi.Service + Pod *kapi.Pod + Config DnsInfo +} + +type DnsInfo struct { + BaseID string + IPAddress string + ServiceType kapi.ServiceType +} + +type ServiceInfo struct { + Name string + Tags string + Labels string + MData string + NS string + TTL string + HTTP string + TCP string + CMD string + Script string + Timeout string + Interval string + IPs []PodIP +} + +type PdmPodIPInfo struct { + Pod PodIPInfo `json:"pod"` +} + +type PodIPInfo struct { + Name string `json:"name"` + IPs []PodIP `json:"ips"` +} + +type PodIP struct { + NetworkPlane string `json:"network_plane_name"` + IPAddress string `json:"ip_address"` +} diff --git a/kube2consul/src/kube2consul/util/restclient/restclient.go b/kube2consul/src/kube2consul/util/restclient/restclient.go new file mode 100644 index 0000000..e01ae0f --- /dev/null +++ b/kube2consul/src/kube2consul/util/restclient/restclient.go @@ -0,0 +1,42 @@ +package restclient + +import ( + "fmt" + "io/ioutil" + "net/http" +) + +type RESTClient struct { + base string + resource string + param string +} + +func NewRESTClient(baseURL string, versionedAPIPath string, urlParameter string) *RESTClient { + return &RESTClient{ + base: baseURL, + resource: versionedAPIPath, + param: urlParameter, + } +} + +func (c *RESTClient) Get() (b []byte, err error) { + url := c.base + "/" + c.resource + "/" + c.param + res, err := http.Get(url) + if err != nil { + return nil, err + } + + if res.StatusCode != 200 { + return nil, fmt.Errorf(res.Status) + } + + buf, err := ioutil.ReadAll(res.Body) + res.Body.Close() + + if err != nil { + return nil, err + } + + return buf, nil +} diff --git a/kube2consul/src/main/blueprint/deploy.yml b/kube2consul/src/main/blueprint/deploy.yml new file mode 100644 index 0000000..cc61076 --- /dev/null +++ b/kube2consul/src/main/blueprint/deploy.yml @@ -0,0 +1,8 @@ +--- +- remote_user: ubuntu + become: yes + become_method: sudo + vars_files: + - vars.yml + tasks: + - include: task.yml diff --git a/kube2consul/src/main/blueprint/list_of_servicelet.list b/kube2consul/src/main/blueprint/list_of_servicelet.list new file mode 100644 index 0000000..77a1d4f --- /dev/null +++ b/kube2consul/src/main/blueprint/list_of_servicelet.list @@ -0,0 +1,4 @@ +{ + "servicelet_module":[ + ] +} \ No newline at end of file diff --git a/kube2consul/src/main/blueprint/task.yml b/kube2consul/src/main/blueprint/task.yml new file mode 100644 index 0000000..45cfcd9 --- /dev/null +++ b/kube2consul/src/main/blueprint/task.yml @@ -0,0 +1,118 @@ +--- +- name: remove kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + state: absent + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + log_driver: syslog + net: host + restart_policy: always + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + when: + - all_in_one == 'no' + - master_in_controller == 'no' + - cluster_type == 'k8s' + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + net: host + restart_policy: always + privileged: true + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + - "/root/.kube/config:/root/.kube/config:ro" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + CLUSTER_TYPE: "openshift" + when: + - all_in_one == 'no' + - master_in_controller == 'no' + - cluster_type == 'openshift' + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + log_driver: syslog + restart_policy: always + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + ALL_IN_ONE: "yes" + when: + - all_in_one == 'yes' + - cluster_type == 'k8s' + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + log_driver: syslog + restart_policy: always + privileged: true + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + - "/root/.kube/config:/root/.kube/config:ro" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + ALL_IN_ONE: "yes" + CLUSTER_TYPE: "openshift" + when: + - all_in_one == 'yes' + - cluster_type == 'openshift' + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + log_driver: syslog + restart_policy: always + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + ALL_IN_ONE: "yes" + when: + - master_in_controller == 'yes' + - cluster_type == 'k8s' + +- name: run kube2consul container + docker: + name: kube2consul + image: "{{kube2consul_image}}" + log_driver: syslog + restart_policy: always + privileged: true + volumes: + - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}" + - "/root/.kube/config:/root/.kube/config:ro" + env: + KUBE_MASTER_IP: "{{kube_master_ip}}" + PDM_CONTROLLER_IP: "{{pdm_controller_ip}}" + JOIN_IP: "{{consul_join_ip}}" + ALL_IN_ONE: "yes" + CLUSTER_TYPE: "openshift" + when: + - master_in_controller == 'yes' + - cluster_type == 'openshift' \ No newline at end of file diff --git a/kube2consul/src/main/blueprint/vars.yml b/kube2consul/src/main/blueprint/vars.yml new file mode 100644 index 0000000..971438d --- /dev/null +++ b/kube2consul/src/main/blueprint/vars.yml @@ -0,0 +1,14 @@ +--- +- api_network_ip: +- man_network_ip: +- registry_url: +- cp_vertype: +- cp_type: +- cp_name: +- cp_version: +- kube2consul_image: "{{registry_url}}/{{cp_type}}/{{cp_name}}:{{cp_version}}" +- kube_master_ip: "{{ hostvars[inventory_hostname]['api_network_ip'] }}" +- pdm_controller_ip: "{{vp_ip}}" +- consul_join_ip: "{{zenap_msb_consul_server_ip}}" +- kube2consul_data_host: "/home/zenap-msb/consul_data/kube2consul_{{kube_master_ip}}" +- kube2consul_data_container: "/consul-works/data-dir" \ No newline at end of file diff --git a/kube2consul/src/main/docker/Dockerfile b/kube2consul/src/main/docker/Dockerfile new file mode 100644 index 0000000..278cac5 --- /dev/null +++ b/kube2consul/src/main/docker/Dockerfile @@ -0,0 +1,11 @@ +FROM alpine:3.3 +ENV CONSUL_VERSION 0.7.1 +ENV BASE / +ADD consul-linux_amd64.tar.gz / +RUN cd /usr/lib \ + && ln -s /consul/libglib-2.0.so.0.4400.0 libglib-2.0.so.0 \ + && ln -s /consul/libintl.so.8.1.3 libintl.so.8 +COPY kube2consul /bin/ +COPY start.sh / + +ENTRYPOINT exec /start.sh \ No newline at end of file diff --git a/kube2consul/src/main/docker/start.sh b/kube2consul/src/main/docker/start.sh new file mode 100644 index 0000000..033c50b --- /dev/null +++ b/kube2consul/src/main/docker/start.sh @@ -0,0 +1,36 @@ +#!/bin/sh +if [ -z "${KUBE_MASTER_IP}" ]; then + echo "kube master node ip is required." + exit 1 +fi + +if [ -n "${JOIN_IP}" ]; then + echo "### Starting consul client" + if [ -z "${ALL_IN_ONE}" ]; then + /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind ${KUBE_MASTER_IP} -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s & + else + /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind 0.0.0.0 -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s & + fi +fi + +if [ -z "${RUN_MODE}" ]; then + echo "non-HA scenario." +else + echo "\n\n### Starting consul agent" + cd ./consul + ./entry.sh & +fi + +kube_url="http://${KUBE_MASTER_IP}:8080" + +if [ "${CLUSTER_TYPE}" == "openshift" ]; then + kube_url="https://${KUBE_MASTER_IP}:8443" +fi + +echo "\n\n### Starting kube2consul" +if [ -z "${PDM_CONTROLLER_IP}" ]; then + /bin/kube2consul --kube_master_url ${kube_url} +else + echo "in Paas mode." + /bin/kube2consul --kube_master_url ${kube_url} --pdm_controller_url http://${PDM_CONTROLLER_IP}:9527 +fi \ No newline at end of file -- cgit 1.2.3-korg