aboutsummaryrefslogtreecommitdiffstats
path: root/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache
diff options
context:
space:
mode:
Diffstat (limited to 'msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache')
-rw-r--r--msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/CatalogCache.java70
-rw-r--r--msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache.java230
-rw-r--r--msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache4Map.java258
-rw-r--r--msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/HealthCache.java70
-rw-r--r--msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ServiceCache.java50
5 files changed, 678 insertions, 0 deletions
diff --git a/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/CatalogCache.java b/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/CatalogCache.java
new file mode 100644
index 0000000..8cafab4
--- /dev/null
+++ b/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/CatalogCache.java
@@ -0,0 +1,70 @@
+/**
+* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.openo.msb.wrapper.consul.cache;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import org.openo.msb.wrapper.consul.CatalogClient;
+import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
+import org.openo.msb.wrapper.consul.model.catalog.CatalogService;
+
+import com.google.common.base.Function;
+
+public class CatalogCache extends ConsulCache<String, CatalogService>{
+
+ private final String serviceName;
+
+ private CatalogCache(Function<CatalogService, String> keyConversion,
+ ConsulCache.CallbackConsumer<CatalogService> callbackConsumer,String serviceName) {
+ super(keyConversion, callbackConsumer);
+ this.serviceName=serviceName;
+ // TODO Auto-generated constructor stub
+ }
+
+
+ public static CatalogCache newCache(
+ final CatalogClient catalogClient,
+ final String serviceName,
+ final int watchSeconds){
+ Function<CatalogService,String> keyExtractor = new Function<CatalogService, String>() {
+ @Override
+ public String apply(CatalogService input) {
+ //return input.getKey().substring(rootPath.length() + 1);
+ return input.getServiceId();
+ }
+ };
+
+ final CallbackConsumer<CatalogService> callbackConsumer = new CallbackConsumer<CatalogService>() {
+ @Override
+ public void consume(BigInteger index, ConsulResponseCallback<List<CatalogService>> callback) {
+ catalogClient.getService(serviceName, watchParams(index, watchSeconds),callback);
+ }
+ };
+
+
+ return new CatalogCache(keyExtractor, callbackConsumer,serviceName);
+
+
+ }
+
+ public String getServiceName(){
+ return this.serviceName;
+ }
+
+
+}
diff --git a/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache.java b/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache.java
new file mode 100644
index 0000000..e7494fa
--- /dev/null
+++ b/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache.java
@@ -0,0 +1,230 @@
+/**
+* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.openo.msb.wrapper.consul.cache;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+
+import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
+import org.openo.msb.wrapper.consul.model.ConsulResponse;
+import org.openo.msb.wrapper.consul.option.QueryOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A cache structure that can provide an up-to-date read-only
+ * map backed by consul data
+ *
+ * @param <V>
+ */
+public class ConsulCache<K, V> {
+
+ enum State {latent, starting, started, stopped }
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class);
+
+ private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
+ private final AtomicReference<ImmutableMap<K, V>> lastResponse = new AtomicReference<ImmutableMap<K, V>>(ImmutableMap.<K, V>of());
+ private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
+ private final CountDownLatch initLatch = new CountDownLatch(1);
+ private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
+
+ private final Function<V, K> keyConversion;
+ private final CallbackConsumer<V> callBackConsumer;
+ private final ConsulResponseCallback<List<V>> responseCallback;
+
+ ConsulCache(
+ Function<V, K> keyConversion,
+ CallbackConsumer<V> callbackConsumer) {
+ this(keyConversion, callbackConsumer, 10, TimeUnit.SECONDS);
+ }
+
+ ConsulCache(
+ Function<V, K> keyConversion,
+ CallbackConsumer<V> callbackConsumer,
+ final long backoffDelayQty,
+ final TimeUnit backoffDelayUnit) {
+
+ this.keyConversion = keyConversion;
+ this.callBackConsumer = callbackConsumer;
+
+ this.responseCallback = new ConsulResponseCallback<List<V>>() {
+ @Override
+ public void onComplete(ConsulResponse<List<V>> consulResponse) {
+
+ if (!isRunning()) {
+ return;
+ }
+ updateIndex(consulResponse);
+ ImmutableMap<K, V> full = convertToMap(consulResponse);
+
+ boolean changed = !full.equals(lastResponse.get());
+// LOGGER.info("node changed:"+changed+"----"+full);
+ if (changed) {
+ // changes
+ lastResponse.set(full);
+ }
+
+ if (changed) {
+ for (Listener<K, V> l : listeners) {
+ l.notify(full);
+ }
+ }
+
+ if (state.compareAndSet(State.starting, State.started)) {
+ initLatch.countDown();
+ }
+ runCallback();
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+
+ if (!isRunning()) {
+ return;
+ }
+ LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);
+
+ executorService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ runCallback();
+ }
+ }, backoffDelayQty, backoffDelayUnit);
+ }
+ };
+ }
+
+ public void start() throws Exception {
+ checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);
+ runCallback();
+ }
+
+ public void stop() throws Exception {
+ State previous = state.getAndSet(State.stopped);
+ if (previous != State.stopped) {
+ executorService.shutdownNow();
+ }
+ }
+
+ private void runCallback() {
+ if (isRunning()) {
+ callBackConsumer.consume(latestIndex.get(), responseCallback);
+ }
+ }
+
+ private boolean isRunning() {
+ return state.get() == State.started || state.get() == State.starting;
+ }
+
+ public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
+ return initLatch.await(timeout, unit);
+ }
+
+ public ImmutableMap<K, V> getMap() {
+ return lastResponse.get();
+ }
+
+ @VisibleForTesting
+ ImmutableMap<K, V> convertToMap(final ConsulResponse<List<V>> response) {
+ if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
+ return ImmutableMap.of();
+ }
+
+ final ImmutableMap.Builder<K, V> builder = ImmutableMap.builder();
+ final Set<K> keySet = new HashSet<>();
+ for (final V v : response.getResponse()) {
+ final K key = keyConversion.apply(v);
+ if (key != null) {
+ if (!keySet.contains(key)) {
+ builder.put(key, v);
+ } else {
+ System.out.println(key.toString());
+ LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());
+ }
+ }
+ keySet.add(key);
+ }
+ return builder.build();
+ }
+
+ private void updateIndex(ConsulResponse<List<V>> consulResponse) {
+ if (consulResponse != null && consulResponse.getIndex() != null) {
+ this.latestIndex.set(consulResponse.getIndex());
+ }
+ }
+
+ protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
+ if (index == null) {
+ return QueryOptions.BLANK;
+ } else {
+ return QueryOptions.blockSeconds(blockSeconds, index).build();
+ }
+ }
+
+ /**
+ * passed in by creators to vary the content of the cached values
+ *
+ * @param <V>
+ */
+ protected interface CallbackConsumer<V> {
+ void consume(BigInteger index, ConsulResponseCallback<List<V>> callback);
+ }
+
+ /**
+ * Implementers can register a listener to receive
+ * a new map when it changes
+ *
+ * @param <V>
+ */
+ public interface Listener<K, V> {
+ void notify(Map<K, V> newValues);
+ }
+
+ public boolean addListener(Listener<K, V> listener) {
+ boolean added = listeners.add(listener);
+ if (state.get() == State.started) {
+ listener.notify(lastResponse.get());
+ }
+ return added;
+ }
+
+ public boolean removeListener(Listener<K, V> listener) {
+ return listeners.remove(listener);
+ }
+
+ @VisibleForTesting
+ protected State getState() {
+ return state.get();
+ }
+}
diff --git a/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache4Map.java b/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache4Map.java
new file mode 100644
index 0000000..84fd8ec
--- /dev/null
+++ b/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache4Map.java
@@ -0,0 +1,258 @@
+/**
+* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.openo.msb.wrapper.consul.cache;
+
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.math.BigInteger;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
+import org.openo.msb.wrapper.consul.model.ConsulResponse;
+import org.openo.msb.wrapper.consul.model.catalog.CatalogService;
+import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;
+import org.openo.msb.wrapper.consul.option.QueryOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A cache structure that can provide an up-to-date read-only
+ * map backed by consul data
+ *
+ * @param <V>
+ */
+public class ConsulCache4Map<K, V> {
+
+ enum State {latent, starting, started, stopped }
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache4Map.class);
+
+ private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
+ private final AtomicReference<ImmutableList<ServiceInfo>> lastResponse = new AtomicReference<ImmutableList<ServiceInfo>>(ImmutableList.<ServiceInfo>of());
+ private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
+ private final CountDownLatch initLatch = new CountDownLatch(1);
+ private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
+
+ private final CallbackConsumer<V> callBackConsumer;
+ private final ConsulResponseCallback<Map<String,List<String>>> responseCallback;
+
+ ConsulCache4Map(CallbackConsumer<V> callbackConsumer) {
+ this( callbackConsumer, 10, TimeUnit.SECONDS);
+ }
+
+ ConsulCache4Map(
+ CallbackConsumer<V> callbackConsumer,
+ final long backoffDelayQty,
+ final TimeUnit backoffDelayUnit) {
+
+ this.callBackConsumer = callbackConsumer;
+
+ this.responseCallback = new ConsulResponseCallback<Map<String,List<String>>>() {
+ @Override
+ public void onComplete(ConsulResponse<Map<String,List<String>>> consulResponse) {
+
+ if (!isRunning()) {
+ return;
+ }
+ updateIndex(consulResponse);
+ ImmutableList<ServiceInfo> full = convertToList(consulResponse);
+ List<ServiceInfo> oldList=lastResponse.get();
+ boolean changed = !full.equals(lastResponse.get());
+// LOGGER.info("service changed:"+changed+"----"+full);
+ if (changed) {
+ // changes
+ lastResponse.set(full);
+ }
+
+ if (changed) {
+ for (Listener<K, V> l : listeners) {
+ l.notify(oldList,full);
+ }
+ }
+
+ if (state.compareAndSet(State.starting, State.started)) {
+ initLatch.countDown();
+ }
+ runCallback();
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+
+ if (!isRunning()) {
+ return;
+ }
+ LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);
+
+ executorService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ runCallback();
+ }
+ }, backoffDelayQty, backoffDelayUnit);
+ }
+ };
+ }
+
+ public void start() throws Exception {
+ checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);
+ runCallback();
+ }
+
+ public void stop() throws Exception {
+ State previous = state.getAndSet(State.stopped);
+ if (previous != State.stopped) {
+ executorService.shutdownNow();
+ }
+ }
+
+ private void runCallback() {
+ if (isRunning()) {
+ callBackConsumer.consume(latestIndex.get(), responseCallback);
+ }
+ }
+
+ private boolean isRunning() {
+ return state.get() == State.started || state.get() == State.starting;
+ }
+
+ public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
+ return initLatch.await(timeout, unit);
+ }
+
+ public ImmutableList<ServiceInfo> getMap() {
+ return lastResponse.get();
+ }
+
+ @VisibleForTesting
+ ImmutableList<ServiceInfo> convertToList(final ConsulResponse<Map<String,List<String>>> response) {
+ if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
+ return ImmutableList.of();
+ }
+
+ final ImmutableList.Builder<ServiceInfo> builder = ImmutableList.builder();
+ final Set<String> keySet = new HashSet<>();
+
+ for(Map.Entry<String,List<String>> entry : response.getResponse().entrySet()) {
+
+ String key = entry.getKey();
+
+ if (key != null && !"consul".equals(key)) {
+ if (!keySet.contains(key)) {
+ ServiceInfo serviceInfo=new ServiceInfo();
+ serviceInfo.setServiceName(key);
+
+ List<String> value=entry.getValue();
+ for(String tag:value){
+
+ if(tag.startsWith("version")){
+ String version;
+ if(tag.split(":").length==2)
+ {
+ version = tag.split(":")[1];
+ }
+ else{
+ version="";
+ }
+
+ serviceInfo.setVersion(version);
+ break;
+ }
+ }
+
+ builder.add(serviceInfo);
+ } else {
+ System.out.println(key.toString());
+ LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());
+ }
+ }
+ keySet.add(key);
+
+ }
+
+
+ return builder.build();
+ }
+
+ private void updateIndex(ConsulResponse<Map<String,List<String>>> consulResponse) {
+ if (consulResponse != null && consulResponse.getIndex() != null) {
+ this.latestIndex.set(consulResponse.getIndex());
+ }
+ }
+
+ protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
+ if (index == null) {
+ return QueryOptions.BLANK;
+ } else {
+ return QueryOptions.blockSeconds(blockSeconds, index).build();
+ }
+ }
+
+ /**
+ * passed in by creators to vary the content of the cached values
+ *
+ * @param <V>
+ */
+ protected interface CallbackConsumer<V> {
+ void consume(BigInteger index, ConsulResponseCallback<Map<String,List<String>>> callback);
+ }
+
+ /**
+ * Implementers can register a listener to receive
+ * a new map when it changes
+ *
+ * @param <V>
+ */
+ public interface Listener<K, V> {
+ void notify(List<ServiceInfo> oldValues,List<ServiceInfo> newValues);
+ }
+
+ public boolean addListener(Listener<K, V> listener) {
+ boolean added = listeners.add(listener);
+ if (state.get() == State.started) {
+ listener.notify(lastResponse.get(),lastResponse.get());
+ }
+ return added;
+ }
+
+ public boolean removeListener(Listener<K, V> listener) {
+ return listeners.remove(listener);
+ }
+
+ @VisibleForTesting
+ protected State getState() {
+ return state.get();
+ }
+
+
+
+}
diff --git a/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/HealthCache.java b/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/HealthCache.java
new file mode 100644
index 0000000..0134eeb
--- /dev/null
+++ b/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/HealthCache.java
@@ -0,0 +1,70 @@
+/**
+* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.openo.msb.wrapper.consul.cache;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import org.openo.msb.wrapper.consul.HealthClient;
+import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
+import org.openo.msb.wrapper.consul.model.health.ServiceHealth;
+
+import com.google.common.base.Function;
+
+public class HealthCache extends ConsulCache<String, ServiceHealth>{
+
+ private final String serviceName;
+
+ private HealthCache(Function<ServiceHealth, String> keyConversion,
+ ConsulCache.CallbackConsumer<ServiceHealth> callbackConsumer,String serviceName) {
+ super(keyConversion, callbackConsumer);
+ this.serviceName=serviceName;
+ // TODO Auto-generated constructor stub
+ }
+
+
+ public static HealthCache newCache(
+ final HealthClient healthClient,
+ final String serviceName,
+ final int watchSeconds){
+ Function<ServiceHealth,String> keyExtractor = new Function<ServiceHealth, String>() {
+ @Override
+ public String apply(ServiceHealth input) {
+ //return input.getKey().substring(rootPath.length() + 1);
+ return input.getService().getId();
+ }
+ };
+
+ final CallbackConsumer<ServiceHealth> callbackConsumer = new CallbackConsumer<ServiceHealth>() {
+ @Override
+ public void consume(BigInteger index, ConsulResponseCallback<List<ServiceHealth>> callback) {
+ healthClient.getHealthyServiceInstances(serviceName, watchParams(index, watchSeconds),callback);
+ }
+ };
+
+
+ return new HealthCache(keyExtractor, callbackConsumer,serviceName);
+
+
+ }
+
+ public String getServiceName(){
+ return this.serviceName;
+ }
+
+
+} \ No newline at end of file
diff --git a/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ServiceCache.java b/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ServiceCache.java
new file mode 100644
index 0000000..80cdb7b
--- /dev/null
+++ b/msb-core/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ServiceCache.java
@@ -0,0 +1,50 @@
+/**
+* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.openo.msb.wrapper.consul.cache;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+
+import org.openo.msb.wrapper.consul.CatalogClient;
+import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
+
+public class ServiceCache extends ConsulCache4Map<String, Map<String, List<String>>> {
+ private ServiceCache( ConsulCache4Map.CallbackConsumer<Map<String, List<String>>> callbackConsumer) {
+ super(callbackConsumer);
+ // TODO Auto-generated constructor stub
+ }
+
+
+ public static ServiceCache newCache(
+ final CatalogClient catalogClient,
+ final int watchSeconds){
+
+
+ final CallbackConsumer<Map<String, List<String>>> callbackConsumer = new CallbackConsumer<Map<String, List<String>>>() {
+ @Override
+ public void consume(BigInteger index, ConsulResponseCallback<Map<String, List<String>>> callback) {
+ catalogClient.getService(watchParams(index, watchSeconds),callback);
+ }
+ };
+
+
+ return new ServiceCache(callbackConsumer);
+
+
+ }
+}