summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLvbo163 <lv.bo163@zte.com.cn>2018-08-02 17:04:51 +0800
committerLvbo163 <lv.bo163@zte.com.cn>2018-08-02 17:04:51 +0800
commit2b41e0ff9fb0466e5a209fe12ffe897bfe32f005 (patch)
tree5d7cd2ef1f1ad1ee534af55f9c4fe3c0c37e277a
parentced1de22c3fc25c6150042136ef27eaf4b297451 (diff)
monitor consul data change
Issue-ID: MSB-248 Change-Id: I495adcc4556c017ad42655ffcead957af9a9c106 Signed-off-by: Lvbo163 <lv.bo163@zte.com.cn>
-rw-r--r--msb2pilot/src/msb2pilot/consul/monitor.go78
-rw-r--r--msb2pilot/src/msb2pilot/main.go17
2 files changed, 94 insertions, 1 deletions
diff --git a/msb2pilot/src/msb2pilot/consul/monitor.go b/msb2pilot/src/msb2pilot/consul/monitor.go
new file mode 100644
index 0000000..c3adde4
--- /dev/null
+++ b/msb2pilot/src/msb2pilot/consul/monitor.go
@@ -0,0 +1,78 @@
+/**
+ * Copyright (c) 2018 ZTE Corporation.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and the Apache License 2.0 which both accompany this distribution,
+ * and are available at http://www.eclipse.org/legal/epl-v10.html
+ * and http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Contributors:
+ * ZTE - initial Project
+ */
+package consul
+
+import (
+ "msb2pilot/log"
+ "msb2pilot/models"
+ "time"
+
+ "github.com/hashicorp/consul/api"
+)
+
+type Monitor interface {
+ Start(<-chan struct{})
+}
+
+type ServiceHandler func(newServices []*models.MsbService)
+
+type consulMonitor struct {
+ discovery *api.Client
+ period time.Duration
+ serviceHandler ServiceHandler
+}
+
+func NewConsulMonitor(client *api.Client, period time.Duration, serviceHandler ServiceHandler) Monitor {
+ return &consulMonitor{
+ discovery: client,
+ period: period,
+ serviceHandler: serviceHandler,
+ }
+}
+
+func (this *consulMonitor) Start(stop <-chan struct{}) {
+ this.run(stop)
+}
+
+func (this *consulMonitor) run(stop <-chan struct{}) {
+ ticker := time.NewTicker(this.period)
+ for {
+ select {
+ case <-stop:
+ ticker.Stop()
+ return
+ case <-ticker.C:
+ this.updateServiceRecord()
+ }
+ }
+
+}
+
+func (this *consulMonitor) updateServiceRecord() {
+ data, err := GetServices()
+ if err != nil {
+ log.Log.Error("failed to get services from consul", err)
+ return
+ }
+
+ newRecords := make([]*models.MsbService, 0, len(data))
+ for name := range data {
+ endpoints, err := GetInstances(name)
+ if err != nil {
+ log.Log.Error("failed to get service instance of "+name, err)
+ continue
+ }
+ newRecords = append(newRecords, models.ConvertService(endpoints))
+ }
+
+ this.serviceHandler(newRecords)
+}
diff --git a/msb2pilot/src/msb2pilot/main.go b/msb2pilot/src/msb2pilot/main.go
index 236e369..7ed762c 100644
--- a/msb2pilot/src/msb2pilot/main.go
+++ b/msb2pilot/src/msb2pilot/main.go
@@ -12,15 +12,30 @@
package main
import (
- _ "msb2pilot/consul"
+ "fmt"
+ "msb2pilot/consul"
"msb2pilot/log"
+ "msb2pilot/models"
_ "msb2pilot/routers"
+ "time"
"github.com/astaxie/beego"
)
func main() {
log.Log.Informational("**************** init msb2pilot ************************")
+ // start sync msb data
+ go syncConsulData()
beego.Run()
}
+
+func syncConsulData() {
+ stop := make(chan struct{})
+ monitor := consul.NewConsulMonitor(nil, 20*time.Second, syncMsbData)
+ monitor.Start(stop)
+}
+
+func syncMsbData(newServices []*models.MsbService) {
+ fmt.Println(len(newServices), "services updated", time.Now())
+}