diff options
-rw-r--r-- | msb2pilot/src/msb2pilot/consul/monitor.go | 78 | ||||
-rw-r--r-- | msb2pilot/src/msb2pilot/main.go | 17 |
2 files changed, 94 insertions, 1 deletions
diff --git a/msb2pilot/src/msb2pilot/consul/monitor.go b/msb2pilot/src/msb2pilot/consul/monitor.go new file mode 100644 index 0000000..c3adde4 --- /dev/null +++ b/msb2pilot/src/msb2pilot/consul/monitor.go @@ -0,0 +1,78 @@ +/** + * Copyright (c) 2018 ZTE Corporation. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and the Apache License 2.0 which both accompany this distribution, + * and are available at http://www.eclipse.org/legal/epl-v10.html + * and http://www.apache.org/licenses/LICENSE-2.0 + * + * Contributors: + * ZTE - initial Project + */ +package consul + +import ( + "msb2pilot/log" + "msb2pilot/models" + "time" + + "github.com/hashicorp/consul/api" +) + +type Monitor interface { + Start(<-chan struct{}) +} + +type ServiceHandler func(newServices []*models.MsbService) + +type consulMonitor struct { + discovery *api.Client + period time.Duration + serviceHandler ServiceHandler +} + +func NewConsulMonitor(client *api.Client, period time.Duration, serviceHandler ServiceHandler) Monitor { + return &consulMonitor{ + discovery: client, + period: period, + serviceHandler: serviceHandler, + } +} + +func (this *consulMonitor) Start(stop <-chan struct{}) { + this.run(stop) +} + +func (this *consulMonitor) run(stop <-chan struct{}) { + ticker := time.NewTicker(this.period) + for { + select { + case <-stop: + ticker.Stop() + return + case <-ticker.C: + this.updateServiceRecord() + } + } + +} + +func (this *consulMonitor) updateServiceRecord() { + data, err := GetServices() + if err != nil { + log.Log.Error("failed to get services from consul", err) + return + } + + newRecords := make([]*models.MsbService, 0, len(data)) + for name := range data { + endpoints, err := GetInstances(name) + if err != nil { + log.Log.Error("failed to get service instance of "+name, err) + continue + } + newRecords = append(newRecords, models.ConvertService(endpoints)) + } + + this.serviceHandler(newRecords) +} diff --git a/msb2pilot/src/msb2pilot/main.go b/msb2pilot/src/msb2pilot/main.go index 236e369..7ed762c 100644 --- a/msb2pilot/src/msb2pilot/main.go +++ b/msb2pilot/src/msb2pilot/main.go @@ -12,15 +12,30 @@ package main import ( - _ "msb2pilot/consul" + "fmt" + "msb2pilot/consul" "msb2pilot/log" + "msb2pilot/models" _ "msb2pilot/routers" + "time" "github.com/astaxie/beego" ) func main() { log.Log.Informational("**************** init msb2pilot ************************") + // start sync msb data + go syncConsulData() beego.Run() } + +func syncConsulData() { + stop := make(chan struct{}) + monitor := consul.NewConsulMonitor(nil, 20*time.Second, syncMsbData) + monitor.Start(stop) +} + +func syncMsbData(newServices []*models.MsbService) { + fmt.Println(len(newServices), "services updated", time.Now()) +} |