summaryrefslogtreecommitdiffstats
path: root/kube2consul
diff options
context:
space:
mode:
Diffstat (limited to 'kube2consul')
-rw-r--r--kube2consul/bin/Dockerfile11
-rw-r--r--kube2consul/bin/blueprint/deploy.yml8
-rw-r--r--kube2consul/bin/blueprint/list_of_servicelet.list4
-rw-r--r--kube2consul/bin/blueprint/task.yml118
-rw-r--r--kube2consul/bin/blueprint/vars.yml14
-rw-r--r--kube2consul/bin/start.sh36
-rw-r--r--kube2consul/pom.xml109
-rw-r--r--kube2consul/src/kube2consul/Makefile11
-rw-r--r--kube2consul/src/kube2consul/consul_work.go366
-rw-r--r--kube2consul/src/kube2consul/kube2consul.go449
-rw-r--r--kube2consul/src/kube2consul/kube_service.go100
-rw-r--r--kube2consul/src/kube2consul/kube_work.go511
-rw-r--r--kube2consul/src/kube2consul/pod_service.go677
-rw-r--r--kube2consul/src/kube2consul/types.go96
-rw-r--r--kube2consul/src/kube2consul/util/restclient/restclient.go42
-rw-r--r--kube2consul/src/main/blueprint/deploy.yml8
-rw-r--r--kube2consul/src/main/blueprint/list_of_servicelet.list4
-rw-r--r--kube2consul/src/main/blueprint/task.yml118
-rw-r--r--kube2consul/src/main/blueprint/vars.yml14
-rw-r--r--kube2consul/src/main/docker/Dockerfile11
-rw-r--r--kube2consul/src/main/docker/start.sh36
21 files changed, 0 insertions, 2743 deletions
diff --git a/kube2consul/bin/Dockerfile b/kube2consul/bin/Dockerfile
deleted file mode 100644
index 278cac5..0000000
--- a/kube2consul/bin/Dockerfile
+++ /dev/null
@@ -1,11 +0,0 @@
-FROM alpine:3.3
-ENV CONSUL_VERSION 0.7.1
-ENV BASE /
-ADD consul-linux_amd64.tar.gz /
-RUN cd /usr/lib \
- && ln -s /consul/libglib-2.0.so.0.4400.0 libglib-2.0.so.0 \
- && ln -s /consul/libintl.so.8.1.3 libintl.so.8
-COPY kube2consul /bin/
-COPY start.sh /
-
-ENTRYPOINT exec /start.sh \ No newline at end of file
diff --git a/kube2consul/bin/blueprint/deploy.yml b/kube2consul/bin/blueprint/deploy.yml
deleted file mode 100644
index cc61076..0000000
--- a/kube2consul/bin/blueprint/deploy.yml
+++ /dev/null
@@ -1,8 +0,0 @@
----
-- remote_user: ubuntu
- become: yes
- become_method: sudo
- vars_files:
- - vars.yml
- tasks:
- - include: task.yml
diff --git a/kube2consul/bin/blueprint/list_of_servicelet.list b/kube2consul/bin/blueprint/list_of_servicelet.list
deleted file mode 100644
index 77a1d4f..0000000
--- a/kube2consul/bin/blueprint/list_of_servicelet.list
+++ /dev/null
@@ -1,4 +0,0 @@
-{
- "servicelet_module":[
- ]
-} \ No newline at end of file
diff --git a/kube2consul/bin/blueprint/task.yml b/kube2consul/bin/blueprint/task.yml
deleted file mode 100644
index 45cfcd9..0000000
--- a/kube2consul/bin/blueprint/task.yml
+++ /dev/null
@@ -1,118 +0,0 @@
----
-- name: remove kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- state: absent
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- log_driver: syslog
- net: host
- restart_policy: always
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- when:
- - all_in_one == 'no'
- - master_in_controller == 'no'
- - cluster_type == 'k8s'
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- net: host
- restart_policy: always
- privileged: true
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- - "/root/.kube/config:/root/.kube/config:ro"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- CLUSTER_TYPE: "openshift"
- when:
- - all_in_one == 'no'
- - master_in_controller == 'no'
- - cluster_type == 'openshift'
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- log_driver: syslog
- restart_policy: always
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- ALL_IN_ONE: "yes"
- when:
- - all_in_one == 'yes'
- - cluster_type == 'k8s'
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- log_driver: syslog
- restart_policy: always
- privileged: true
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- - "/root/.kube/config:/root/.kube/config:ro"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- ALL_IN_ONE: "yes"
- CLUSTER_TYPE: "openshift"
- when:
- - all_in_one == 'yes'
- - cluster_type == 'openshift'
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- log_driver: syslog
- restart_policy: always
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- ALL_IN_ONE: "yes"
- when:
- - master_in_controller == 'yes'
- - cluster_type == 'k8s'
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- log_driver: syslog
- restart_policy: always
- privileged: true
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- - "/root/.kube/config:/root/.kube/config:ro"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- ALL_IN_ONE: "yes"
- CLUSTER_TYPE: "openshift"
- when:
- - master_in_controller == 'yes'
- - cluster_type == 'openshift' \ No newline at end of file
diff --git a/kube2consul/bin/blueprint/vars.yml b/kube2consul/bin/blueprint/vars.yml
deleted file mode 100644
index 971438d..0000000
--- a/kube2consul/bin/blueprint/vars.yml
+++ /dev/null
@@ -1,14 +0,0 @@
----
-- api_network_ip:
-- man_network_ip:
-- registry_url:
-- cp_vertype:
-- cp_type:
-- cp_name:
-- cp_version:
-- kube2consul_image: "{{registry_url}}/{{cp_type}}/{{cp_name}}:{{cp_version}}"
-- kube_master_ip: "{{ hostvars[inventory_hostname]['api_network_ip'] }}"
-- pdm_controller_ip: "{{vp_ip}}"
-- consul_join_ip: "{{zenap_msb_consul_server_ip}}"
-- kube2consul_data_host: "/home/zenap-msb/consul_data/kube2consul_{{kube_master_ip}}"
-- kube2consul_data_container: "/consul-works/data-dir" \ No newline at end of file
diff --git a/kube2consul/bin/start.sh b/kube2consul/bin/start.sh
deleted file mode 100644
index 033c50b..0000000
--- a/kube2consul/bin/start.sh
+++ /dev/null
@@ -1,36 +0,0 @@
-#!/bin/sh
-if [ -z "${KUBE_MASTER_IP}" ]; then
- echo "kube master node ip is required."
- exit 1
-fi
-
-if [ -n "${JOIN_IP}" ]; then
- echo "### Starting consul client"
- if [ -z "${ALL_IN_ONE}" ]; then
- /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind ${KUBE_MASTER_IP} -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s &
- else
- /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind 0.0.0.0 -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s &
- fi
-fi
-
-if [ -z "${RUN_MODE}" ]; then
- echo "non-HA scenario."
-else
- echo "\n\n### Starting consul agent"
- cd ./consul
- ./entry.sh &
-fi
-
-kube_url="http://${KUBE_MASTER_IP}:8080"
-
-if [ "${CLUSTER_TYPE}" == "openshift" ]; then
- kube_url="https://${KUBE_MASTER_IP}:8443"
-fi
-
-echo "\n\n### Starting kube2consul"
-if [ -z "${PDM_CONTROLLER_IP}" ]; then
- /bin/kube2consul --kube_master_url ${kube_url}
-else
- echo "in Paas mode."
- /bin/kube2consul --kube_master_url ${kube_url} --pdm_controller_url http://${PDM_CONTROLLER_IP}:9527
-fi \ No newline at end of file
diff --git a/kube2consul/pom.xml b/kube2consul/pom.xml
deleted file mode 100644
index 3ab8ce7..0000000
--- a/kube2consul/pom.xml
+++ /dev/null
@@ -1,109 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
-
- <parent>
- <groupId>org.onap.oom.registrator</groupId>
- <artifactId>oom-registrator-parent</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- </parent>
-
-
- <groupId>org.onap.oom.registrator</groupId>
- <artifactId>kube2consul</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- <packaging>pom</packaging>
-
- <properties>
- <app.exec.name>kube2consul</app.exec.name>
- </properties>
-
- <build>
- <sourceDirectory>${basedir}${file.separator}src</sourceDirectory>
- <directory>${basedir}${file.separator}bin</directory>
- <plugins>
- <plugin>
- <groupId>com.igormaznitsa</groupId>
- <artifactId>mvn-golang-wrapper</artifactId>
- </plugin>
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-resources-dockerfile</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${version.output}</outputDirectory>
- <includeEmptyDirs>true</includeEmptyDirs>
- <resources>
- <resource>
- <directory>${dockerFileDir}</directory>
- <filtering>false</filtering>
- <includes>
- <include>**/*</include>
- </includes>
- </resource>
- </resources>
- <overwrite>true</overwrite>
- </configuration>
- </execution>
- <execution>
- <id>copy-resources-blueprint</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${version.output}/blueprint</outputDirectory>
- <includeEmptyDirs>true</includeEmptyDirs>
- <resources>
- <resource>
- <directory>${blueprintFileDir}</directory>
- <filtering>false</filtering>
- <includes>
- <include>**/*</include>
- </includes>
- </resource>
- </resources>
- <overwrite>true</overwrite>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copytolinux64</id>
- <goals>
- <goal>copy</goal>
- </goals>
- <phase>prepare-package</phase>
- <configuration>
- <artifactItems>
- <artifactItem>
- <groupId>com.zte.ums.zenap.msb.components</groupId>
- <artifactId>consul</artifactId>
- <type>tar.gz</type>
- <classifier>linux_amd64</classifier>
- </artifactItem>
- </artifactItems>
- <outputDirectory>${version.output}</outputDirectory>
- <overWriteReleases>false</overWriteReleases>
- <overWriteSnapshots>true</overWriteSnapshots>
- <stripVersion>true</stripVersion>
- <outputAbsoluteArtifactFilename>true</outputAbsoluteArtifactFilename>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project> \ No newline at end of file
diff --git a/kube2consul/src/kube2consul/Makefile b/kube2consul/src/kube2consul/Makefile
deleted file mode 100644
index 141abb0..0000000
--- a/kube2consul/src/kube2consul/Makefile
+++ /dev/null
@@ -1,11 +0,0 @@
-.PHONY: kube2consul clean test
-
-kube2consul: kube2consul.go
- CGO_ENABLED=0 go build --ldflags '-extldflags "-static"'
- strip kube2consul
-
-clean:
- rm -fr kube2consul
-
-test: clean
- go test -v --vmodule=*=4
diff --git a/kube2consul/src/kube2consul/consul_work.go b/kube2consul/src/kube2consul/consul_work.go
deleted file mode 100644
index 2e5927a..0000000
--- a/kube2consul/src/kube2consul/consul_work.go
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
-Copyright 2017 ZTE, Inc. and others.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-// consul_work.go
-package main
-
-import (
- "log"
- "strings"
- "sync"
-
- consulapi "github.com/hashicorp/consul/api"
- kapi "k8s.io/kubernetes/pkg/api"
-)
-
-type ConsulWorker interface {
- AddService(config DnsInfo, service *kapi.Service)
- RemoveService(config DnsInfo)
- AddPod(config DnsInfo, pod *kapi.Pod)
- RemovePod(config DnsInfo)
- SyncDNS()
-}
-
-type ConsulAgentWorker struct {
- sync.Mutex
- ConsulWorker
- ids map[string][]*consulapi.AgentServiceRegistration
- agent *consulapi.Client
-}
-
-func newConsulAgentWorker(client *consulapi.Client) *ConsulAgentWorker {
- return &ConsulAgentWorker{
- agent: client,
- ids: make(map[string][]*consulapi.AgentServiceRegistration),
- }
-}
-
-func (client *ConsulAgentWorker) AddService(config DnsInfo, service *kapi.Service) {
- client.Lock()
- defer client.Unlock()
- log.Println("Starting Add Service for: ", config.BaseID)
-
- if config.IPAddress == "" || config.BaseID == "" {
- log.Println("Service Info is not valid for AddService")
- return
- }
-
- annos := service.ObjectMeta.Annotations
- //only register service with annotations
- if len(annos) == 0 {
- log.Println("no Annotation defined for this service:", service.Name)
- return
- }
-
- for _, port := range service.Spec.Ports {
- kubeService := newKubeService()
- kubeService.BuildServiceInfoMap(&port, &annos)
-
- if len(kubeService.sInfos) == 0 {
- log.Println("no Service defined for this port:", port.Port)
- continue
- }
-
- //remove service with no name
- for id, s := range kubeService.sInfos {
- if len(s.Name) == 0 {
- if id == "0" {
- log.Printf("SERVICE_%d_NAME not set, ignore this service.", port.Port)
- } else {
- log.Printf("SERVICE_%d_NAME_%s not set, ignore this service.", port.Port, id)
- }
- delete(kubeService.sInfos, id)
- }
- }
-
- //test again
- if len(kubeService.sInfos) == 0 {
- log.Println("no Service defined for this port:", port.Port)
- continue
- }
-
- for _, s := range kubeService.sInfos {
- asr := kubeService.BuildAgentService(s, config, &port)
- if *argChecks && port.Protocol == "TCP" && config.ServiceType != kapi.ServiceTypeClusterIP {
- //Create Check if neeeded
- asr.Check = kubeService.BuildAgentServiceCheck(config, &port)
- }
-
- if client.agent != nil {
- //Registers with DNS
- if err := client.agent.Agent().ServiceRegister(asr); err != nil {
- log.Println("Error creating service record: ", asr.ID)
- }
- }
-
- //Add to IDS
- client.ids[config.BaseID] = append(client.ids[config.BaseID], asr)
- }
- }
- log.Println("Added Service: ", service.Name)
-}
-
-func (client *ConsulAgentWorker) RemoveService(config DnsInfo) {
- client.Lock()
- defer client.Unlock()
- if ids, ok := client.ids[config.BaseID]; ok {
- for _, asr := range ids {
- if client.agent != nil {
- if err := client.agent.Agent().ServiceDeregister(asr.ID); err != nil {
- log.Println("Error removing service: ", err)
- }
- }
- }
- delete(client.ids, config.BaseID)
- } else {
- log.Println("Requested to remove non-existant BaseID DNS of:", config.BaseID)
- }
-}
-
-func (client *ConsulAgentWorker) AddPod(config DnsInfo, pod *kapi.Pod) {
- client.Lock()
- defer client.Unlock()
- log.Println("Starting Add Pod for: ", config.BaseID)
-
- //double check if have Pod IPs so far in Paas mode
- if *argPdmControllerUrl != "" {
- if _, ok := pdmPodIPsMap[pod.Name]; !ok {
- log.Println("In Paas mode, We don't have Pod IPs to register with for Pod: ", pod.Name)
- return
- }
- }
-
- containerIdMap := make(map[string]string)
- buildContainerIdMap(pod, &containerIdMap)
-
- for _, c := range pod.Spec.Containers {
-
- for _, p := range c.Ports {
- podService := newPodService()
- podService.BuildServiceInfoMap(&p, &c)
- if len(podService.sInfos) == 0 {
- log.Println("no Service defined for this port:", p.ContainerPort)
- continue
- }
-
- //remove service with no name
- for id, s := range podService.sInfos {
- if len(s.Name) == 0 {
- if id == "0" {
- log.Printf("SERVICE_%d_NAME not set, ignore this service.", p.ContainerPort)
- } else {
- log.Printf("SERVICE_%d_NAME_%s not set, ignore this service.", p.ContainerPort, id)
- }
- delete(podService.sInfos, id)
- }
- }
-
- //remove service if same service exist with different protocol
- for id, s := range podService.sInfos {
- services := []*consulapi.CatalogService{}
- sn := s.Name
- //append namespace if specified
- if len(s.NS) != 0 {
- sn = sn + "-" + s.NS
- }
- var tags []string
- var protocol string
- tags = strings.Split(s.Tags, ",")
- for _, v := range tags {
- var elems []string
- elems = strings.Split(v, ":")
- if elems[0] == PROTOCOL {
- protocol = elems[1]
- break
- }
- }
- //remove service with empty protocol
- if protocol == "" {
- delete(podService.sInfos, id)
- continue
- }
-
- protocol_field := "\"protocol\":\"" + protocol + "\""
-
- //query consul
- if client.agent != nil {
- same_protocol := false
- services, _, _ = client.agent.Catalog().Service(sn, "", nil)
- if services == nil || len(services) == 0 {
- continue
- }
- for _, v := range services[0].ServiceTags {
- if strings.Contains(v, protocol_field) {
- same_protocol = true
- break
- }
- }
-
- if !same_protocol {
- log.Printf("same service with different protocol already exist, ignore service:%s, protocol:%s", sn, protocol)
- delete(podService.sInfos, id)
- }
- }
- }
-
- //remove service with no network plane type in tags
- if *argPdmControllerUrl != "" {
- for id, s := range podService.sInfos {
- if len(s.Tags) == 0 {
- if id == "0" {
- log.Printf("SERVICE_%d_TAGS not set, ignore this service in Paas mode.", p.ContainerPort)
- } else {
- log.Printf("SERVICE_%d_TAGS_%s not set, ignore this service in Paas mode.", p.ContainerPort, id)
- }
- delete(podService.sInfos, id)
- } else {
- var tags []string
- tags = strings.Split(s.Tags, ",")
- for _, v := range tags {
- var elems []string
- elems = strings.Split(v, ":")
- //this is saved in Tags, later will be moved to Labels??
- if elems[0] == NETWORK_PLANE_TYPE {
- types := strings.Split(elems[1], "|")
- for _, t := range types {
- ip := getIPFromPodIPsMap(t, pod.Name)
- if ip == "" {
- log.Printf("User defined network type:%s has no assigned ip for Pod:%s", t, pod.Name)
- } else {
- log.Printf("Found ip:%s for network type:%s", ip, t)
- s.IPs = append(s.IPs, PodIP{
- NetworkPlane: t,
- IPAddress: ip,
- })
- }
- }
- }
- }
-
- if len(s.IPs) == 0 {
- log.Printf("In Paas mode, no IP assigned for Pod:ContainerPort->%s:%d", pod.Name, p.ContainerPort)
- delete(podService.sInfos, id)
- }
- }
- }
- }
-
- //test again
- if len(podService.sInfos) == 0 {
- log.Println("no Service defined for this port:", p.ContainerPort)
- continue
- }
-
- for _, s := range podService.sInfos {
- asrs := podService.BuildAgentService(s, config, &p, containerIdMap[c.Name])
- if client.agent != nil {
- for _, asr := range asrs {
- //Registers with DNS
- if err := client.agent.Agent().ServiceRegister(asr); err != nil {
- log.Println("Error creating service record: ", asr.ID)
- } else {
- log.Printf("Added service:instance ->%s:%s", asr.Name, asr.ID)
- }
- client.ids[config.BaseID] = append(client.ids[config.BaseID], asr)
- }
- } else {
- log.Println("Consul client is not available.")
- }
- }
- }
- }
- log.Println("Added Pod: ", pod.Name)
-}
-
-func (client *ConsulAgentWorker) RemovePod(config DnsInfo) {
- client.Lock()
- defer client.Unlock()
- if ids, ok := client.ids[config.BaseID]; ok {
- for _, asr := range ids {
- if client.agent != nil {
- if err := client.agent.Agent().ServiceDeregister(asr.ID); err != nil {
- log.Println("Error removing service: ", err)
- } else {
- log.Printf("removed service -> %s:%d", asr.Name, asr.Port)
- }
- }
- }
- delete(client.ids, config.BaseID)
- log.Println("Removed Pod: ", config.BaseID)
- } else {
- log.Println("Requested to remove non-existant BaseID DNS of:", config.BaseID)
- }
-}
-
-func (client *ConsulAgentWorker) SyncDNS() {
- if client.agent != nil {
- if services, err := client.agent.Agent().Services(); err == nil {
- for _, registered := range client.ids {
- for _, service := range registered {
- if !containsServiceId(service.ID, services) {
- log.Println("Regregistering missing service ID: ", service.ID)
- client.agent.Agent().ServiceRegister(service)
- }
- }
- }
- for _, service := range services {
- found := false
- for _, registered := range client.ids {
- for _, svc := range registered {
- if service.ID == svc.ID {
- found = true
- break
- }
- }
- if found {
- break
- }
- }
- if !found {
- log.Println("Degregistering dead service ID: ", service.ID)
- client.agent.Agent().ServiceDeregister(service.ID)
- }
- }
- } else {
- log.Println("Error retreiving services from consul during sync: ", err)
- }
- }
-}
-
-func buildContainerIdMap(pod *kapi.Pod, cmap *map[string]string) {
- for _, s := range pod.Status.ContainerStatuses {
- id := strings.TrimLeft(s.ContainerID, "docker://")
- (*cmap)[s.Name] = id
- }
-}
-
-func containsServiceId(id string, services map[string]*consulapi.AgentService) bool {
- for _, service := range services {
- if service.ID == id {
- return true
- }
- }
- return false
-}
-
-func getIPFromPodIPsMap(ptype string, podname string) string {
- ips := pdmPodIPsMap[podname]
- for _, v := range ips {
- if v.NetworkPlane == ptype {
- return v.IPAddress
- }
- }
- return ""
-}
diff --git a/kube2consul/src/kube2consul/kube2consul.go b/kube2consul/src/kube2consul/kube2consul.go
deleted file mode 100644
index f1213f2..0000000
--- a/kube2consul/src/kube2consul/kube2consul.go
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
-Copyright 2017 ZTE, Inc. and others.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-// kube2consul.go
-package main
-
-import (
- "flag"
- "fmt"
- "log"
- "net/url"
- "os"
- "reflect"
- "time"
-
- consulapi "github.com/hashicorp/consul/api"
- kapi "k8s.io/kubernetes/pkg/api"
- kcache "k8s.io/kubernetes/pkg/client/cache"
- kclient "k8s.io/kubernetes/pkg/client/unversioned"
- kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
- kframework "k8s.io/kubernetes/pkg/controller/framework"
- kselector "k8s.io/kubernetes/pkg/fields"
- klabels "k8s.io/kubernetes/pkg/labels"
-)
-
-var (
- argConsulAgent = flag.String("consul-agent", "http://127.0.0.1:8500", "URL to consul agent")
- argKubeMasterUrl = flag.String("kube_master_url", "", "Url to reach kubernetes master. Env variables in this flag will be expanded.")
- argChecks = flag.Bool("checks", false, "Adds TCP service checks for each TCP Service")
- argPdmControllerUrl = flag.String("pdm_controller_url", "", "URL to pdm controller")
- addMap = make(map[string]*kapi.Pod)
- deleteMap = make(map[string]*kapi.Pod)
- pdmPodIPsMap = make(map[string][]PodIP)
- nodeSelector = klabels.Everything()
-)
-
-const (
- // Maximum number of attempts to connect to consul server.
- maxConnectAttempts = 12
- // Resync period for the kube controller loop.
- resyncPeriod = 5 * time.Second
- // Default Health check interval
- DefaultInterval = "10s"
- // Resource url to query pod from PDM controller
- podUrl = "nw/v1/tenants"
-)
-
-const (
- Service_NAME = "NAME"
- Service_TAGS = "TAGS"
- Service_LABELS = "LABELS"
- Service_MDATA = "MDATA"
- Service_NS = "NAMESPACE"
- Service_TTL = "TTL"
- Service_HTTP = "HTTP"
- Service_TCP = "TCP"
- Service_CMD = "CMD"
- Service_SCRIPT = "SCRIPT"
- Service_TIMEOUT = "TIMEOUT"
- Service_INTERVAL = "INTERVAL"
-)
-
-const (
- Table_BASE = "base"
- Table_LB = "lb"
- Table_LABELS = "labels"
- Table_META_DATA = "metadata"
- Table_NS = "ns"
- Table_CHECKS = "checks"
-)
-
-const (
- PROTOCOL = "protocol"
- PROTOCOL_UI = "UI"
- PREFIX_UI = "IUI_"
-)
-
-const (
- //filter out from the tags for compatibility
- NETWORK_PLANE_TYPE = "network_plane_type"
- VISUAL_RANGE = "visualRange"
- LB_POLICY = "lb_policy"
- LB_SVR_PARAMS = "lb_server_params"
-)
-
-const DefaultLabels = "\"labels\":{\"visualRange\":\"1\"}"
-
-func getKubeMasterUrl() (string, error) {
- if *argKubeMasterUrl == "" {
- return "", fmt.Errorf("no --kube_master_url specified")
- }
- parsedUrl, err := url.Parse(os.ExpandEnv(*argKubeMasterUrl))
- if err != nil {
- return "", fmt.Errorf("failed to parse --kube_master_url %s - %v", *argKubeMasterUrl, err)
- }
- if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" {
- return "", fmt.Errorf("invalid --kube_master_url specified %s", *argKubeMasterUrl)
- }
- return parsedUrl.String(), nil
-}
-
-func newConsulClient(consulAgent string) (*consulapi.Client, error) {
- var (
- client *consulapi.Client
- err error
- )
-
- consulConfig := consulapi.DefaultConfig()
- consulAgentUrl, err := url.Parse(consulAgent)
- if err != nil {
- log.Println("Error parsing Consul url")
- return nil, err
- }
-
- if consulAgentUrl.Host != "" {
- consulConfig.Address = consulAgentUrl.Host
- }
-
- if consulAgentUrl.Scheme != "" {
- consulConfig.Scheme = consulAgentUrl.Scheme
- }
-
- client, err = consulapi.NewClient(consulConfig)
- if err != nil {
- log.Println("Error creating Consul client")
- return nil, err
- }
-
- for attempt := 1; attempt <= maxConnectAttempts; attempt++ {
- if _, err = client.Agent().Self(); err == nil {
- break
- }
-
- if attempt == maxConnectAttempts {
- break
- }
-
- log.Printf("[Attempt: %d] Attempting access to Consul after 5 second sleep", attempt)
- time.Sleep(5 * time.Second)
- }
-
- if err != nil {
- return nil, fmt.Errorf("failed to connect to Consul agent: %v, error: %v", consulAgent, err)
- }
- log.Printf("Consul agent found: %v", consulAgent)
-
- return client, nil
-}
-
-func newKubeClient() (*kclient.Client, error) {
- masterUrl, err := getKubeMasterUrl()
- if err != nil {
- return nil, err
- }
- overrides := &kclientcmd.ConfigOverrides{}
- overrides.ClusterInfo.Server = masterUrl
-
- rules := kclientcmd.NewDefaultClientConfigLoadingRules()
- kubeConfig, err := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
-
- if err != nil {
- log.Println("Error creating Kube Config", err)
- return nil, err
- }
- kubeConfig.Host = masterUrl
-
- log.Printf("Using %s for kubernetes master", kubeConfig.Host)
- return kclient.New(kubeConfig)
-}
-
-func createNodeLW(kubeClient *kclient.Client) *kcache.ListWatch {
- return kcache.NewListWatchFromClient(kubeClient, "nodes", kapi.NamespaceAll, kselector.Everything())
-}
-
-func sendNodeWork(action KubeWorkAction, queue chan<- KubeWork, oldObject, newObject interface{}) {
- if node, ok := newObject.(*kapi.Node); ok {
- if nodeSelector.Matches(klabels.Set(node.Labels)) == false {
- log.Printf("Ignoring node %s due to label selectors", node.Name)
- return
- }
-
- log.Println("Node Action: ", action, " for node ", node.Name)
- work := KubeWork{
- Action: action,
- Node: node,
- }
-
- if action == KubeWorkUpdateNode {
- if oldNode, ok := oldObject.(*kapi.Node); ok {
- if nodeReady(node) != nodeReady(oldNode) {
- log.Println("Ready state change. Old:", nodeReady(oldNode), " New: ", nodeReady(node))
- queue <- work
- }
- }
- } else {
- queue <- work
- }
- }
-}
-
-func watchForNodes(kubeClient *kclient.Client, queue chan<- KubeWork) {
- _, nodeController := kframework.NewInformer(
- createNodeLW(kubeClient),
- &kapi.Node{},
- resyncPeriod,
- kframework.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- sendNodeWork(KubeWorkAddNode, queue, nil, obj)
- },
- DeleteFunc: func(obj interface{}) {
- sendNodeWork(KubeWorkRemoveNode, queue, nil, obj)
- },
- UpdateFunc: func(oldObj, newObj interface{}) {
- sendNodeWork(KubeWorkUpdateNode, queue, oldObj, newObj)
- },
- },
- )
- stop := make(chan struct{})
- go nodeController.Run(stop)
-}
-
-// Returns a cache.ListWatch that gets all changes to services.
-func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch {
- return kcache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kselector.Everything())
-}
-
-func sendServiceWork(action KubeWorkAction, queue chan<- KubeWork, serviceObj interface{}) {
- if service, ok := serviceObj.(*kapi.Service); ok {
- log.Println("Service Action: ", action, " for service ", service.Name)
- queue <- KubeWork{
- Action: action,
- Service: service,
- }
- }
-}
-
-func watchForServices(kubeClient *kclient.Client, queue chan<- KubeWork) {
- _, svcController := kframework.NewInformer(
- createServiceLW(kubeClient),
- &kapi.Service{},
- resyncPeriod,
- kframework.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- sendServiceWork(KubeWorkAddService, queue, obj)
- },
- DeleteFunc: func(obj interface{}) {
- sendServiceWork(KubeWorkRemoveService, queue, obj)
- },
- UpdateFunc: func(oldObj, newObj interface{}) {
- if reflect.DeepEqual(newObj, oldObj) == false {
- sendServiceWork(KubeWorkUpdateService, queue, newObj)
- }
- },
- },
- )
- stop := make(chan struct{})
- go svcController.Run(stop)
-}
-
-// Returns a cache.ListWatch that gets all changes to Pods.
-func createPodLW(kubeClient *kclient.Client) *kcache.ListWatch {
- return kcache.NewListWatchFromClient(kubeClient, "pods", kapi.NamespaceAll, kselector.Everything())
-}
-
-// Dispatch the notifications for Pods by type to the worker
-func sendPodWork(action KubeWorkAction, queue chan<- KubeWork, podObj interface{}) {
- if pod, ok := podObj.(*kapi.Pod); ok {
- log.Println("Pod Action: ", action, " for Pod:", pod.Name)
- queue <- KubeWork{
- Action: action,
- Pod: pod,
- }
- }
-}
-
-// Launch the go routine to watch notifications for Pods.
-func watchForPods(kubeClient *kclient.Client, queue chan<- KubeWork) {
- var podController *kframework.Controller
- _, podController = kframework.NewInformer(
- createPodLW(kubeClient),
- &kapi.Pod{},
- resyncPeriod,
- kframework.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- sendPodWork(KubeWorkAddPod, queue, obj)
- },
- DeleteFunc: func(obj interface{}) {
- if o, ok := obj.(*kapi.Pod); ok {
- if _, ok := deleteMap[o.Name]; ok {
- delete(deleteMap, o.Name)
- }
- }
- sendPodWork(KubeWorkRemovePod, queue, obj)
- },
- UpdateFunc: func(oldObj, newObj interface{}) {
- o, n := oldObj.(*kapi.Pod), newObj.(*kapi.Pod)
- if reflect.DeepEqual(oldObj, newObj) == false {
- //Adding Pod
- if _, ok := addMap[n.Name]; ok {
- if kapi.IsPodReady(n) {
- delete(addMap, n.Name)
- sendPodWork(KubeWorkUpdatePod, queue, newObj)
- }
- return
- }
- //Deleting Pod
- if _, ok := deleteMap[n.Name]; ok {
- return
- } else {
- if o.ObjectMeta.DeletionTimestamp == nil &&
- n.ObjectMeta.DeletionTimestamp != nil {
- deleteMap[n.Name] = n
- return
- }
- //Updating Pod
- sendPodWork(KubeWorkUpdatePod, queue, newObj)
- }
- }
- },
- },
- )
- stop := make(chan struct{})
- go podController.Run(stop)
-}
-
-func runBookKeeper(workQue <-chan KubeWork, consulQueue chan<- ConsulWork, apiClient *kclient.Client) {
-
- client := newClientBookKeeper(apiClient)
- client.consulQueue = consulQueue
-
- for work := range workQue {
- switch work.Action {
- case KubeWorkAddNode:
- client.AddNode(work.Node)
- case KubeWorkRemoveNode:
- client.RemoveNode(work.Node.Name)
- case KubeWorkUpdateNode:
- client.UpdateNode(work.Node)
- case KubeWorkAddService:
- client.AddService(work.Service)
- case KubeWorkRemoveService:
- client.RemoveService(work.Service)
- case KubeWorkUpdateService:
- client.UpdateService(work.Service)
- case KubeWorkAddPod:
- client.AddPod(work.Pod)
- case KubeWorkRemovePod:
- client.RemovePod(work.Pod)
- case KubeWorkUpdatePod:
- client.UpdatePod(work.Pod)
- case KubeWorkSync:
- client.Sync()
- default:
- log.Println("Unsupported work action: ", work.Action)
- }
- }
- log.Println("Completed all work")
-}
-
-func runConsulWorker(queue <-chan ConsulWork, client *consulapi.Client) {
- worker := newConsulAgentWorker(client)
-
- for work := range queue {
- log.Println("Consul Work Action: ", work.Action, " BaseID:", work.Config.BaseID)
-
- switch work.Action {
- case ConsulWorkAddService:
- worker.AddService(work.Config, work.Service)
- case ConsulWorkRemoveService:
- worker.RemoveService(work.Config)
- case ConsulWorkAddPod:
- worker.AddPod(work.Config, work.Pod)
- case ConsulWorkRemovePod:
- worker.RemovePod(work.Config)
- case ConsulWorkSyncDNS:
- worker.SyncDNS()
- default:
- log.Println("Unsupported Action of: ", work.Action)
- }
-
- }
-}
-
-func main() {
- flag.Parse()
- // TODO: Validate input flags.
- var err error
- var consulClient *consulapi.Client
-
- if consulClient, err = newConsulClient(*argConsulAgent); err != nil {
- log.Fatalf("Failed to create Consul client - %v", err)
- }
-
- kubeClient, err := newKubeClient()
- if err != nil {
- log.Fatalf("Failed to create a kubernetes client: %v", err)
- }
-
- if _, err := kubeClient.ServerVersion(); err != nil {
- log.Fatal("Could not connect to Kube Master", err)
- } else {
- log.Println("Connected to K8S API Server")
- }
-
- kubeWorkQueue := make(chan KubeWork)
- consulWorkQueue := make(chan ConsulWork)
- go runBookKeeper(kubeWorkQueue, consulWorkQueue, kubeClient)
- watchForNodes(kubeClient, kubeWorkQueue)
- watchForServices(kubeClient, kubeWorkQueue)
- watchForPods(kubeClient, kubeWorkQueue)
- go runConsulWorker(consulWorkQueue, consulClient)
-
- log.Println("Running Consul Sync loop every: 60 seconds")
- csyncer := time.NewTicker(time.Second * time.Duration(60))
- go func() {
- for t := range csyncer.C {
- log.Println("Consul Sync request at:", t)
- consulWorkQueue <- ConsulWork{
- Action: ConsulWorkSyncDNS,
- }
- }
- }()
-
- log.Println("Running Kube Sync loop every: 60 seconds")
- ksyncer := time.NewTicker(time.Second * time.Duration(60))
- go func() {
- for t := range ksyncer.C {
- log.Println("Kube Sync request at:", t)
- kubeWorkQueue <- KubeWork{
- Action: KubeWorkSync,
- }
- }
- }()
-
- // Prevent exit
- select {}
-}
diff --git a/kube2consul/src/kube2consul/kube_service.go b/kube2consul/src/kube2consul/kube_service.go
deleted file mode 100644
index 887f6c8..0000000
--- a/kube2consul/src/kube2consul/kube_service.go
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
-Copyright 2017 ZTE, Inc. and others.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-// kube_service.go
-package main
-
-import (
- "log"
- "strconv"
- "strings"
-
- consulapi "github.com/hashicorp/consul/api"
- kapi "k8s.io/kubernetes/pkg/api"
-)
-
-type KubeServiceAction interface {
- BuildServiceInfoMap(*kapi.ServicePort, *map[string]string)
- BuildAgentService(*ServiceInfo, DnsInfo, *kapi.ServicePort) *consulapi.AgentServiceRegistration
- BuildAgentServiceCheck(DnsInfo, *kapi.ServicePort) *consulapi.AgentServiceCheck
-}
-
-type KubeService struct {
- KubeServiceAction
- sInfos map[string]*ServiceInfo
-}
-
-func newKubeService() *KubeService {
- return &KubeService{
- sInfos: make(map[string]*ServiceInfo),
- }
-}
-
-func (kube *KubeService) BuildServiceInfoMap(sport *kapi.ServicePort, annos *map[string]string) {
- portstr := strconv.Itoa(int(sport.Port))
- nameprefix := "SERVICE_" + portstr + "_NAME"
- tagprefix := "SERVICE_" + portstr + "_TAGS"
-
- for name, value := range *annos {
- if strings.HasPrefix(name, nameprefix) {
- addToServiceInfoMap(name, value, Service_NAME, &kube.sInfos)
- } else if strings.HasPrefix(name, tagprefix) {
- addToServiceInfoMap(name, value, Service_TAGS, &kube.sInfos)
- }
- }
-}
-
-func (kube *KubeService) BuildAgentService(sInfo *ServiceInfo, config DnsInfo, sport *kapi.ServicePort) *consulapi.AgentServiceRegistration {
- name := sInfo.Name
- var tagslice []string
- tagslice = strings.Split(sInfo.Tags, ",")
-
- for _, elem := range tagslice {
- var elemslice []string
- elemslice = strings.Split(elem, ":")
- if elemslice[0] == PROTOCOL {
- switch elemslice[1] {
- case PROTOCOL_UI:
- name = PREFIX_UI + name
- default:
- log.Println("regular protocol:", elemslice[1])
- }
- break
- }
- }
-
- asrID := config.BaseID + "-" + strconv.Itoa(int(sport.Port)) + "-" + name
-
- regPort := sport.NodePort
- if config.ServiceType == kapi.ServiceTypeClusterIP {
- regPort = sport.Port
- }
-
- return &consulapi.AgentServiceRegistration{
- ID: asrID,
- Name: name,
- Address: config.IPAddress,
- Port: int(regPort),
- Tags: tagslice,
- }
-}
-
-func (kube *KubeService) BuildAgentServiceCheck(config DnsInfo, sport *kapi.ServicePort) *consulapi.AgentServiceCheck {
- log.Println("Creating service check for: ", config.IPAddress, " on Port: ", sport.NodePort)
- return &consulapi.AgentServiceCheck{
- TCP: config.IPAddress + ":" + strconv.Itoa(int(sport.NodePort)),
- Interval: "60s",
- }
-}
diff --git a/kube2consul/src/kube2consul/kube_work.go b/kube2consul/src/kube2consul/kube_work.go
deleted file mode 100644
index 4695a76..0000000
--- a/kube2consul/src/kube2consul/kube_work.go
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
-Copyright 2017 ZTE, Inc. and others.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-// kube_work.go
-package main
-
-import (
- "encoding/json"
- "kube2consul/util/restclient"
- "log"
- "strings"
- "sync"
- "time"
-
- kapi "k8s.io/kubernetes/pkg/api"
- kclient "k8s.io/kubernetes/pkg/client/unversioned"
- kselector "k8s.io/kubernetes/pkg/fields"
- klabels "k8s.io/kubernetes/pkg/labels"
-)
-
-type KubeBookKeeper interface {
- AddNode(*kapi.Node)
- RemoveNode(*kapi.Node)
- UpdateNode(*kapi.Node)
- AddService(*kapi.Service)
- RemoveService(*kapi.Service)
- UpdateService(*kapi.Service)
- AddPod(*kapi.Pod)
- RemovePod(*kapi.Pod)
- UpdatePod(*kapi.Pod)
-}
-
-type ClientBookKeeper struct {
- sync.Mutex
- KubeBookKeeper
- client *kclient.Client
- nodes map[string]*KubeNode
- services map[string]*kapi.Service
- pods map[string]*kapi.Pod
- consulQueue chan<- ConsulWork
-}
-
-type KubeNode struct {
- name string
- readyStatus bool
- serviceIDS map[string]string
- address string
-}
-
-func buildServiceBaseID(nodeName string, service *kapi.Service) string {
- return nodeName + "-" + service.Name
-}
-
-func nodeReady(node *kapi.Node) bool {
- for i := range node.Status.Conditions {
- if node.Status.Conditions[i].Type == kapi.NodeReady {
- return node.Status.Conditions[i].Status == kapi.ConditionTrue
- }
- }
-
- log.Println("NodeReady condition is missing from node: ", node.Name)
- return false
-}
-
-func newKubeNode() *KubeNode {
- return &KubeNode{
- name: "",
- readyStatus: false,
- serviceIDS: make(map[string]string),
- }
-}
-
-func newClientBookKeeper(client *kclient.Client) *ClientBookKeeper {
- return &ClientBookKeeper{
- client: client,
- nodes: make(map[string]*KubeNode),
- services: make(map[string]*kapi.Service),
- pods: make(map[string]*kapi.Pod),
- }
-}
-
-func (client *ClientBookKeeper) AddNode(node *kapi.Node) {
- client.Lock()
- defer client.Unlock()
- if _, ok := client.nodes[node.Name]; ok {
- log.Printf("Node:%s already exist. skip this ADD notification.", node.Name)
- return
- }
- //Add to Node Collection
- kubeNode := newKubeNode()
- kubeNode.readyStatus = nodeReady(node)
- kubeNode.name = node.Name
- kubeNode.address = node.Status.Addresses[0].Address
-
- //Send request for Service Addition for node and all serviceIDS (Create Service ID here)
- if kubeNode.readyStatus {
- client.AddAllServicesToNode(kubeNode)
- }
-
- client.nodes[node.Name] = kubeNode
- log.Println("Added Node: ", node.Name)
-}
-
-func (client *ClientBookKeeper) AddAllServicesToNode(node *KubeNode) {
- for _, service := range client.services {
- if service.Spec.Type == kapi.ServiceTypeClusterIP {
- log.Println("ClusterIP service ignored for node binding:", service.Name)
- } else {
- client.AttachServiceToNode(node, service)
- }
- }
-}
-
-func (client *ClientBookKeeper) AttachServiceToNode(node *KubeNode, service *kapi.Service) {
- baseID := buildServiceBaseID(node.name, service)
- client.consulQueue <- ConsulWork{
- Action: ConsulWorkAddService,
- Service: service,
- Config: DnsInfo{
- BaseID: baseID,
- IPAddress: node.address,
- ServiceType: service.Spec.Type,
- },
- }
- log.Println("Requesting Addition of services with Base ID: ", baseID)
- node.serviceIDS[service.Name] = baseID
-}
-
-func (client *ClientBookKeeper) RemoveNode(name string) {
- client.Lock()
- defer client.Unlock()
- if kubeNode, ok := client.nodes[name]; ok {
- //Remove All DNS for node
- client.RemoveAllServicesFromNode(kubeNode)
- //Remove Node from Collection
- delete(client.nodes, name)
- } else {
- log.Println("Attmepted to remove missing node: ", name)
- }
-
- log.Println("Queued Node to be removed: ", name)
-}
-
-func (client *ClientBookKeeper) RemoveAllServicesFromNode(node *KubeNode) {
- for _, service := range client.services {
- if service.Spec.Type == kapi.ServiceTypeClusterIP {
- log.Println("ClusterIP service ignored for node unbinding:", service.Name)
- } else {
- client.DetachServiceFromNode(node, service)
- }
- }
-}
-
-func (client *ClientBookKeeper) DetachServiceFromNode(node *KubeNode, service *kapi.Service) {
- if baseID, ok := node.serviceIDS[service.Name]; ok {
- client.consulQueue <- ConsulWork{
- Action: ConsulWorkRemoveService,
- Config: DnsInfo{
- BaseID: baseID,
- },
- }
-
- log.Println("Requesting Removal of services with Base ID: ", baseID)
- delete(node.serviceIDS, service.Name)
- }
-}
-
-func (client *ClientBookKeeper) UpdateNode(node *kapi.Node) {
- if nodeReady(node) {
- // notReady nodes also stored in client.nodes
- client.AddAllServicesToNode(client.nodes[node.Name])
- } else {
- client.RemoveAllServicesFromNode(client.nodes[node.Name])
- }
-}
-
-func (client *ClientBookKeeper) AddService(svc *kapi.Service) {
- client.Lock()
- defer client.Unlock()
- if !isUserDefinedService(svc) {
- log.Println("Not the target, skip this ADD notification for service:", svc.Name)
- return
- }
-
- if _, ok := client.services[svc.Name]; ok {
- log.Printf("service:%s already exist. skip this ADD notification.", svc.Name)
- return
- }
-
- if kapi.IsServiceIPSet(svc) {
- if svc.Spec.Type == kapi.ServiceTypeClusterIP {
- log.Println("Adding ClusterIP service:", svc.Name)
- client.consulQueue <- ConsulWork{
- Action: ConsulWorkAddService,
- Service: svc,
- Config: DnsInfo{
- BaseID: svc.Name,
- IPAddress: svc.Spec.ClusterIP,
- ServiceType: svc.Spec.Type,
- },
- }
- } else {
- //Check and use local node list first
- if len(client.nodes) > 0 {
- for _, kubeNode := range client.nodes {
- if kubeNode.readyStatus {
- client.AttachServiceToNode(kubeNode, svc)
- }
- }
- } else {
- log.Println("local node list is empty, retrieve it from the api server.")
- nodes := client.client.Nodes()
- if nodeList, err := nodes.List(kapi.ListOptions{
- LabelSelector: klabels.Everything(),
- FieldSelector: kselector.Everything(),
- }); err == nil {
- for _, node := range nodeList.Items {
- if nodeReady(&node) {
- kubeNode := newKubeNode()
- kubeNode.readyStatus = nodeReady(&node)
- kubeNode.name = node.Name
- kubeNode.address = node.Status.Addresses[0].Address
- client.AttachServiceToNode(kubeNode, svc)
- }
- }
- }
- }
- }
- client.services[svc.Name] = svc
- log.Println("Queued Service to be added: ", svc.Name)
- } else {
- // if ClusterIP is not set, do not create a DNS records
- log.Printf("Skipping dns record for headless service: %s\n", svc.Name)
- }
-}
-
-func (client *ClientBookKeeper) RemoveService(svc *kapi.Service) {
- client.Lock()
- defer client.Unlock()
- if !isUserDefinedService(svc) {
- log.Println("Not the target, skip this Remove notification for service:", svc.Name)
- return
- }
-
- if _, ok := client.services[svc.Name]; !ok {
- log.Printf("Service:%s not exist. skip this REMOVE notification.", svc.Name)
- return
- }
-
- if svc.Spec.Type == kapi.ServiceTypeClusterIP {
- log.Println("Removing ClusterIP service:", svc.Name)
- //Perform All DNS Removes
- client.consulQueue <- ConsulWork{
- Action: ConsulWorkRemoveService,
- Config: DnsInfo{
- BaseID: svc.Name,
- },
- }
- } else {
- //Check and use local node list first
- if len(client.nodes) > 0 {
- for _, kubeNode := range client.nodes {
- if kubeNode.readyStatus {
- client.DetachServiceFromNode(kubeNode, svc)
- }
- }
- } else {
- log.Println("local node list is empty, retrieve it from the api server. sync it later.")
- }
- }
- delete(client.services, svc.Name)
- log.Println("Queued Service to be removed: ", svc.Name)
-}
-
-func (client *ClientBookKeeper) UpdateService(svc *kapi.Service) {
- if !isUserDefinedService(svc) {
- log.Println("Not the target, skip this Update notification for service:", svc.Name)
- return
- }
-
- client.RemoveService(svc)
- client.AddService(svc)
-}
-
-func (client *ClientBookKeeper) AddPod(pod *kapi.Pod) {
- client.Lock()
- defer client.Unlock()
- if !isUserDefinedPod(pod) {
- log.Println("Not the target, skip this ADD notification for pod:", pod.Name)
- return
- }
-
- if _, ok := client.pods[pod.Name]; ok {
- log.Printf("Pod:%s already exist. skip this ADD notification.", pod.Name)
- return
- }
-
- //newly added Pod
- if pod.Name == "" || pod.Status.PodIP == "" {
- log.Printf("Pod:%s has neither name nor pod ip. skip this ADD notification.", pod.Name)
- addMap[pod.Name] = pod
- return
- }
- //existing Pod
- if kapi.IsPodReady(pod) {
- if *argPdmControllerUrl != "" {
- fillPdmPodIPsMap(pod)
- }
- }
- //Perform All DNS Adds
- client.consulQueue <- ConsulWork{
- Action: ConsulWorkAddPod,
- Pod: pod,
- Config: DnsInfo{
- BaseID: pod.Name,
- IPAddress: pod.Status.PodIP,
- },
- }
- client.pods[pod.Name] = pod
- log.Println("Queued Pod to be added: ", pod.Name)
-}
-
-func (client *ClientBookKeeper) RemovePod(pod *kapi.Pod) {
- client.Lock()
- defer client.Unlock()
- if !isUserDefinedPod(pod) {
- log.Println("Not the target, skip this Remove notification for pod:", pod.Name)
- return
- }
-
- if _, ok := client.pods[pod.Name]; !ok {
- log.Printf("Pod:%s not exist. skip this REMOVE notification.", pod.Name)
- return
- }
- //Perform All DNS Removes
- client.consulQueue <- ConsulWork{
- Action: ConsulWorkRemovePod,
- Config: DnsInfo{
- BaseID: pod.Name,
- },
- }
- delete(client.pods, pod.Name)
- log.Println("Queued Pod to be removed: ", pod.Name)
-}
-
-func (client *ClientBookKeeper) UpdatePod(pod *kapi.Pod) {
- if !isUserDefinedPod(pod) {
- log.Println("Not the target, skip this Update notification for pod:", pod.Name)
- return
- }
-
- if *argPdmControllerUrl != "" {
- fillPdmPodIPsMap(pod)
- }
- client.RemovePod(pod)
- client.AddPod(pod)
-}
-
-func (client *ClientBookKeeper) Sync() {
- nodes := client.client.Nodes()
- if nodeList, err := nodes.List(kapi.ListOptions{
- LabelSelector: klabels.Everything(),
- FieldSelector: kselector.Everything(),
- }); err == nil {
- for name, _ := range client.nodes {
- if !containsNodeName(name, nodeList) {
- log.Printf("Bookkeeper has node: %s that does not exist in api server", name)
- client.RemoveNode(name)
- }
- }
- }
-
- svcs := client.client.Services(kapi.NamespaceAll)
- if svcList, err := svcs.List(kapi.ListOptions{
- LabelSelector: klabels.Everything(),
- FieldSelector: kselector.Everything(),
- }); err == nil {
- for name, svc := range client.services {
- if !containsSvcName(name, svcList) {
- log.Printf("Bookkeeper has service: %s that does not exist in api server", name)
- client.RemoveService(svc)
- }
- }
- }
-
- pods := client.client.Pods(kapi.NamespaceAll)
- if podList, err := pods.List(kapi.ListOptions{
- LabelSelector: klabels.Everything(),
- FieldSelector: kselector.Everything(),
- }); err == nil {
- for name, pod := range client.pods {
- if !containsPodName(name, podList) {
- log.Printf("Bookkeeper has pod: %s that does not exist in api server", name)
- if _, ok := deleteMap[pod.Name]; ok {
- log.Println("Missing remove notification for Pod: ", pod.Name)
- delete(deleteMap, pod.Name)
- }
- client.RemovePod(pod)
- }
- }
- //It was observed that the kube notification may lost after the restarting of kube master.
- //Currently this is only done for Pod. same for Node and Service will be done when required.
- for _, pod := range podList.Items {
- if _, ok := client.pods[pod.ObjectMeta.Name]; !ok {
- if kapi.IsPodReady(&pod) {
- log.Println("Missing add/update notification for Pod: ", pod.ObjectMeta.Name)
- client.AddPod(&pod)
- }
- }
- }
- }
-}
-
-func containsPodName(name string, pods *kapi.PodList) bool {
- for _, pod := range pods.Items {
- if pod.ObjectMeta.Name == name {
- return true
- }
- }
- return false
-}
-
-func containsSvcName(name string, svcs *kapi.ServiceList) bool {
- for _, svc := range svcs.Items {
- if svc.ObjectMeta.Name == name {
- return true
- }
- }
- return false
-}
-
-func containsNodeName(name string, nodes *kapi.NodeList) bool {
- for _, node := range nodes.Items {
- if node.ObjectMeta.Name == name {
- return true
- }
- }
- return false
-}
-
-func isUserDefinedPod(pod *kapi.Pod) bool {
- for _, c := range pod.Spec.Containers {
- for _, e := range c.Env {
- if strings.HasPrefix(e.Name, "SERVICE_") {
- if strings.Contains(e.Name, "_NAME") {
- return true
- }
- }
- }
- }
- return false
-}
-
-func isUserDefinedService(svc *kapi.Service) bool {
- for name, _ := range svc.ObjectMeta.Annotations {
- if strings.HasPrefix(name, "SERVICE_") {
- if strings.Contains(name, "_NAME") {
- return true
- }
- }
- }
- return false
-}
-
-func fillPdmPodIPsMap(pod *kapi.Pod) {
- base := *argPdmControllerUrl
- resUrl := podUrl + "/" + pod.Namespace + "/pods"
- rclient := restclient.NewRESTClient(base, resUrl, pod.Name)
- //try 10 times at maximum
- for i := 0; i < 10; i++ {
- log.Printf("try REST get the PDM PodIP for pod:%s at %d time..", pod.Name, i)
- buf, err := rclient.Get()
- if err != nil {
- log.Printf("request PDM PodIP info for Pod:%s with error:%v", pod.Name, err)
- time.Sleep(6 * 1e9) //sleep for 6 secs
- continue
- }
- podIPInfo := PdmPodIPInfo{}
- json.Unmarshal(buf, &podIPInfo)
- IPInfo := podIPInfo.Pod
- IPs := IPInfo.IPs
- if len(IPs) == 0 {
- log.Printf("request PDM PodIP info for Pod:%s return empty ip list.", pod.Name)
- return
- } else {
- for _, ip := range IPs {
- if ip.IPAddress == "" {
- log.Printf("request PDM PodIP info for Pod:%s return empty ip.", pod.Name)
- return
- }
- }
- pdmPodIPsMap[pod.Name] = IPs
- }
- log.Println("successfully REST get the PDM PodIP for pod:", pod.Name)
- return
- }
-
- log.Println("failed to REST get the PDM PodIP for pod:", pod.Name)
-}
diff --git a/kube2consul/src/kube2consul/pod_service.go b/kube2consul/src/kube2consul/pod_service.go
deleted file mode 100644
index c6294cb..0000000
--- a/kube2consul/src/kube2consul/pod_service.go
+++ /dev/null
@@ -1,677 +0,0 @@
-/*
-Copyright 2017 ZTE, Inc. and others.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-// pod_service.go
-package main
-
-import (
- "fmt"
- "log"
- "strconv"
- "strings"
-
- consulapi "github.com/hashicorp/consul/api"
- kapi "k8s.io/kubernetes/pkg/api"
-)
-
-type PodServiceAction interface {
- BuildServiceInfoMap(*kapi.ContainerPort, *kapi.Container)
- BuildAgentService(*ServiceInfo, DnsInfo, *kapi.ContainerPort, string) *consulapi.AgentServiceRegistration
-}
-
-type PodService struct {
- PodServiceAction
- sInfos map[string]*ServiceInfo
-}
-
-func newPodService() *PodService {
- return &PodService{
- sInfos: make(map[string]*ServiceInfo),
- }
-}
-
-func (pod *PodService) BuildServiceInfoMap(cport *kapi.ContainerPort, container *kapi.Container) {
- portstr := strconv.Itoa(int(cport.ContainerPort))
- fullname := "SERVICE_" + portstr + "_NAME"
- nameprefix := "SERVICE_" + portstr + "_NAME_"
- tagprefix := "SERVICE_" + portstr + "_TAGS"
- labelprefix := "SERVICE_" + portstr + "_ROUTE_LABELS"
- mdataprefix := "SERVICE_" + portstr + "_META_DATA"
- nsprefix := "SERVICE_" + portstr + "_NAMESPACE"
- ttlprefix := "SERVICE_" + portstr + "_CHECK_TTL"
- httpprefix := "SERVICE_" + portstr + "_CHECK_HTTP"
- tcpprefix := "SERVICE_" + portstr + "_CHECK_TCP"
- cmdprefix := "SERVICE_" + portstr + "_CHECK_CMD"
- scriptprefix := "SERVICE_" + portstr + "_CHECK_SCRIPT"
- timeoutprefix := "SERVICE_" + portstr + "_CHECK_TIMEOUT"
- intervalprefix := "SERVICE_" + portstr + "_CHECK_INTERVAL"
- for _, env := range container.Env {
- if env.Name == fullname || strings.HasPrefix(env.Name, nameprefix) {
- addToServiceInfoMap(env.Name, env.Value, Service_NAME, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, tagprefix) {
- addToServiceInfoMap(env.Name, env.Value, Service_TAGS, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, labelprefix) {
- addToServiceInfoMap(env.Name, env.Value, Service_LABELS, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, mdataprefix) {
- addToServiceInfoMap(env.Name, env.Value, Service_MDATA, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, nsprefix) {
- addToServiceInfoMap(env.Name, env.Value, Service_NS, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, ttlprefix) && env.Value != "" {
- addToServiceInfoMap(env.Name, env.Value, Service_TTL, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, httpprefix) && env.Value != "" {
- addToServiceInfoMap(env.Name, env.Value, Service_HTTP, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, tcpprefix) && env.Value != "" {
- addToServiceInfoMap(env.Name, env.Value, Service_TCP, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, cmdprefix) && env.Value != "" {
- addToServiceInfoMap(env.Name, env.Value, Service_CMD, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, scriptprefix) && env.Value != "" {
- addToServiceInfoMap(env.Name, env.Value, Service_SCRIPT, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, timeoutprefix) && env.Value != "" {
- addToServiceInfoMap(env.Name, env.Value, Service_TIMEOUT, &pod.sInfos)
- } else if strings.HasPrefix(env.Name, intervalprefix) && env.Value != "" {
- addToServiceInfoMap(env.Name, env.Value, Service_INTERVAL, &pod.sInfos)
- }
- }
-}
-
-func (pod *PodService) BuildAgentService(sInfo *ServiceInfo, config DnsInfo, cport *kapi.ContainerPort, cId string) []*consulapi.AgentServiceRegistration {
- asrs := []*consulapi.AgentServiceRegistration{}
- checks := []*consulapi.AgentServiceCheck{}
-
- //build Checks object and populate checks table
- var checks_in_tag string
-
- if len(sInfo.TTL) > 0 {
- check := new(consulapi.AgentServiceCheck)
- check.TTL = sInfo.TTL
- kv := "ttl:" + sInfo.TTL + ","
- checks_in_tag += kv
- checks = append(checks, check)
- }
-
- if len(sInfo.HTTP) > 0 {
- check := new(consulapi.AgentServiceCheck)
- check.HTTP = fmt.Sprintf("http://%s:%d%s", config.IPAddress, cport.ContainerPort, sInfo.HTTP)
- kv := "http:" + sInfo.HTTP + ","
- checks_in_tag += kv
- if len(sInfo.Interval) == 0 {
- check.Interval = DefaultInterval
- if strings.Contains(checks_in_tag, "interval:") {
- //do nothing
- } else {
- kv = "interval:" + DefaultInterval + ","
- checks_in_tag += kv
- }
- } else {
- check.Interval = sInfo.Interval
- if strings.Contains(checks_in_tag, "interval:") {
- //do nothing
- } else {
- kv = "interval:" + sInfo.Interval + ","
- checks_in_tag += kv
- }
- }
- if len(sInfo.Timeout) > 0 {
- check.Timeout = sInfo.Timeout
- if strings.Contains(checks_in_tag, "timeout:") {
- //do nothing
- } else {
- kv = "timeout:" + sInfo.Timeout + ","
- checks_in_tag += kv
- }
- }
- checks = append(checks, check)
- }
-
- if len(sInfo.TCP) > 0 {
- check := new(consulapi.AgentServiceCheck)
- check.TCP = fmt.Sprintf("%s:%d", config.IPAddress, cport.ContainerPort)
- kv := "tcp:ok,"
- checks_in_tag += kv
- if len(sInfo.Interval) == 0 {
- check.Interval = DefaultInterval
- if strings.Contains(checks_in_tag, "interval:") {
- //do nothing
- } else {
- kv = "interval:" + DefaultInterval + ","
- checks_in_tag += kv
- }
- } else {
- check.Interval = sInfo.Interval
- if strings.Contains(checks_in_tag, "interval:") {
- //do nothing
- } else {
- kv = "interval:" + sInfo.Interval + ","
- checks_in_tag += kv
- }
- }
- if len(sInfo.Timeout) > 0 {
- check.Timeout = sInfo.Timeout
- if strings.Contains(checks_in_tag, "timeout:") {
- //do nothing
- } else {
- kv = "timeout:" + sInfo.Timeout + ","
- checks_in_tag += kv
- }
- }
- checks = append(checks, check)
- }
-
- if len(sInfo.CMD) > 0 {
- check := new(consulapi.AgentServiceCheck)
- check.Script = fmt.Sprintf("check-cmd %s %s %s", cId, strconv.Itoa(int(cport.ContainerPort)), sInfo.CMD)
- kv := "script:" + sInfo.CMD + ","
- checks_in_tag += kv
- if len(sInfo.Interval) == 0 {
- check.Interval = DefaultInterval
- if strings.Contains(checks_in_tag, "interval:") {
- //do nothing
- } else {
- kv = "interval:" + DefaultInterval + ","
- checks_in_tag += kv
- }
- } else {
- check.Interval = sInfo.Interval
- if strings.Contains(checks_in_tag, "interval:") {
- //do nothing
- } else {
- kv = "interval:" + sInfo.Interval + ","
- checks_in_tag += kv
- }
- }
- checks = append(checks, check)
- }
-
- if len(sInfo.Script) > 0 {
- check := new(consulapi.AgentServiceCheck)
- withIp := strings.Replace(sInfo.Script, "$SERVICE_IP", config.IPAddress, -1)
- check.Script = strings.Replace(withIp, "$SERVICE_PORT", strconv.Itoa(int(cport.ContainerPort)), -1)
- kv := "script:" + sInfo.Script + ","
- checks_in_tag += kv
- if len(sInfo.Interval) == 0 {
- check.Interval = DefaultInterval
- if strings.Contains(checks_in_tag, "interval:") {
- //do nothing
- } else {
- kv = "interval:" + DefaultInterval + ","
- checks_in_tag += kv
- }
- } else {
- check.Interval = sInfo.Interval
- if strings.Contains(checks_in_tag, "interval:") {
- //do nothing
- } else {
- kv = "interval:" + sInfo.Interval + ","
- checks_in_tag += kv
- }
- }
- if len(sInfo.Timeout) > 0 {
- check.Timeout = sInfo.Timeout
- if strings.Contains(checks_in_tag, "timeout:") {
- //do nothing
- } else {
- kv = "timeout:" + sInfo.Timeout + ","
- checks_in_tag += kv
- }
- }
- checks = append(checks, check)
- }
-
- //remove trailing ","
- if len(checks_in_tag) != 0 {
- checks_in_tag = strings.TrimRight(checks_in_tag, ",")
- }
-
- //build other talbes in tags
- var (
- base_in_tag string
- lb_in_tag string
- labels_in_tag = sInfo.Labels
- metadata_in_tag = sInfo.MData
- ns_in_tag = sInfo.NS
- )
-
- ts := strings.Split(sInfo.Tags, ",")
- for _, elem := range ts {
- if strings.Contains(elem, NETWORK_PLANE_TYPE) || strings.Contains(elem, VISUAL_RANGE) {
- kv := "," + elem
- labels_in_tag += kv
- continue
- }
-
- if strings.Contains(elem, LB_POLICY) || strings.Contains(elem, LB_SVR_PARAMS) {
- kv := "," + elem
- lb_in_tag += kv
- continue
- }
-
- kv := "," + elem
- base_in_tag += kv
- }
-
- //remove leading ","
- if len(base_in_tag) != 0 {
- base_in_tag = strings.TrimLeft(base_in_tag, ",")
- }
-
- if len(lb_in_tag) != 0 {
- lb_in_tag = strings.TrimLeft(lb_in_tag, ",")
- }
-
- if len(labels_in_tag) != 0 {
- labels_in_tag = strings.TrimLeft(labels_in_tag, ",")
- }
-
- //build tables in tag
- var tagslice []string
- if len(base_in_tag) != 0 {
- tagslice = append(tagslice, buildTableInJsonFormat(Table_BASE, base_in_tag))
- }
-
- if len(lb_in_tag) != 0 {
- tagslice = append(tagslice, buildTableInJsonFormat(Table_LB, lb_in_tag))
- }
-
- if len(labels_in_tag) == 0 {
- tagslice = append(tagslice, DefaultLabels)
- } else {
- if !strings.Contains(labels_in_tag, "visualRange") {
- labels_in_tag += ",visualRange:1"
- }
- tagslice = append(tagslice, buildTableInJsonFormat(Table_LABELS, labels_in_tag))
- }
-
- if len(metadata_in_tag) != 0 {
- tagslice = append(tagslice, buildTableInJsonFormat(Table_META_DATA, metadata_in_tag))
- }
-
- if len(ns_in_tag) != 0 {
- tagslice = append(tagslice, "\"ns\":{\"namespace\":\"" + ns_in_tag + "\"}")
- }
-
- if len(checks_in_tag) != 0 {
- tagslice = append(tagslice, buildTableInJsonFormat(Table_CHECKS, checks_in_tag))
- }
-
- //append "-<namespace>"
- name := sInfo.Name
- if len(ns_in_tag) != 0 {
- name = name + "-" + ns_in_tag
- }
-
- //handle IUI service
-// for _, elem := range ts {
-// var elemslice []string
-// elemslice = strings.Split(elem, ":")
-// if elemslice[0] == PROTOCOL {
-// switch elemslice[1] {
-// case PROTOCOL_UI:
-// name = PREFIX_UI + name
-// default:
-// log.Println("regular protocol:", elemslice[1])
-// }
-// break
-// }
-// }
-
- if len(sInfo.IPs) == 0 {
- serviceID := config.BaseID + "-" + strconv.Itoa(int(cport.ContainerPort)) + "-" + name
- asr := &consulapi.AgentServiceRegistration{
- ID: serviceID,
- Name: name,
- Address: config.IPAddress,
- Port: int(cport.ContainerPort),
- Tags: tagslice,
- Checks: checks,
- }
- asrs = append(asrs, asr)
- return asrs
- } else {
- for _, ip := range sInfo.IPs {
- serviceID := config.BaseID + "-" + strconv.Itoa(int(cport.ContainerPort)) + "-" + name + "-" + ip.NetworkPlane
- addr := ip.IPAddress
- tagslice_with_single_network_plane_type := refillTagsliceWithSingleNetworkPlaneType(tagslice, labels_in_tag, ip.NetworkPlane)
- asr := &consulapi.AgentServiceRegistration{
- ID: serviceID,
- Name: name,
- Address: addr,
- Port: int(cport.ContainerPort),
- Tags: tagslice_with_single_network_plane_type,
- Checks: checks,
- }
- asrs = append(asrs, asr)
- }
- return asrs
- }
-}
-
-func addToServiceInfoMap(name, value, flag string, smap *map[string]*ServiceInfo) {
- segs := strings.Split(name, "_")
- switch flag {
- case Service_NAME:
- if strings.ContainsAny(value, ".") {
- log.Printf("Warning:service name %s contains dot", value)
- value = strings.Replace(value, ".", "-", -1)
- log.Printf("Warning:service name has been modified to %s", value)
- }
- if len(segs) == 3 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].Name = value
- } else {
- sInfo := &ServiceInfo{
- Name: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 4 {
- id := segs[3]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].Name = value
- } else {
- sInfo := &ServiceInfo{
- Name: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_TAGS:
- if len(segs) == 3 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].Tags = value
- } else {
- sInfo := &ServiceInfo{
- Tags: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 4 {
- id := segs[3]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].Tags = value
- } else {
- sInfo := &ServiceInfo{
- Tags: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_LABELS:
- if len(segs) == 4 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].Labels = value
- } else {
- sInfo := &ServiceInfo{
- Labels: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 5 {
- id := segs[4]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].Labels = value
- } else {
- sInfo := &ServiceInfo{
- Labels: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_MDATA:
- if len(segs) == 4 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].MData = value
- } else {
- sInfo := &ServiceInfo{
- MData: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 5 {
- id := segs[4]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].MData = value
- } else {
- sInfo := &ServiceInfo{
- MData: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_NS:
- if len(segs) == 3 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].NS = value
- } else {
- sInfo := &ServiceInfo{
- NS: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 4 {
- id := segs[3]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].NS = value
- } else {
- sInfo := &ServiceInfo{
- NS: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_TTL:
- if len(segs) == 4 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].TTL = value
- } else {
- sInfo := &ServiceInfo{
- TTL: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 5 {
- id := segs[4]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].TTL = value
- } else {
- sInfo := &ServiceInfo{
- TTL: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_HTTP:
- if len(segs) == 4 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].HTTP = value
- } else {
- sInfo := &ServiceInfo{
- HTTP: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 5 {
- id := segs[4]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].HTTP = value
- } else {
- sInfo := &ServiceInfo{
- HTTP: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_TCP:
- if len(segs) == 4 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].TCP = value
- } else {
- sInfo := &ServiceInfo{
- TCP: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 5 {
- id := segs[4]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].TCP = value
- } else {
- sInfo := &ServiceInfo{
- TCP: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_CMD:
- if len(segs) == 4 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].CMD = value
- } else {
- sInfo := &ServiceInfo{
- CMD: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 5 {
- id := segs[4]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].CMD = value
- } else {
- sInfo := &ServiceInfo{
- CMD: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_SCRIPT:
- if len(segs) == 4 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].Script = value
- } else {
- sInfo := &ServiceInfo{
- Script: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 5 {
- id := segs[4]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].Script = value
- } else {
- sInfo := &ServiceInfo{
- Script: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_TIMEOUT:
- if len(segs) == 4 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].Timeout = value
- } else {
- sInfo := &ServiceInfo{
- Timeout: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 5 {
- id := segs[4]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].Timeout = value
- } else {
- sInfo := &ServiceInfo{
- Timeout: value,
- }
- (*smap)[id] = sInfo
- }
- }
- case Service_INTERVAL:
- if len(segs) == 4 {
- if _, ok := (*smap)["0"]; ok {
- (*smap)["0"].Interval = value
- } else {
- sInfo := &ServiceInfo{
- Interval: value,
- }
- (*smap)["0"] = sInfo
- }
- } else if len(segs) == 5 {
- id := segs[4]
- if _, ok := (*smap)[id]; ok {
- (*smap)[id].Interval = value
- } else {
- sInfo := &ServiceInfo{
- Interval: value,
- }
- (*smap)[id] = sInfo
- }
- }
- default:
- log.Println("Unsupported Service Attribute: ", flag)
- }
-}
-
-func buildTableInJsonFormat(table, input string) string {
- head := "\"" + table + "\":{"
- tail := "}"
- body := ""
-
- s := strings.Split(input, ",")
- slen := len(s)
- for _, v := range s {
- slen--
- s1 := strings.Split(v, ":")
- mstr := "\"" + strings.TrimSpace(s1[0]) + "\":"
- if strings.Contains(body, mstr) {
- if slen == 0 {
- body = strings.TrimRight(body, ",")
- }
- continue
- }
- if len(s1) == 2 {
- kv := "\"" + strings.TrimSpace(s1[0]) + "\":\"" + strings.TrimSpace(s1[1]) + "\""
- if slen != 0 {
- kv += ","
- }
- body += kv
- }
- }
-
- return head + body + tail
-}
-
-func refillTagsliceWithSingleNetworkPlaneType(old []string, labels, nw_type string) []string {
- var ret_tagslice []string
- var new_labels string
-
- for _, elem := range old {
- if strings.Contains(elem, NETWORK_PLANE_TYPE) {
- continue
- }
- ret_tagslice = append(ret_tagslice, elem)
- }
-
- sl := strings.Split(labels, NETWORK_PLANE_TYPE)
- sl1 := strings.SplitN(sl[1], ",", 2)
- nw_type_field := NETWORK_PLANE_TYPE + ":" + nw_type
-
- if len(sl1) == 2 {
- new_labels = sl[0] + nw_type_field + "," + sl1[1]
- } else {
- new_labels = sl[0] + nw_type_field
- }
- ret_tagslice = append(ret_tagslice, buildTableInJsonFormat(Table_LABELS, new_labels))
-
- return ret_tagslice
-} \ No newline at end of file
diff --git a/kube2consul/src/kube2consul/types.go b/kube2consul/src/kube2consul/types.go
deleted file mode 100644
index 0105d98..0000000
--- a/kube2consul/src/kube2consul/types.go
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
-Copyright 2017 ZTE, Inc. and others.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-// types.go
-package main
-
-import (
- kapi "k8s.io/kubernetes/pkg/api"
-)
-
-type KubeWorkAction string
-
-const (
- KubeWorkAddNode KubeWorkAction = "AddNode"
- KubeWorkRemoveNode KubeWorkAction = "RemoveNode"
- KubeWorkUpdateNode KubeWorkAction = "UpdateNode"
- KubeWorkAddService KubeWorkAction = "AddService"
- KubeWorkRemoveService KubeWorkAction = "RemoveService"
- KubeWorkUpdateService KubeWorkAction = "UpdateService"
- KubeWorkAddPod KubeWorkAction = "AddPod"
- KubeWorkRemovePod KubeWorkAction = "RemovePod"
- KubeWorkUpdatePod KubeWorkAction = "UpdatePod"
- KubeWorkSync KubeWorkAction = "Sync"
-)
-
-type KubeWork struct {
- Action KubeWorkAction
- Node *kapi.Node
- Service *kapi.Service
- Pod *kapi.Pod
-}
-
-type ConsulWorkAction string
-
-const (
- ConsulWorkAddService ConsulWorkAction = "AddService"
- ConsulWorkRemoveService ConsulWorkAction = "RemoveService"
- ConsulWorkAddPod ConsulWorkAction = "AddPod"
- ConsulWorkRemovePod ConsulWorkAction = "RemovePod"
- ConsulWorkSyncDNS ConsulWorkAction = "SyncDNS"
-)
-
-type ConsulWork struct {
- Action ConsulWorkAction
- Service *kapi.Service
- Pod *kapi.Pod
- Config DnsInfo
-}
-
-type DnsInfo struct {
- BaseID string
- IPAddress string
- ServiceType kapi.ServiceType
-}
-
-type ServiceInfo struct {
- Name string
- Tags string
- Labels string
- MData string
- NS string
- TTL string
- HTTP string
- TCP string
- CMD string
- Script string
- Timeout string
- Interval string
- IPs []PodIP
-}
-
-type PdmPodIPInfo struct {
- Pod PodIPInfo `json:"pod"`
-}
-
-type PodIPInfo struct {
- Name string `json:"name"`
- IPs []PodIP `json:"ips"`
-}
-
-type PodIP struct {
- NetworkPlane string `json:"network_plane_name"`
- IPAddress string `json:"ip_address"`
-}
diff --git a/kube2consul/src/kube2consul/util/restclient/restclient.go b/kube2consul/src/kube2consul/util/restclient/restclient.go
deleted file mode 100644
index e01ae0f..0000000
--- a/kube2consul/src/kube2consul/util/restclient/restclient.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package restclient
-
-import (
- "fmt"
- "io/ioutil"
- "net/http"
-)
-
-type RESTClient struct {
- base string
- resource string
- param string
-}
-
-func NewRESTClient(baseURL string, versionedAPIPath string, urlParameter string) *RESTClient {
- return &RESTClient{
- base: baseURL,
- resource: versionedAPIPath,
- param: urlParameter,
- }
-}
-
-func (c *RESTClient) Get() (b []byte, err error) {
- url := c.base + "/" + c.resource + "/" + c.param
- res, err := http.Get(url)
- if err != nil {
- return nil, err
- }
-
- if res.StatusCode != 200 {
- return nil, fmt.Errorf(res.Status)
- }
-
- buf, err := ioutil.ReadAll(res.Body)
- res.Body.Close()
-
- if err != nil {
- return nil, err
- }
-
- return buf, nil
-}
diff --git a/kube2consul/src/main/blueprint/deploy.yml b/kube2consul/src/main/blueprint/deploy.yml
deleted file mode 100644
index cc61076..0000000
--- a/kube2consul/src/main/blueprint/deploy.yml
+++ /dev/null
@@ -1,8 +0,0 @@
----
-- remote_user: ubuntu
- become: yes
- become_method: sudo
- vars_files:
- - vars.yml
- tasks:
- - include: task.yml
diff --git a/kube2consul/src/main/blueprint/list_of_servicelet.list b/kube2consul/src/main/blueprint/list_of_servicelet.list
deleted file mode 100644
index 77a1d4f..0000000
--- a/kube2consul/src/main/blueprint/list_of_servicelet.list
+++ /dev/null
@@ -1,4 +0,0 @@
-{
- "servicelet_module":[
- ]
-} \ No newline at end of file
diff --git a/kube2consul/src/main/blueprint/task.yml b/kube2consul/src/main/blueprint/task.yml
deleted file mode 100644
index 45cfcd9..0000000
--- a/kube2consul/src/main/blueprint/task.yml
+++ /dev/null
@@ -1,118 +0,0 @@
----
-- name: remove kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- state: absent
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- log_driver: syslog
- net: host
- restart_policy: always
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- when:
- - all_in_one == 'no'
- - master_in_controller == 'no'
- - cluster_type == 'k8s'
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- net: host
- restart_policy: always
- privileged: true
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- - "/root/.kube/config:/root/.kube/config:ro"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- CLUSTER_TYPE: "openshift"
- when:
- - all_in_one == 'no'
- - master_in_controller == 'no'
- - cluster_type == 'openshift'
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- log_driver: syslog
- restart_policy: always
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- ALL_IN_ONE: "yes"
- when:
- - all_in_one == 'yes'
- - cluster_type == 'k8s'
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- log_driver: syslog
- restart_policy: always
- privileged: true
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- - "/root/.kube/config:/root/.kube/config:ro"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- ALL_IN_ONE: "yes"
- CLUSTER_TYPE: "openshift"
- when:
- - all_in_one == 'yes'
- - cluster_type == 'openshift'
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- log_driver: syslog
- restart_policy: always
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- ALL_IN_ONE: "yes"
- when:
- - master_in_controller == 'yes'
- - cluster_type == 'k8s'
-
-- name: run kube2consul container
- docker:
- name: kube2consul
- image: "{{kube2consul_image}}"
- log_driver: syslog
- restart_policy: always
- privileged: true
- volumes:
- - "{{ kube2consul_data_host }}:{{ kube2consul_data_container }}"
- - "/root/.kube/config:/root/.kube/config:ro"
- env:
- KUBE_MASTER_IP: "{{kube_master_ip}}"
- PDM_CONTROLLER_IP: "{{pdm_controller_ip}}"
- JOIN_IP: "{{consul_join_ip}}"
- ALL_IN_ONE: "yes"
- CLUSTER_TYPE: "openshift"
- when:
- - master_in_controller == 'yes'
- - cluster_type == 'openshift' \ No newline at end of file
diff --git a/kube2consul/src/main/blueprint/vars.yml b/kube2consul/src/main/blueprint/vars.yml
deleted file mode 100644
index 971438d..0000000
--- a/kube2consul/src/main/blueprint/vars.yml
+++ /dev/null
@@ -1,14 +0,0 @@
----
-- api_network_ip:
-- man_network_ip:
-- registry_url:
-- cp_vertype:
-- cp_type:
-- cp_name:
-- cp_version:
-- kube2consul_image: "{{registry_url}}/{{cp_type}}/{{cp_name}}:{{cp_version}}"
-- kube_master_ip: "{{ hostvars[inventory_hostname]['api_network_ip'] }}"
-- pdm_controller_ip: "{{vp_ip}}"
-- consul_join_ip: "{{zenap_msb_consul_server_ip}}"
-- kube2consul_data_host: "/home/zenap-msb/consul_data/kube2consul_{{kube_master_ip}}"
-- kube2consul_data_container: "/consul-works/data-dir" \ No newline at end of file
diff --git a/kube2consul/src/main/docker/Dockerfile b/kube2consul/src/main/docker/Dockerfile
deleted file mode 100644
index 278cac5..0000000
--- a/kube2consul/src/main/docker/Dockerfile
+++ /dev/null
@@ -1,11 +0,0 @@
-FROM alpine:3.3
-ENV CONSUL_VERSION 0.7.1
-ENV BASE /
-ADD consul-linux_amd64.tar.gz /
-RUN cd /usr/lib \
- && ln -s /consul/libglib-2.0.so.0.4400.0 libglib-2.0.so.0 \
- && ln -s /consul/libintl.so.8.1.3 libintl.so.8
-COPY kube2consul /bin/
-COPY start.sh /
-
-ENTRYPOINT exec /start.sh \ No newline at end of file
diff --git a/kube2consul/src/main/docker/start.sh b/kube2consul/src/main/docker/start.sh
deleted file mode 100644
index 033c50b..0000000
--- a/kube2consul/src/main/docker/start.sh
+++ /dev/null
@@ -1,36 +0,0 @@
-#!/bin/sh
-if [ -z "${KUBE_MASTER_IP}" ]; then
- echo "kube master node ip is required."
- exit 1
-fi
-
-if [ -n "${JOIN_IP}" ]; then
- echo "### Starting consul client"
- if [ -z "${ALL_IN_ONE}" ]; then
- /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind ${KUBE_MASTER_IP} -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s &
- else
- /consul/consul agent -data-dir /consul-works/data-dir -node kube2consul_${KUBE_MASTER_IP} -bind 0.0.0.0 -client 0.0.0.0 -retry-join ${JOIN_IP} -retry-interval 5s &
- fi
-fi
-
-if [ -z "${RUN_MODE}" ]; then
- echo "non-HA scenario."
-else
- echo "\n\n### Starting consul agent"
- cd ./consul
- ./entry.sh &
-fi
-
-kube_url="http://${KUBE_MASTER_IP}:8080"
-
-if [ "${CLUSTER_TYPE}" == "openshift" ]; then
- kube_url="https://${KUBE_MASTER_IP}:8443"
-fi
-
-echo "\n\n### Starting kube2consul"
-if [ -z "${PDM_CONTROLLER_IP}" ]; then
- /bin/kube2consul --kube_master_url ${kube_url}
-else
- echo "in Paas mode."
- /bin/kube2consul --kube_master_url ${kube_url} --pdm_controller_url http://${PDM_CONTROLLER_IP}:9527
-fi \ No newline at end of file