diff options
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache')
3 files changed, 278 insertions, 321 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ConsulCache.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ConsulCache.java index 5325b89..b389efc 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ConsulCache.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ConsulCache.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.cache; @@ -44,273 +42,248 @@ import com.orbitz.consul.option.ImmutableQueryOptions; import com.orbitz.consul.option.QueryOptions; /** - * A cache structure that can provide an up-to-date read-only map backed by - * consul data + * A cache structure that can provide an up-to-date read-only map backed by consul data * * @param <V> */ public class ConsulCache<T> { - enum State { - latent, starting, started, stopped - } - - private final static Logger LOGGER = LoggerFactory - .getLogger(ConsulCache.class); - - @VisibleForTesting - static final String BACKOFF_DELAY_PROPERTY = "com.orbitz.consul.cache.backOffDelay"; - private static final long BACKOFF_DELAY_QTY_IN_MS = getBackOffDelayInMs(System - .getProperties()); - - private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>( - null); - private final AtomicReference<State> state = new AtomicReference<State>( - State.latent); - private final CountDownLatch initLatch = new CountDownLatch(1); - private final ScheduledExecutorService executorService = Executors - .newSingleThreadScheduledExecutor(); - private final CopyOnWriteArrayList<Listener<T>> listeners = new CopyOnWriteArrayList<Listener<T>>(); - - private final CallbackConsumer<T> callBackConsumer; - private final ConsulResponseCallback<T> responseCallback; - - ConsulCache(CallbackConsumer<T> callbackConsumer) { - - this.callBackConsumer = callbackConsumer; - - this.responseCallback = new ConsulResponseCallback<T>() { - @Override - public void onComplete(ConsulResponse<T> consulResponse) { - - if (consulResponse.isKnownLeader()) { - if (!isRunning()) { - return; - } - updateIndex(consulResponse); - - for (Listener<T> l : listeners) { - l.notify(consulResponse); - } - - if (state.compareAndSet(State.starting, State.started)) { - initLatch.countDown(); - } - - runCallback(); - } else { - onFailure(new ConsulException( - "Consul cluster has no elected leader")); - } - } - - @Override - public void onDelayComplete( - OriginalConsulResponse<T> originalConsulResponse) { - - try { - // get header - ConsulResponseHeader consulResponseHeader = Http - .consulResponseHeader(originalConsulResponse - .getResponse()); - - if (consulResponseHeader.isKnownLeader()) { - if (!isRunning()) { - return; - } - - boolean isConuslIndexChanged = isConuslIndexChanged(consulResponseHeader - .getIndex()); - // consul index different - if (isConuslIndexChanged) { - - updateIndex(consulResponseHeader.getIndex()); - - // get T type data - ConsulResponse<T> consulResponse = Http - .consulResponse(originalConsulResponse - .getResponseType(), - originalConsulResponse - .getResponse()); - - // notify customer to custom T data - for (Listener<T> l : listeners) { - l.notify(consulResponse); - } - } - - if (state.compareAndSet(State.starting, State.started)) { - initLatch.countDown(); - } - - runCallback(); - - } else { - onFailure(new ConsulException( - "Consul cluster has no elected leader")); - } - } catch (Exception e) { - onFailure(e); - } - - } - - @Override - public void onFailure(Throwable throwable) { - - if (!isRunning()) { - return; - } - LOGGER.error( - String.format( - "Error getting response from consul. will retry in %d %s", - BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS), - throwable); - - executorService.schedule(new Runnable() { - @Override - public void run() { - runCallback(); - } - }, BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS); - } - }; - } - - @VisibleForTesting - static long getBackOffDelayInMs(Properties properties) { - String backOffDelay = null; - try { - backOffDelay = properties.getProperty(BACKOFF_DELAY_PROPERTY); - if (!Strings.isNullOrEmpty(backOffDelay)) { - return Long.parseLong(backOffDelay); - } - } catch (Exception ex) { - LOGGER.warn( - backOffDelay != null ? String.format( - "Error parsing property variable %s: %s", - BACKOFF_DELAY_PROPERTY, backOffDelay) : String - .format("Error extracting property variable %s", - BACKOFF_DELAY_PROPERTY), ex); - } - return TimeUnit.SECONDS.toMillis(10); - } - - public void start() throws Exception { - checkState(state.compareAndSet(State.latent, State.starting), - "Cannot transition from state %s to %s", state.get(), - State.starting); - runCallback(); - } - - public void stop() throws Exception { - State previous = state.getAndSet(State.stopped); - if (previous != State.stopped) { - executorService.shutdownNow(); - } - } - - private void runCallback() { - if (isRunning()) { - callBackConsumer.consume(latestIndex.get(), responseCallback); - } - } - - private boolean isRunning() { - return state.get() == State.started || state.get() == State.starting; - } - - public boolean awaitInitialized(long timeout, TimeUnit unit) - throws InterruptedException { - return initLatch.await(timeout, unit); - } - - private void updateIndex(ConsulResponse<T> consulResponse) { - if (consulResponse != null && consulResponse.getIndex() != null) { - this.latestIndex.set(consulResponse.getIndex()); - } - } - - public void updateIndex(BigInteger index) { - if (index != null) { - this.latestIndex.set(index); - } - } - - protected static QueryOptions watchParams(final BigInteger index, - final int blockSeconds, QueryOptions queryOptions) { - checkArgument(!queryOptions.getIndex().isPresent() - && !queryOptions.getWait().isPresent(), - "Index and wait cannot be overridden"); - - return ImmutableQueryOptions.builder() - .from(watchDefaultParams(index, blockSeconds)) - .token(queryOptions.getToken()) - .consistencyMode(queryOptions.getConsistencyMode()) - .near(queryOptions.getNear()).build(); - } - - private static QueryOptions watchDefaultParams(final BigInteger index, - final int blockSeconds) { - if (index == null) { - return QueryOptions.BLANK; - } else { - return QueryOptions.blockSeconds(blockSeconds, index).build(); - } - } - - /** - * passed in by creators to vary the content of the cached values - * - * @param <V> - */ - protected interface CallbackConsumer<T> { - void consume(BigInteger index, ConsulResponseCallback<T> callback); - } - - /** - * Implementers can register a listener to receive a new map when it changes - * - * @param <V> - */ - public interface Listener<T> { - void notify(ConsulResponse<T> newValues); - } - - public boolean addListener(Listener<T> listener) { - boolean added = listeners.add(listener); - return added; - } - - public List<Listener<T>> getListeners() { - return Collections.unmodifiableList(listeners); - } - - public boolean removeListener(Listener<T> listener) { - return listeners.remove(listener); - } - - @VisibleForTesting - protected State getState() { - return state.get(); - } - - private boolean isConuslIndexChanged(final BigInteger index) { - - if (index != null && !index.equals(latestIndex.get())) { - - if (LOGGER.isDebugEnabled()) { - // 第一次不打印 - if (latestIndex.get() != null) { - LOGGER.debug("consul index compare:new-" + index + " old-" - + latestIndex.get()); - } - - } - - return true; - } - - return false; - } + enum State { + latent, starting, started, stopped + } + + private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class); + + @VisibleForTesting + static final String BACKOFF_DELAY_PROPERTY = "com.orbitz.consul.cache.backOffDelay"; + private static final long BACKOFF_DELAY_QTY_IN_MS = getBackOffDelayInMs(System.getProperties()); + + private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null); + private final AtomicReference<State> state = new AtomicReference<State>(State.latent); + private final CountDownLatch initLatch = new CountDownLatch(1); + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + private final CopyOnWriteArrayList<Listener<T>> listeners = new CopyOnWriteArrayList<Listener<T>>(); + + private final CallbackConsumer<T> callBackConsumer; + private final ConsulResponseCallback<T> responseCallback; + + ConsulCache(CallbackConsumer<T> callbackConsumer) { + + this.callBackConsumer = callbackConsumer; + + this.responseCallback = new ConsulResponseCallback<T>() { + @Override + public void onComplete(ConsulResponse<T> consulResponse) { + + if (consulResponse.isKnownLeader()) { + if (!isRunning()) { + return; + } + updateIndex(consulResponse); + + for (Listener<T> l : listeners) { + l.notify(consulResponse); + } + + if (state.compareAndSet(State.starting, State.started)) { + initLatch.countDown(); + } + + runCallback(); + } else { + onFailure(new ConsulException("Consul cluster has no elected leader")); + } + } + + @Override + public void onDelayComplete(OriginalConsulResponse<T> originalConsulResponse) { + + try { + // get header + ConsulResponseHeader consulResponseHeader = + Http.consulResponseHeader(originalConsulResponse.getResponse()); + + if (consulResponseHeader.isKnownLeader()) { + if (!isRunning()) { + return; + } + + boolean isConuslIndexChanged = isConuslIndexChanged(consulResponseHeader.getIndex()); + // consul index different + if (isConuslIndexChanged) { + + updateIndex(consulResponseHeader.getIndex()); + + // get T type data + ConsulResponse<T> consulResponse = + Http.consulResponse(originalConsulResponse.getResponseType(), + originalConsulResponse.getResponse()); + + // notify customer to custom T data + for (Listener<T> l : listeners) { + l.notify(consulResponse); + } + } + + if (state.compareAndSet(State.starting, State.started)) { + initLatch.countDown(); + } + + runCallback(); + + } else { + onFailure(new ConsulException("Consul cluster has no elected leader")); + } + } catch (Exception e) { + onFailure(e); + } + + } + + @Override + public void onFailure(Throwable throwable) { + + if (!isRunning()) { + return; + } + LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", + BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS), throwable); + + executorService.schedule(new Runnable() { + @Override + public void run() { + runCallback(); + } + }, BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS); + } + }; + } + + @VisibleForTesting + static long getBackOffDelayInMs(Properties properties) { + String backOffDelay = null; + try { + backOffDelay = properties.getProperty(BACKOFF_DELAY_PROPERTY); + if (!Strings.isNullOrEmpty(backOffDelay)) { + return Long.parseLong(backOffDelay); + } + } catch (Exception ex) { + LOGGER.warn(backOffDelay != null + ? String.format("Error parsing property variable %s: %s", BACKOFF_DELAY_PROPERTY, + backOffDelay) + : String.format("Error extracting property variable %s", BACKOFF_DELAY_PROPERTY), ex); + } + return TimeUnit.SECONDS.toMillis(10); + } + + public void start() throws Exception { + checkState(state.compareAndSet(State.latent, State.starting), "Cannot transition from state %s to %s", + state.get(), State.starting); + runCallback(); + } + + public void stop() throws Exception { + State previous = state.getAndSet(State.stopped); + if (previous != State.stopped) { + executorService.shutdownNow(); + } + } + + private void runCallback() { + if (isRunning()) { + callBackConsumer.consume(latestIndex.get(), responseCallback); + } + } + + private boolean isRunning() { + return state.get() == State.started || state.get() == State.starting; + } + + public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException { + return initLatch.await(timeout, unit); + } + + private void updateIndex(ConsulResponse<T> consulResponse) { + if (consulResponse != null && consulResponse.getIndex() != null) { + this.latestIndex.set(consulResponse.getIndex()); + } + } + + public void updateIndex(BigInteger index) { + if (index != null) { + this.latestIndex.set(index); + } + } + + protected static QueryOptions watchParams(final BigInteger index, final int blockSeconds, + QueryOptions queryOptions) { + checkArgument(!queryOptions.getIndex().isPresent() && !queryOptions.getWait().isPresent(), + "Index and wait cannot be overridden"); + + return ImmutableQueryOptions.builder().from(watchDefaultParams(index, blockSeconds)) + .token(queryOptions.getToken()).consistencyMode(queryOptions.getConsistencyMode()) + .near(queryOptions.getNear()).build(); + } + + private static QueryOptions watchDefaultParams(final BigInteger index, final int blockSeconds) { + if (index == null) { + return QueryOptions.BLANK; + } else { + return QueryOptions.blockSeconds(blockSeconds, index).build(); + } + } + + /** + * passed in by creators to vary the content of the cached values + * + * @param <V> + */ + protected interface CallbackConsumer<T> { + void consume(BigInteger index, ConsulResponseCallback<T> callback); + } + + /** + * Implementers can register a listener to receive a new map when it changes + * + * @param <V> + */ + public interface Listener<T> { + void notify(ConsulResponse<T> newValues); + } + + public boolean addListener(Listener<T> listener) { + boolean added = listeners.add(listener); + return added; + } + + public List<Listener<T>> getListeners() { + return Collections.unmodifiableList(listeners); + } + + public boolean removeListener(Listener<T> listener) { + return listeners.remove(listener); + } + + @VisibleForTesting + protected State getState() { + return state.get(); + } + + private boolean isConuslIndexChanged(final BigInteger index) { + + if (index != null && !index.equals(latestIndex.get())) { + + if (LOGGER.isDebugEnabled()) { + // 第一次不打印 + if (latestIndex.get() != null) { + LOGGER.debug("consul index compare:new-" + index + " old-" + latestIndex.get()); + } + + } + + return true; + } + + return false; + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServiceHealthCache.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServiceHealthCache.java index f8bd224..1b10730 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServiceHealthCache.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServiceHealthCache.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.cache; @@ -36,41 +34,32 @@ public class ServiceHealthCache extends ConsulCache<List<ServiceHealth>> { * Keys will be a {@link HostAndPort} object made up of the service's address/port combo * * @param healthClient the {@link HealthClient} - * @param serviceName the name of the service - * @param passing include only passing services? + * @param serviceName the name of the service + * @param passing include only passing services? * @return a cache object */ - public static ServiceHealthCache newCache( - final HealthClient healthClient, - final String serviceName, - final boolean passing, - final CatalogOptions catalogOptions, - final int watchSeconds, - final QueryOptions queryOptions) { + public static ServiceHealthCache newCache(final HealthClient healthClient, final String serviceName, + final boolean passing, final CatalogOptions catalogOptions, final int watchSeconds, + final QueryOptions queryOptions) { CallbackConsumer<List<ServiceHealth>> callbackConsumer = new CallbackConsumer<List<ServiceHealth>>() { - @Override - public void consume(BigInteger index, - ConsulResponseCallback<List<ServiceHealth>> callback) { - // TODO Auto-generated method stub + @Override + public void consume(BigInteger index, ConsulResponseCallback<List<ServiceHealth>> callback) { + // TODO Auto-generated method stub QueryOptions params = watchParams(index, watchSeconds, queryOptions); if (passing) { healthClient.getHealthyServiceInstances(serviceName, catalogOptions, params, callback); } else { healthClient.getAllServiceInstances(serviceName, catalogOptions, params, callback); } - } + } }; return new ServiceHealthCache(callbackConsumer); } - public static ServiceHealthCache newCache( - final HealthClient healthClient, - final String serviceName, - final boolean passing, - final CatalogOptions catalogOptions, - final int watchSeconds) { + public static ServiceHealthCache newCache(final HealthClient healthClient, final String serviceName, + final boolean passing, final CatalogOptions catalogOptions, final int watchSeconds) { return newCache(healthClient, serviceName, passing, catalogOptions, watchSeconds, QueryOptions.BLANK); } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServicesCatalogCache.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServicesCatalogCache.java index e0961f0..ced3c2c 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServicesCatalogCache.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServicesCatalogCache.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.cache; @@ -25,29 +23,26 @@ import com.orbitz.consul.option.CatalogOptions; import com.orbitz.consul.option.QueryOptions; public class ServicesCatalogCache extends ConsulCache<HttpEntity> { - + private ServicesCatalogCache(CallbackConsumer<HttpEntity> callbackConsumer) { super(callbackConsumer); } - public static ServicesCatalogCache newCache( - final CatalogClient catalogClient, - final CatalogOptions catalogOptions, - final QueryOptions queryOptions, - final int watchSeconds) { - + public static ServicesCatalogCache newCache(final CatalogClient catalogClient, final CatalogOptions catalogOptions, + final QueryOptions queryOptions, final int watchSeconds) { + CallbackConsumer<HttpEntity> callbackConsumer = new CallbackConsumer<HttpEntity>() { @Override public void consume(BigInteger index, ConsulResponseCallback<HttpEntity> callback) { - QueryOptions params = watchParams(index, watchSeconds, queryOptions); - catalogClient.getServices(catalogOptions, params,callback); + QueryOptions params = watchParams(index, watchSeconds, queryOptions); + catalogClient.getServices(catalogOptions, params, callback); } }; return new ServicesCatalogCache(callbackConsumer); } - + public static ServicesCatalogCache newCache(final CatalogClient catalogClient) { return newCache(catalogClient, CatalogOptions.BLANK, QueryOptions.BLANK, 10); } |