diff options
author | 2016-08-17 12:37:07 +0800 | |
---|---|---|
committer | 2016-08-17 14:03:37 +0800 | |
commit | 4143173cd08ef515173e5ad4b4c15d4e9f9f1943 (patch) | |
tree | 0aa7a7ae64ce9876b95628cb5835ff5e12c8a546 /apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache | |
parent | 230628860edf5ff489dd59e299d35c80d1329a5d (diff) |
1. Adjust the directory hierarchy
2. Fix the compile issue
Change-Id: Ibf10c83104e5e673bc797013799861426cd950ce
Signed-off-by: HuabingZhao <zhao.huabing@zte.com.cn>
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache')
5 files changed, 0 insertions, 678 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/CatalogCache.java b/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/CatalogCache.java deleted file mode 100644 index 8cafab4..0000000 --- a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/CatalogCache.java +++ /dev/null @@ -1,70 +0,0 @@ -/**
-* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.openo.msb.wrapper.consul.cache;
-
-import java.math.BigInteger;
-import java.util.List;
-
-import org.openo.msb.wrapper.consul.CatalogClient;
-import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
-import org.openo.msb.wrapper.consul.model.catalog.CatalogService;
-
-import com.google.common.base.Function;
-
-public class CatalogCache extends ConsulCache<String, CatalogService>{
-
- private final String serviceName;
-
- private CatalogCache(Function<CatalogService, String> keyConversion,
- ConsulCache.CallbackConsumer<CatalogService> callbackConsumer,String serviceName) {
- super(keyConversion, callbackConsumer);
- this.serviceName=serviceName;
- // TODO Auto-generated constructor stub
- }
-
-
- public static CatalogCache newCache(
- final CatalogClient catalogClient,
- final String serviceName,
- final int watchSeconds){
- Function<CatalogService,String> keyExtractor = new Function<CatalogService, String>() {
- @Override
- public String apply(CatalogService input) {
- //return input.getKey().substring(rootPath.length() + 1);
- return input.getServiceId();
- }
- };
-
- final CallbackConsumer<CatalogService> callbackConsumer = new CallbackConsumer<CatalogService>() {
- @Override
- public void consume(BigInteger index, ConsulResponseCallback<List<CatalogService>> callback) {
- catalogClient.getService(serviceName, watchParams(index, watchSeconds),callback);
- }
- };
-
-
- return new CatalogCache(keyExtractor, callbackConsumer,serviceName);
-
-
- }
-
- public String getServiceName(){
- return this.serviceName;
- }
-
-
-}
diff --git a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache.java b/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache.java deleted file mode 100644 index e7494fa..0000000 --- a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache.java +++ /dev/null @@ -1,230 +0,0 @@ -/** -* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE) -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.openo.msb.wrapper.consul.cache; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableMap; - -import org.openo.msb.wrapper.consul.async.ConsulResponseCallback; -import org.openo.msb.wrapper.consul.model.ConsulResponse; -import org.openo.msb.wrapper.consul.option.QueryOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.math.BigInteger; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static com.google.common.base.Preconditions.checkState; - -/** - * A cache structure that can provide an up-to-date read-only - * map backed by consul data - * - * @param <V> - */ -public class ConsulCache<K, V> { - - enum State {latent, starting, started, stopped } - - private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class); - - private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null); - private final AtomicReference<ImmutableMap<K, V>> lastResponse = new AtomicReference<ImmutableMap<K, V>>(ImmutableMap.<K, V>of()); - private final AtomicReference<State> state = new AtomicReference<State>(State.latent); - private final CountDownLatch initLatch = new CountDownLatch(1); - private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>(); - - private final Function<V, K> keyConversion; - private final CallbackConsumer<V> callBackConsumer; - private final ConsulResponseCallback<List<V>> responseCallback; - - ConsulCache( - Function<V, K> keyConversion, - CallbackConsumer<V> callbackConsumer) { - this(keyConversion, callbackConsumer, 10, TimeUnit.SECONDS); - } - - ConsulCache( - Function<V, K> keyConversion, - CallbackConsumer<V> callbackConsumer, - final long backoffDelayQty, - final TimeUnit backoffDelayUnit) { - - this.keyConversion = keyConversion; - this.callBackConsumer = callbackConsumer; - - this.responseCallback = new ConsulResponseCallback<List<V>>() { - @Override - public void onComplete(ConsulResponse<List<V>> consulResponse) { - - if (!isRunning()) { - return; - } - updateIndex(consulResponse); - ImmutableMap<K, V> full = convertToMap(consulResponse); - - boolean changed = !full.equals(lastResponse.get()); -// LOGGER.info("node changed:"+changed+"----"+full); - if (changed) { - // changes - lastResponse.set(full); - } - - if (changed) { - for (Listener<K, V> l : listeners) { - l.notify(full); - } - } - - if (state.compareAndSet(State.starting, State.started)) { - initLatch.countDown(); - } - runCallback(); - } - - @Override - public void onFailure(Throwable throwable) { - - if (!isRunning()) { - return; - } - LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable); - - executorService.schedule(new Runnable() { - @Override - public void run() { - runCallback(); - } - }, backoffDelayQty, backoffDelayUnit); - } - }; - } - - public void start() throws Exception { - checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting); - runCallback(); - } - - public void stop() throws Exception { - State previous = state.getAndSet(State.stopped); - if (previous != State.stopped) { - executorService.shutdownNow(); - } - } - - private void runCallback() { - if (isRunning()) { - callBackConsumer.consume(latestIndex.get(), responseCallback); - } - } - - private boolean isRunning() { - return state.get() == State.started || state.get() == State.starting; - } - - public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException { - return initLatch.await(timeout, unit); - } - - public ImmutableMap<K, V> getMap() { - return lastResponse.get(); - } - - @VisibleForTesting - ImmutableMap<K, V> convertToMap(final ConsulResponse<List<V>> response) { - if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) { - return ImmutableMap.of(); - } - - final ImmutableMap.Builder<K, V> builder = ImmutableMap.builder(); - final Set<K> keySet = new HashSet<>(); - for (final V v : response.getResponse()) { - final K key = keyConversion.apply(v); - if (key != null) { - if (!keySet.contains(key)) { - builder.put(key, v); - } else { - System.out.println(key.toString()); - LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString()); - } - } - keySet.add(key); - } - return builder.build(); - } - - private void updateIndex(ConsulResponse<List<V>> consulResponse) { - if (consulResponse != null && consulResponse.getIndex() != null) { - this.latestIndex.set(consulResponse.getIndex()); - } - } - - protected static QueryOptions watchParams(BigInteger index, int blockSeconds) { - if (index == null) { - return QueryOptions.BLANK; - } else { - return QueryOptions.blockSeconds(blockSeconds, index).build(); - } - } - - /** - * passed in by creators to vary the content of the cached values - * - * @param <V> - */ - protected interface CallbackConsumer<V> { - void consume(BigInteger index, ConsulResponseCallback<List<V>> callback); - } - - /** - * Implementers can register a listener to receive - * a new map when it changes - * - * @param <V> - */ - public interface Listener<K, V> { - void notify(Map<K, V> newValues); - } - - public boolean addListener(Listener<K, V> listener) { - boolean added = listeners.add(listener); - if (state.get() == State.started) { - listener.notify(lastResponse.get()); - } - return added; - } - - public boolean removeListener(Listener<K, V> listener) { - return listeners.remove(listener); - } - - @VisibleForTesting - protected State getState() { - return state.get(); - } -} diff --git a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache4Map.java b/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache4Map.java deleted file mode 100644 index 84fd8ec..0000000 --- a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ConsulCache4Map.java +++ /dev/null @@ -1,258 +0,0 @@ -/**
-* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.openo.msb.wrapper.consul.cache;
-
-
-import static com.google.common.base.Preconditions.checkState;
-
-import java.math.BigInteger;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
-import org.openo.msb.wrapper.consul.model.ConsulResponse;
-import org.openo.msb.wrapper.consul.model.catalog.CatalogService;
-import org.openo.msb.wrapper.consul.model.catalog.ServiceInfo;
-import org.openo.msb.wrapper.consul.option.QueryOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-
-/**
- * A cache structure that can provide an up-to-date read-only
- * map backed by consul data
- *
- * @param <V>
- */
-public class ConsulCache4Map<K, V> {
-
- enum State {latent, starting, started, stopped }
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache4Map.class);
-
- private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null);
- private final AtomicReference<ImmutableList<ServiceInfo>> lastResponse = new AtomicReference<ImmutableList<ServiceInfo>>(ImmutableList.<ServiceInfo>of());
- private final AtomicReference<State> state = new AtomicReference<State>(State.latent);
- private final CountDownLatch initLatch = new CountDownLatch(1);
- private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- private final CopyOnWriteArrayList<Listener<K, V>> listeners = new CopyOnWriteArrayList<Listener<K, V>>();
-
- private final CallbackConsumer<V> callBackConsumer;
- private final ConsulResponseCallback<Map<String,List<String>>> responseCallback;
-
- ConsulCache4Map(CallbackConsumer<V> callbackConsumer) {
- this( callbackConsumer, 10, TimeUnit.SECONDS);
- }
-
- ConsulCache4Map(
- CallbackConsumer<V> callbackConsumer,
- final long backoffDelayQty,
- final TimeUnit backoffDelayUnit) {
-
- this.callBackConsumer = callbackConsumer;
-
- this.responseCallback = new ConsulResponseCallback<Map<String,List<String>>>() {
- @Override
- public void onComplete(ConsulResponse<Map<String,List<String>>> consulResponse) {
-
- if (!isRunning()) {
- return;
- }
- updateIndex(consulResponse);
- ImmutableList<ServiceInfo> full = convertToList(consulResponse);
- List<ServiceInfo> oldList=lastResponse.get();
- boolean changed = !full.equals(lastResponse.get());
-// LOGGER.info("service changed:"+changed+"----"+full);
- if (changed) {
- // changes
- lastResponse.set(full);
- }
-
- if (changed) {
- for (Listener<K, V> l : listeners) {
- l.notify(oldList,full);
- }
- }
-
- if (state.compareAndSet(State.starting, State.started)) {
- initLatch.countDown();
- }
- runCallback();
- }
-
- @Override
- public void onFailure(Throwable throwable) {
-
- if (!isRunning()) {
- return;
- }
- LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", backoffDelayQty, backoffDelayUnit), throwable);
-
- executorService.schedule(new Runnable() {
- @Override
- public void run() {
- runCallback();
- }
- }, backoffDelayQty, backoffDelayUnit);
- }
- };
- }
-
- public void start() throws Exception {
- checkState(state.compareAndSet(State.latent, State.starting),"Cannot transition from state %s to %s", state.get(), State.starting);
- runCallback();
- }
-
- public void stop() throws Exception {
- State previous = state.getAndSet(State.stopped);
- if (previous != State.stopped) {
- executorService.shutdownNow();
- }
- }
-
- private void runCallback() {
- if (isRunning()) {
- callBackConsumer.consume(latestIndex.get(), responseCallback);
- }
- }
-
- private boolean isRunning() {
- return state.get() == State.started || state.get() == State.starting;
- }
-
- public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
- return initLatch.await(timeout, unit);
- }
-
- public ImmutableList<ServiceInfo> getMap() {
- return lastResponse.get();
- }
-
- @VisibleForTesting
- ImmutableList<ServiceInfo> convertToList(final ConsulResponse<Map<String,List<String>>> response) {
- if (response == null || response.getResponse() == null || response.getResponse().isEmpty()) {
- return ImmutableList.of();
- }
-
- final ImmutableList.Builder<ServiceInfo> builder = ImmutableList.builder();
- final Set<String> keySet = new HashSet<>();
-
- for(Map.Entry<String,List<String>> entry : response.getResponse().entrySet()) {
-
- String key = entry.getKey();
-
- if (key != null && !"consul".equals(key)) {
- if (!keySet.contains(key)) {
- ServiceInfo serviceInfo=new ServiceInfo();
- serviceInfo.setServiceName(key);
-
- List<String> value=entry.getValue();
- for(String tag:value){
-
- if(tag.startsWith("version")){
- String version;
- if(tag.split(":").length==2)
- {
- version = tag.split(":")[1];
- }
- else{
- version="";
- }
-
- serviceInfo.setVersion(version);
- break;
- }
- }
-
- builder.add(serviceInfo);
- } else {
- System.out.println(key.toString());
- LOGGER.warn("Duplicate service encountered. May differ by tags. Try using more specific tags? " + key.toString());
- }
- }
- keySet.add(key);
-
- }
-
-
- return builder.build();
- }
-
- private void updateIndex(ConsulResponse<Map<String,List<String>>> consulResponse) {
- if (consulResponse != null && consulResponse.getIndex() != null) {
- this.latestIndex.set(consulResponse.getIndex());
- }
- }
-
- protected static QueryOptions watchParams(BigInteger index, int blockSeconds) {
- if (index == null) {
- return QueryOptions.BLANK;
- } else {
- return QueryOptions.blockSeconds(blockSeconds, index).build();
- }
- }
-
- /**
- * passed in by creators to vary the content of the cached values
- *
- * @param <V>
- */
- protected interface CallbackConsumer<V> {
- void consume(BigInteger index, ConsulResponseCallback<Map<String,List<String>>> callback);
- }
-
- /**
- * Implementers can register a listener to receive
- * a new map when it changes
- *
- * @param <V>
- */
- public interface Listener<K, V> {
- void notify(List<ServiceInfo> oldValues,List<ServiceInfo> newValues);
- }
-
- public boolean addListener(Listener<K, V> listener) {
- boolean added = listeners.add(listener);
- if (state.get() == State.started) {
- listener.notify(lastResponse.get(),lastResponse.get());
- }
- return added;
- }
-
- public boolean removeListener(Listener<K, V> listener) {
- return listeners.remove(listener);
- }
-
- @VisibleForTesting
- protected State getState() {
- return state.get();
- }
-
-
-
-}
diff --git a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/HealthCache.java b/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/HealthCache.java deleted file mode 100644 index 0134eeb..0000000 --- a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/HealthCache.java +++ /dev/null @@ -1,70 +0,0 @@ -/**
-* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.openo.msb.wrapper.consul.cache;
-
-import java.math.BigInteger;
-import java.util.List;
-
-import org.openo.msb.wrapper.consul.HealthClient;
-import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
-import org.openo.msb.wrapper.consul.model.health.ServiceHealth;
-
-import com.google.common.base.Function;
-
-public class HealthCache extends ConsulCache<String, ServiceHealth>{
-
- private final String serviceName;
-
- private HealthCache(Function<ServiceHealth, String> keyConversion,
- ConsulCache.CallbackConsumer<ServiceHealth> callbackConsumer,String serviceName) {
- super(keyConversion, callbackConsumer);
- this.serviceName=serviceName;
- // TODO Auto-generated constructor stub
- }
-
-
- public static HealthCache newCache(
- final HealthClient healthClient,
- final String serviceName,
- final int watchSeconds){
- Function<ServiceHealth,String> keyExtractor = new Function<ServiceHealth, String>() {
- @Override
- public String apply(ServiceHealth input) {
- //return input.getKey().substring(rootPath.length() + 1);
- return input.getService().getId();
- }
- };
-
- final CallbackConsumer<ServiceHealth> callbackConsumer = new CallbackConsumer<ServiceHealth>() {
- @Override
- public void consume(BigInteger index, ConsulResponseCallback<List<ServiceHealth>> callback) {
- healthClient.getHealthyServiceInstances(serviceName, watchParams(index, watchSeconds),callback);
- }
- };
-
-
- return new HealthCache(keyExtractor, callbackConsumer,serviceName);
-
-
- }
-
- public String getServiceName(){
- return this.serviceName;
- }
-
-
-}
\ No newline at end of file diff --git a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ServiceCache.java b/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ServiceCache.java deleted file mode 100644 index 80cdb7b..0000000 --- a/apiroute/apiroute-service/src/main/java/org/openo/msb/wrapper/consul/cache/ServiceCache.java +++ /dev/null @@ -1,50 +0,0 @@ -/**
-* Copyright (C) 2016 ZTE, Inc. and others. All rights reserved. (ZTE)
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.openo.msb.wrapper.consul.cache;
-
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Map;
-
-import org.openo.msb.wrapper.consul.CatalogClient;
-import org.openo.msb.wrapper.consul.async.ConsulResponseCallback;
-
-public class ServiceCache extends ConsulCache4Map<String, Map<String, List<String>>> {
- private ServiceCache( ConsulCache4Map.CallbackConsumer<Map<String, List<String>>> callbackConsumer) {
- super(callbackConsumer);
- // TODO Auto-generated constructor stub
- }
-
-
- public static ServiceCache newCache(
- final CatalogClient catalogClient,
- final int watchSeconds){
-
-
- final CallbackConsumer<Map<String, List<String>>> callbackConsumer = new CallbackConsumer<Map<String, List<String>>>() {
- @Override
- public void consume(BigInteger index, ConsulResponseCallback<Map<String, List<String>>> callback) {
- catalogClient.getService(watchParams(index, watchSeconds),callback);
- }
- };
-
-
- return new ServiceCache(callbackConsumer);
-
-
- }
-}
|