summaryrefslogtreecommitdiffstats
path: root/msb2pilot/src/msb2pilot/consul/monitor.go
diff options
context:
space:
mode:
Diffstat (limited to 'msb2pilot/src/msb2pilot/consul/monitor.go')
-rw-r--r--msb2pilot/src/msb2pilot/consul/monitor.go78
1 files changed, 78 insertions, 0 deletions
diff --git a/msb2pilot/src/msb2pilot/consul/monitor.go b/msb2pilot/src/msb2pilot/consul/monitor.go
new file mode 100644
index 0000000..c3adde4
--- /dev/null
+++ b/msb2pilot/src/msb2pilot/consul/monitor.go
@@ -0,0 +1,78 @@
+/**
+ * Copyright (c) 2018 ZTE Corporation.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and the Apache License 2.0 which both accompany this distribution,
+ * and are available at http://www.eclipse.org/legal/epl-v10.html
+ * and http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Contributors:
+ * ZTE - initial Project
+ */
+package consul
+
+import (
+ "msb2pilot/log"
+ "msb2pilot/models"
+ "time"
+
+ "github.com/hashicorp/consul/api"
+)
+
+type Monitor interface {
+ Start(<-chan struct{})
+}
+
+type ServiceHandler func(newServices []*models.MsbService)
+
+type consulMonitor struct {
+ discovery *api.Client
+ period time.Duration
+ serviceHandler ServiceHandler
+}
+
+func NewConsulMonitor(client *api.Client, period time.Duration, serviceHandler ServiceHandler) Monitor {
+ return &consulMonitor{
+ discovery: client,
+ period: period,
+ serviceHandler: serviceHandler,
+ }
+}
+
+func (this *consulMonitor) Start(stop <-chan struct{}) {
+ this.run(stop)
+}
+
+func (this *consulMonitor) run(stop <-chan struct{}) {
+ ticker := time.NewTicker(this.period)
+ for {
+ select {
+ case <-stop:
+ ticker.Stop()
+ return
+ case <-ticker.C:
+ this.updateServiceRecord()
+ }
+ }
+
+}
+
+func (this *consulMonitor) updateServiceRecord() {
+ data, err := GetServices()
+ if err != nil {
+ log.Log.Error("failed to get services from consul", err)
+ return
+ }
+
+ newRecords := make([]*models.MsbService, 0, len(data))
+ for name := range data {
+ endpoints, err := GetInstances(name)
+ if err != nil {
+ log.Log.Error("failed to get service instance of "+name, err)
+ continue
+ }
+ newRecords = append(newRecords, models.ConvertService(endpoints))
+ }
+
+ this.serviceHandler(newRecords)
+}