diff options
author | HuabingZhao <zhao.huabing@zte.com.cn> | 2017-09-07 14:33:18 +0800 |
---|---|---|
committer | HuabingZhao <zhao.huabing@zte.com.cn> | 2017-09-07 14:40:59 +0800 |
commit | e75a8ef2372722c0b22669fb427d47bacc5b8d5e (patch) | |
tree | cee85cbc4fe818262fb8c4d733f2fac0c8024df8 /apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend | |
parent | e5fe5a022f4cc5164c1f4516c024617c49f12978 (diff) |
Fix java check style warning
Change-Id: I98a6d7237a213d007ad4d954989cb0b0fa150a10
Issue-Id: MSB-67
Signed-off-by: HuabingZhao <zhao.huabing@zte.com.cn>
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend')
20 files changed, 1364 insertions, 1554 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/CatalogClient.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/CatalogClient.java index bb0edbf..0632108 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/CatalogClient.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/CatalogClient.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend; @@ -32,71 +30,64 @@ import com.orbitz.consul.option.QueryOptions; */ public class CatalogClient { - private static final Logger LOGGER = LoggerFactory - .getLogger(CatalogClient.class); - - private static final TypeReference<HttpEntity> TYPE_SERVICES_MAP = new TypeReference<HttpEntity>() { - }; - - - private static final String CATALOG_URI_8500 = "/v1/catalog"; - private static final String CATAlOG_URI_10081 = "/api/catalog/v1"; - - private static final String GET_SERVICES_URI = "/services"; - - private static final Http httpClient = Http.getInstance(); - - private HttpHost targetHost = null; - private String catalogUri = CATAlOG_URI_10081; - - CatalogClient(final HttpHost targetHost) { - this.targetHost = targetHost; - if (targetHost.getPort() == 8500) { - catalogUri = CATALOG_URI_8500; - } - } - - /** - * Retrieves all services for a given datacenter with - * {@link com.orbitz.consul.option.QueryOptions}. - * - * GET /v1/catalog/services?dc={datacenter} - * - * @param catalogOptions - * Catalog specific options to use. - * @param queryOptions - * The Query Options to use. - * @return A {@link com.orbitz.consul.model.ConsulResponse} containing a map - * of service name to list of tags. - */ - public void getServices(CatalogOptions catalogOptions, - QueryOptions queryOptions, - ConsulResponseCallback<HttpEntity> callback) { - - // prepare access path - // path:10081 vs 8500 - String path = targetHost.toString() + catalogUri + GET_SERVICES_URI; - - // params:wait,index,dc...... - String params = Http.optionsFrom(catalogOptions, queryOptions); - - // node meta: ns,external,internal..... - String node_meta = ConfigUtil.getInstance().getNodeMetaQueryParam(); - - // add params - path = (params != null && !params.isEmpty()) ? path += "?" + params - : path; - - // add node_meta - if (node_meta != null && !node_meta.isEmpty()) { - path = path.contains("?") ? path +"&"+ node_meta : path + "?" - + node_meta; - } - - // async watch services - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("get all services:" + path); - } - httpClient.asyncGetDelayHandle(path, TYPE_SERVICES_MAP, callback); - } + private static final Logger LOGGER = LoggerFactory.getLogger(CatalogClient.class); + + private static final TypeReference<HttpEntity> TYPE_SERVICES_MAP = new TypeReference<HttpEntity>() {}; + + + private static final String CATALOG_URI_8500 = "/v1/catalog"; + private static final String CATAlOG_URI_10081 = "/api/catalog/v1"; + + private static final String GET_SERVICES_URI = "/services"; + + private static final Http httpClient = Http.getInstance(); + + private HttpHost targetHost = null; + private String catalogUri = CATAlOG_URI_10081; + + CatalogClient(final HttpHost targetHost) { + this.targetHost = targetHost; + if (targetHost.getPort() == 8500) { + catalogUri = CATALOG_URI_8500; + } + } + + /** + * Retrieves all services for a given datacenter with + * {@link com.orbitz.consul.option.QueryOptions}. + * + * GET /v1/catalog/services?dc={datacenter} + * + * @param catalogOptions Catalog specific options to use. + * @param queryOptions The Query Options to use. + * @return A {@link com.orbitz.consul.model.ConsulResponse} containing a map of service name to + * list of tags. + */ + public void getServices(CatalogOptions catalogOptions, QueryOptions queryOptions, + ConsulResponseCallback<HttpEntity> callback) { + + // prepare access path + // path:10081 vs 8500 + String path = targetHost.toString() + catalogUri + GET_SERVICES_URI; + + // params:wait,index,dc...... + String params = Http.optionsFrom(catalogOptions, queryOptions); + + // node meta: ns,external,internal..... + String node_meta = ConfigUtil.getInstance().getNodeMetaQueryParam(); + + // add params + path = (params != null && !params.isEmpty()) ? path += "?" + params : path; + + // add node_meta + if (node_meta != null && !node_meta.isEmpty()) { + path = path.contains("?") ? path + "&" + node_meta : path + "?" + node_meta; + } + + // async watch services + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("get all services:" + path); + } + httpClient.asyncGetDelayHandle(path, TYPE_SERVICES_MAP, callback); + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/Consul.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/Consul.java index a83e9ab..24427c5 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/Consul.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/Consul.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend; @@ -22,91 +20,90 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; public class Consul { - /** - * Default Consul HTTP API host. - */ - public static final String DEFAULT_HTTP_HOST = "localhost"; - - /** - * Default Consul HTTP API port. - */ - public static final int DEFAULT_HTTP_PORT = 8500; - - private static final Logger LOGGER = LoggerFactory - .getLogger(Consul.class); - - private final CatalogClient catalogClient; - private final HealthClient healthClient; - - private Consul(CatalogClient catalogClient, HealthClient healthClient) { - this.catalogClient = catalogClient; - this.healthClient = healthClient; - } - - /** - * Get the Catalog HTTP client. - * <p> - * /v1/catalog - * - * @return The Catalog HTTP client. - */ - public CatalogClient catalogClient() { - return catalogClient; - } - - /** - * Get the Health HTTP client. - * <p> - * /v1/health - * - * @return The Health HTTP client. - */ - public HealthClient healthClient() { - return healthClient; - } - - /** - * Creates a new {@link Builder} object. - * - * @return A new Consul builder. - */ - public static Builder builder() { - return new Builder(); - } - - /** - * Used to create a default Consul client. - * - * @return A default {@link Consul} client. - */ - @VisibleForTesting - public static Consul newClient() { - return builder().build(); - } - - public static class Builder { - - private HttpHost targetHost; - - { - targetHost = new HttpHost(DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT); - } - - Builder() { - - } - - public Builder withHostAndPort(String hostname, int port) { - this.targetHost = new HttpHost(hostname, port); - return this; - } - - public Consul build() { - LOGGER.info("********build consul:"+targetHost.toString()+"****************"); - CatalogClient catalogClient = new CatalogClient(targetHost); - HealthClient healthClient = new HealthClient(targetHost); - return new Consul(catalogClient,healthClient); - } - - } + /** + * Default Consul HTTP API host. + */ + public static final String DEFAULT_HTTP_HOST = "localhost"; + + /** + * Default Consul HTTP API port. + */ + public static final int DEFAULT_HTTP_PORT = 8500; + + private static final Logger LOGGER = LoggerFactory.getLogger(Consul.class); + + private final CatalogClient catalogClient; + private final HealthClient healthClient; + + private Consul(CatalogClient catalogClient, HealthClient healthClient) { + this.catalogClient = catalogClient; + this.healthClient = healthClient; + } + + /** + * Get the Catalog HTTP client. + * <p> + * /v1/catalog + * + * @return The Catalog HTTP client. + */ + public CatalogClient catalogClient() { + return catalogClient; + } + + /** + * Get the Health HTTP client. + * <p> + * /v1/health + * + * @return The Health HTTP client. + */ + public HealthClient healthClient() { + return healthClient; + } + + /** + * Creates a new {@link Builder} object. + * + * @return A new Consul builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Used to create a default Consul client. + * + * @return A default {@link Consul} client. + */ + @VisibleForTesting + public static Consul newClient() { + return builder().build(); + } + + public static class Builder { + + private HttpHost targetHost; + + { + targetHost = new HttpHost(DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT); + } + + Builder() { + + } + + public Builder withHostAndPort(String hostname, int port) { + this.targetHost = new HttpHost(hostname, port); + return this; + } + + public Consul build() { + LOGGER.info("********build consul:" + targetHost.toString() + "****************"); + CatalogClient catalogClient = new CatalogClient(targetHost); + HealthClient healthClient = new HealthClient(targetHost); + return new Consul(catalogClient, healthClient); + } + + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/HealthClient.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/HealthClient.java index 85c9b78..3809247 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/HealthClient.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/HealthClient.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend; @@ -32,96 +30,84 @@ import com.orbitz.consul.option.QueryOptions; * HTTP Client for /v1/health/ endpoints. */ public class HealthClient { - private final static Logger LOGGER = LoggerFactory - .getLogger(HealthClient.class); - - private static final TypeReference<List<ServiceHealth>> TYPE_SERVICE_HEALTH_LIST = new TypeReference<List<ServiceHealth>>() { - }; - - private static final String HEALTH_URI_10081 = "/api/health/v1"; - private static final String HEALTH_URI_8500 = "/v1/health"; - private static final String GET_HEALTH_SERVICE_URI = "/service"; - -// private static final String GET_HEALTH_SERVICE_URI = "/v1/health/service"; - -// private static final String GET_HEALTH_SERVICE_URI = "/api/health/v1/service"; - - private final static Http httpClient = Http.getInstance(); - - private HttpHost targetHost = null; - private String healthUri = HEALTH_URI_10081; - - HealthClient(final HttpHost targetHost) { - this.targetHost = targetHost; - - if(targetHost.getPort() == 8500) - { - healthUri = HEALTH_URI_8500; - } - } - - /** - * Asynchronously retrieves the healthchecks for all healthy service - * instances in a given datacenter with - * {@link com.orbitz.consul.option.QueryOptions}. - * - * GET /v1/health/service/{service}?dc={datacenter}&passing - * - * Experimental. - * - * @param service - * The service to query. - * @param catalogOptions - * The catalog specific options to use. - * @param queryOptions - * The Query Options to use. - * @param callback - * Callback implemented by callee to handle results. - */ - public void getHealthyServiceInstances(String service, - CatalogOptions catalogOptions, QueryOptions queryOptions, - ConsulResponseCallback<List<ServiceHealth>> callback) { - // prepare access path - String path = targetHost.toString() + healthUri + GET_HEALTH_SERVICE_URI + "/"+ service; - - String params = Http.optionsFrom(catalogOptions, queryOptions); - path = (params != null && !params.isEmpty()) ? path += "?" - + params : path; //query all nodes without filter for health - - // async watch -// LOGGER.info("get health paasing service:" + path); - httpClient.asyncGetDelayHandle(path, TYPE_SERVICE_HEALTH_LIST, callback); - } - - /** - * Asynchronously retrieves the healthchecks for all nodes in a given - * datacenter with {@link com.orbitz.consul.option.QueryOptions}. - * - * GET /v1/health/service/{service}?dc={datacenter} - * - * Experimental. - * - * @param service - * The service to query. - * @param catalogOptions - * The catalog specific options to use. - * @param queryOptions - * The Query Options to use. - * @param callback - * Callback implemented by callee to handle results. - */ - public void getAllServiceInstances(String service, - CatalogOptions catalogOptions, QueryOptions queryOptions, - ConsulResponseCallback<List<ServiceHealth>> callback) { - - // prepare access path - String path = targetHost.toString() + healthUri + GET_HEALTH_SERVICE_URI + "/"+ service; - String params = Http.optionsFrom(catalogOptions, queryOptions); - path = (params != null && !params.isEmpty()) ? path += "?" + params - : path; - - // async watch -// LOGGER.debug("get service:" + path); - httpClient.asyncGetDelayHandle(path, TYPE_SERVICE_HEALTH_LIST, callback); - } + private final static Logger LOGGER = LoggerFactory.getLogger(HealthClient.class); + + private static final TypeReference<List<ServiceHealth>> TYPE_SERVICE_HEALTH_LIST = + new TypeReference<List<ServiceHealth>>() {}; + + private static final String HEALTH_URI_10081 = "/api/health/v1"; + private static final String HEALTH_URI_8500 = "/v1/health"; + private static final String GET_HEALTH_SERVICE_URI = "/service"; + + // private static final String GET_HEALTH_SERVICE_URI = "/v1/health/service"; + + // private static final String GET_HEALTH_SERVICE_URI = "/api/health/v1/service"; + + private final static Http httpClient = Http.getInstance(); + + private HttpHost targetHost = null; + private String healthUri = HEALTH_URI_10081; + + HealthClient(final HttpHost targetHost) { + this.targetHost = targetHost; + + if (targetHost.getPort() == 8500) { + healthUri = HEALTH_URI_8500; + } + } + + /** + * Asynchronously retrieves the healthchecks for all healthy service instances in a given + * datacenter with {@link com.orbitz.consul.option.QueryOptions}. + * + * GET /v1/health/service/{service}?dc={datacenter}&passing + * + * Experimental. + * + * @param service The service to query. + * @param catalogOptions The catalog specific options to use. + * @param queryOptions The Query Options to use. + * @param callback Callback implemented by callee to handle results. + */ + public void getHealthyServiceInstances(String service, CatalogOptions catalogOptions, QueryOptions queryOptions, + ConsulResponseCallback<List<ServiceHealth>> callback) { + // prepare access path + String path = targetHost.toString() + healthUri + GET_HEALTH_SERVICE_URI + "/" + service; + + String params = Http.optionsFrom(catalogOptions, queryOptions); + path = (params != null && !params.isEmpty()) ? path += "?" + params : path; // query all + // nodes without + // filter for + // health + + // async watch + // LOGGER.info("get health paasing service:" + path); + httpClient.asyncGetDelayHandle(path, TYPE_SERVICE_HEALTH_LIST, callback); + } + + /** + * Asynchronously retrieves the healthchecks for all nodes in a given datacenter with + * {@link com.orbitz.consul.option.QueryOptions}. + * + * GET /v1/health/service/{service}?dc={datacenter} + * + * Experimental. + * + * @param service The service to query. + * @param catalogOptions The catalog specific options to use. + * @param queryOptions The Query Options to use. + * @param callback Callback implemented by callee to handle results. + */ + public void getAllServiceInstances(String service, CatalogOptions catalogOptions, QueryOptions queryOptions, + ConsulResponseCallback<List<ServiceHealth>> callback) { + + // prepare access path + String path = targetHost.toString() + healthUri + GET_HEALTH_SERVICE_URI + "/" + service; + String params = Http.optionsFrom(catalogOptions, queryOptions); + path = (params != null && !params.isEmpty()) ? path += "?" + params : path; + + // async watch + // LOGGER.debug("get service:" + path); + httpClient.asyncGetDelayHandle(path, TYPE_SERVICE_HEALTH_LIST, callback); + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/ConsulResponseCallback.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/ConsulResponseCallback.java index 536cfbd..faa00d0 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/ConsulResponseCallback.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/ConsulResponseCallback.java @@ -1,25 +1,23 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.async; import com.orbitz.consul.model.ConsulResponse; /** - * For API calls that support long-polling, this callback is used to handle - * the result on success or failure for an async HTTP call. + * For API calls that support long-polling, this callback is used to handle the result on success or + * failure for an async HTTP call. * * @param <T> The Response type. */ @@ -38,7 +36,7 @@ public interface ConsulResponseCallback<T> { * @param consulResponse The Consul response. */ void onDelayComplete(OriginalConsulResponse<T> originalConsulResponse); - + /** * Callback for an unsuccessful request. * diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/ConsulResponseHeader.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/ConsulResponseHeader.java index 136fd49..76e0ff9 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/ConsulResponseHeader.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/ConsulResponseHeader.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.async; @@ -21,7 +19,7 @@ public class ConsulResponseHeader { private final long lastContact; private final boolean knownLeader; private final BigInteger index; - + public ConsulResponseHeader(long lastContact, boolean knownLeader, BigInteger index) { this.lastContact = lastContact; this.knownLeader = knownLeader; diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/OriginalConsulResponse.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/OriginalConsulResponse.java index 3b72b24..6ad7c01 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/OriginalConsulResponse.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/async/OriginalConsulResponse.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.async; @@ -20,9 +18,9 @@ import org.apache.http.HttpResponse; import com.fasterxml.jackson.core.type.TypeReference; public class OriginalConsulResponse<T> { - final HttpResponse response; - final TypeReference<T> responseType; - + final HttpResponse response; + final TypeReference<T> responseType; + public OriginalConsulResponse(HttpResponse response, TypeReference<T> responseType) { this.response = response; this.responseType = responseType; @@ -34,9 +32,9 @@ public class OriginalConsulResponse<T> { } public TypeReference<T> getResponseType() { - return responseType; - } - - - + return responseType; + } + + + } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ConsulCache.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ConsulCache.java index 5325b89..b389efc 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ConsulCache.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ConsulCache.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.cache; @@ -44,273 +42,248 @@ import com.orbitz.consul.option.ImmutableQueryOptions; import com.orbitz.consul.option.QueryOptions; /** - * A cache structure that can provide an up-to-date read-only map backed by - * consul data + * A cache structure that can provide an up-to-date read-only map backed by consul data * * @param <V> */ public class ConsulCache<T> { - enum State { - latent, starting, started, stopped - } - - private final static Logger LOGGER = LoggerFactory - .getLogger(ConsulCache.class); - - @VisibleForTesting - static final String BACKOFF_DELAY_PROPERTY = "com.orbitz.consul.cache.backOffDelay"; - private static final long BACKOFF_DELAY_QTY_IN_MS = getBackOffDelayInMs(System - .getProperties()); - - private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>( - null); - private final AtomicReference<State> state = new AtomicReference<State>( - State.latent); - private final CountDownLatch initLatch = new CountDownLatch(1); - private final ScheduledExecutorService executorService = Executors - .newSingleThreadScheduledExecutor(); - private final CopyOnWriteArrayList<Listener<T>> listeners = new CopyOnWriteArrayList<Listener<T>>(); - - private final CallbackConsumer<T> callBackConsumer; - private final ConsulResponseCallback<T> responseCallback; - - ConsulCache(CallbackConsumer<T> callbackConsumer) { - - this.callBackConsumer = callbackConsumer; - - this.responseCallback = new ConsulResponseCallback<T>() { - @Override - public void onComplete(ConsulResponse<T> consulResponse) { - - if (consulResponse.isKnownLeader()) { - if (!isRunning()) { - return; - } - updateIndex(consulResponse); - - for (Listener<T> l : listeners) { - l.notify(consulResponse); - } - - if (state.compareAndSet(State.starting, State.started)) { - initLatch.countDown(); - } - - runCallback(); - } else { - onFailure(new ConsulException( - "Consul cluster has no elected leader")); - } - } - - @Override - public void onDelayComplete( - OriginalConsulResponse<T> originalConsulResponse) { - - try { - // get header - ConsulResponseHeader consulResponseHeader = Http - .consulResponseHeader(originalConsulResponse - .getResponse()); - - if (consulResponseHeader.isKnownLeader()) { - if (!isRunning()) { - return; - } - - boolean isConuslIndexChanged = isConuslIndexChanged(consulResponseHeader - .getIndex()); - // consul index different - if (isConuslIndexChanged) { - - updateIndex(consulResponseHeader.getIndex()); - - // get T type data - ConsulResponse<T> consulResponse = Http - .consulResponse(originalConsulResponse - .getResponseType(), - originalConsulResponse - .getResponse()); - - // notify customer to custom T data - for (Listener<T> l : listeners) { - l.notify(consulResponse); - } - } - - if (state.compareAndSet(State.starting, State.started)) { - initLatch.countDown(); - } - - runCallback(); - - } else { - onFailure(new ConsulException( - "Consul cluster has no elected leader")); - } - } catch (Exception e) { - onFailure(e); - } - - } - - @Override - public void onFailure(Throwable throwable) { - - if (!isRunning()) { - return; - } - LOGGER.error( - String.format( - "Error getting response from consul. will retry in %d %s", - BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS), - throwable); - - executorService.schedule(new Runnable() { - @Override - public void run() { - runCallback(); - } - }, BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS); - } - }; - } - - @VisibleForTesting - static long getBackOffDelayInMs(Properties properties) { - String backOffDelay = null; - try { - backOffDelay = properties.getProperty(BACKOFF_DELAY_PROPERTY); - if (!Strings.isNullOrEmpty(backOffDelay)) { - return Long.parseLong(backOffDelay); - } - } catch (Exception ex) { - LOGGER.warn( - backOffDelay != null ? String.format( - "Error parsing property variable %s: %s", - BACKOFF_DELAY_PROPERTY, backOffDelay) : String - .format("Error extracting property variable %s", - BACKOFF_DELAY_PROPERTY), ex); - } - return TimeUnit.SECONDS.toMillis(10); - } - - public void start() throws Exception { - checkState(state.compareAndSet(State.latent, State.starting), - "Cannot transition from state %s to %s", state.get(), - State.starting); - runCallback(); - } - - public void stop() throws Exception { - State previous = state.getAndSet(State.stopped); - if (previous != State.stopped) { - executorService.shutdownNow(); - } - } - - private void runCallback() { - if (isRunning()) { - callBackConsumer.consume(latestIndex.get(), responseCallback); - } - } - - private boolean isRunning() { - return state.get() == State.started || state.get() == State.starting; - } - - public boolean awaitInitialized(long timeout, TimeUnit unit) - throws InterruptedException { - return initLatch.await(timeout, unit); - } - - private void updateIndex(ConsulResponse<T> consulResponse) { - if (consulResponse != null && consulResponse.getIndex() != null) { - this.latestIndex.set(consulResponse.getIndex()); - } - } - - public void updateIndex(BigInteger index) { - if (index != null) { - this.latestIndex.set(index); - } - } - - protected static QueryOptions watchParams(final BigInteger index, - final int blockSeconds, QueryOptions queryOptions) { - checkArgument(!queryOptions.getIndex().isPresent() - && !queryOptions.getWait().isPresent(), - "Index and wait cannot be overridden"); - - return ImmutableQueryOptions.builder() - .from(watchDefaultParams(index, blockSeconds)) - .token(queryOptions.getToken()) - .consistencyMode(queryOptions.getConsistencyMode()) - .near(queryOptions.getNear()).build(); - } - - private static QueryOptions watchDefaultParams(final BigInteger index, - final int blockSeconds) { - if (index == null) { - return QueryOptions.BLANK; - } else { - return QueryOptions.blockSeconds(blockSeconds, index).build(); - } - } - - /** - * passed in by creators to vary the content of the cached values - * - * @param <V> - */ - protected interface CallbackConsumer<T> { - void consume(BigInteger index, ConsulResponseCallback<T> callback); - } - - /** - * Implementers can register a listener to receive a new map when it changes - * - * @param <V> - */ - public interface Listener<T> { - void notify(ConsulResponse<T> newValues); - } - - public boolean addListener(Listener<T> listener) { - boolean added = listeners.add(listener); - return added; - } - - public List<Listener<T>> getListeners() { - return Collections.unmodifiableList(listeners); - } - - public boolean removeListener(Listener<T> listener) { - return listeners.remove(listener); - } - - @VisibleForTesting - protected State getState() { - return state.get(); - } - - private boolean isConuslIndexChanged(final BigInteger index) { - - if (index != null && !index.equals(latestIndex.get())) { - - if (LOGGER.isDebugEnabled()) { - // 第一次不打印 - if (latestIndex.get() != null) { - LOGGER.debug("consul index compare:new-" + index + " old-" - + latestIndex.get()); - } - - } - - return true; - } - - return false; - } + enum State { + latent, starting, started, stopped + } + + private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class); + + @VisibleForTesting + static final String BACKOFF_DELAY_PROPERTY = "com.orbitz.consul.cache.backOffDelay"; + private static final long BACKOFF_DELAY_QTY_IN_MS = getBackOffDelayInMs(System.getProperties()); + + private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null); + private final AtomicReference<State> state = new AtomicReference<State>(State.latent); + private final CountDownLatch initLatch = new CountDownLatch(1); + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + private final CopyOnWriteArrayList<Listener<T>> listeners = new CopyOnWriteArrayList<Listener<T>>(); + + private final CallbackConsumer<T> callBackConsumer; + private final ConsulResponseCallback<T> responseCallback; + + ConsulCache(CallbackConsumer<T> callbackConsumer) { + + this.callBackConsumer = callbackConsumer; + + this.responseCallback = new ConsulResponseCallback<T>() { + @Override + public void onComplete(ConsulResponse<T> consulResponse) { + + if (consulResponse.isKnownLeader()) { + if (!isRunning()) { + return; + } + updateIndex(consulResponse); + + for (Listener<T> l : listeners) { + l.notify(consulResponse); + } + + if (state.compareAndSet(State.starting, State.started)) { + initLatch.countDown(); + } + + runCallback(); + } else { + onFailure(new ConsulException("Consul cluster has no elected leader")); + } + } + + @Override + public void onDelayComplete(OriginalConsulResponse<T> originalConsulResponse) { + + try { + // get header + ConsulResponseHeader consulResponseHeader = + Http.consulResponseHeader(originalConsulResponse.getResponse()); + + if (consulResponseHeader.isKnownLeader()) { + if (!isRunning()) { + return; + } + + boolean isConuslIndexChanged = isConuslIndexChanged(consulResponseHeader.getIndex()); + // consul index different + if (isConuslIndexChanged) { + + updateIndex(consulResponseHeader.getIndex()); + + // get T type data + ConsulResponse<T> consulResponse = + Http.consulResponse(originalConsulResponse.getResponseType(), + originalConsulResponse.getResponse()); + + // notify customer to custom T data + for (Listener<T> l : listeners) { + l.notify(consulResponse); + } + } + + if (state.compareAndSet(State.starting, State.started)) { + initLatch.countDown(); + } + + runCallback(); + + } else { + onFailure(new ConsulException("Consul cluster has no elected leader")); + } + } catch (Exception e) { + onFailure(e); + } + + } + + @Override + public void onFailure(Throwable throwable) { + + if (!isRunning()) { + return; + } + LOGGER.error(String.format("Error getting response from consul. will retry in %d %s", + BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS), throwable); + + executorService.schedule(new Runnable() { + @Override + public void run() { + runCallback(); + } + }, BACKOFF_DELAY_QTY_IN_MS, TimeUnit.MILLISECONDS); + } + }; + } + + @VisibleForTesting + static long getBackOffDelayInMs(Properties properties) { + String backOffDelay = null; + try { + backOffDelay = properties.getProperty(BACKOFF_DELAY_PROPERTY); + if (!Strings.isNullOrEmpty(backOffDelay)) { + return Long.parseLong(backOffDelay); + } + } catch (Exception ex) { + LOGGER.warn(backOffDelay != null + ? String.format("Error parsing property variable %s: %s", BACKOFF_DELAY_PROPERTY, + backOffDelay) + : String.format("Error extracting property variable %s", BACKOFF_DELAY_PROPERTY), ex); + } + return TimeUnit.SECONDS.toMillis(10); + } + + public void start() throws Exception { + checkState(state.compareAndSet(State.latent, State.starting), "Cannot transition from state %s to %s", + state.get(), State.starting); + runCallback(); + } + + public void stop() throws Exception { + State previous = state.getAndSet(State.stopped); + if (previous != State.stopped) { + executorService.shutdownNow(); + } + } + + private void runCallback() { + if (isRunning()) { + callBackConsumer.consume(latestIndex.get(), responseCallback); + } + } + + private boolean isRunning() { + return state.get() == State.started || state.get() == State.starting; + } + + public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException { + return initLatch.await(timeout, unit); + } + + private void updateIndex(ConsulResponse<T> consulResponse) { + if (consulResponse != null && consulResponse.getIndex() != null) { + this.latestIndex.set(consulResponse.getIndex()); + } + } + + public void updateIndex(BigInteger index) { + if (index != null) { + this.latestIndex.set(index); + } + } + + protected static QueryOptions watchParams(final BigInteger index, final int blockSeconds, + QueryOptions queryOptions) { + checkArgument(!queryOptions.getIndex().isPresent() && !queryOptions.getWait().isPresent(), + "Index and wait cannot be overridden"); + + return ImmutableQueryOptions.builder().from(watchDefaultParams(index, blockSeconds)) + .token(queryOptions.getToken()).consistencyMode(queryOptions.getConsistencyMode()) + .near(queryOptions.getNear()).build(); + } + + private static QueryOptions watchDefaultParams(final BigInteger index, final int blockSeconds) { + if (index == null) { + return QueryOptions.BLANK; + } else { + return QueryOptions.blockSeconds(blockSeconds, index).build(); + } + } + + /** + * passed in by creators to vary the content of the cached values + * + * @param <V> + */ + protected interface CallbackConsumer<T> { + void consume(BigInteger index, ConsulResponseCallback<T> callback); + } + + /** + * Implementers can register a listener to receive a new map when it changes + * + * @param <V> + */ + public interface Listener<T> { + void notify(ConsulResponse<T> newValues); + } + + public boolean addListener(Listener<T> listener) { + boolean added = listeners.add(listener); + return added; + } + + public List<Listener<T>> getListeners() { + return Collections.unmodifiableList(listeners); + } + + public boolean removeListener(Listener<T> listener) { + return listeners.remove(listener); + } + + @VisibleForTesting + protected State getState() { + return state.get(); + } + + private boolean isConuslIndexChanged(final BigInteger index) { + + if (index != null && !index.equals(latestIndex.get())) { + + if (LOGGER.isDebugEnabled()) { + // 第一次不打印 + if (latestIndex.get() != null) { + LOGGER.debug("consul index compare:new-" + index + " old-" + latestIndex.get()); + } + + } + + return true; + } + + return false; + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServiceHealthCache.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServiceHealthCache.java index f8bd224..1b10730 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServiceHealthCache.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServiceHealthCache.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.cache; @@ -36,41 +34,32 @@ public class ServiceHealthCache extends ConsulCache<List<ServiceHealth>> { * Keys will be a {@link HostAndPort} object made up of the service's address/port combo * * @param healthClient the {@link HealthClient} - * @param serviceName the name of the service - * @param passing include only passing services? + * @param serviceName the name of the service + * @param passing include only passing services? * @return a cache object */ - public static ServiceHealthCache newCache( - final HealthClient healthClient, - final String serviceName, - final boolean passing, - final CatalogOptions catalogOptions, - final int watchSeconds, - final QueryOptions queryOptions) { + public static ServiceHealthCache newCache(final HealthClient healthClient, final String serviceName, + final boolean passing, final CatalogOptions catalogOptions, final int watchSeconds, + final QueryOptions queryOptions) { CallbackConsumer<List<ServiceHealth>> callbackConsumer = new CallbackConsumer<List<ServiceHealth>>() { - @Override - public void consume(BigInteger index, - ConsulResponseCallback<List<ServiceHealth>> callback) { - // TODO Auto-generated method stub + @Override + public void consume(BigInteger index, ConsulResponseCallback<List<ServiceHealth>> callback) { + // TODO Auto-generated method stub QueryOptions params = watchParams(index, watchSeconds, queryOptions); if (passing) { healthClient.getHealthyServiceInstances(serviceName, catalogOptions, params, callback); } else { healthClient.getAllServiceInstances(serviceName, catalogOptions, params, callback); } - } + } }; return new ServiceHealthCache(callbackConsumer); } - public static ServiceHealthCache newCache( - final HealthClient healthClient, - final String serviceName, - final boolean passing, - final CatalogOptions catalogOptions, - final int watchSeconds) { + public static ServiceHealthCache newCache(final HealthClient healthClient, final String serviceName, + final boolean passing, final CatalogOptions catalogOptions, final int watchSeconds) { return newCache(healthClient, serviceName, passing, catalogOptions, watchSeconds, QueryOptions.BLANK); } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServicesCatalogCache.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServicesCatalogCache.java index e0961f0..ced3c2c 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServicesCatalogCache.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/cache/ServicesCatalogCache.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.cache; @@ -25,29 +23,26 @@ import com.orbitz.consul.option.CatalogOptions; import com.orbitz.consul.option.QueryOptions; public class ServicesCatalogCache extends ConsulCache<HttpEntity> { - + private ServicesCatalogCache(CallbackConsumer<HttpEntity> callbackConsumer) { super(callbackConsumer); } - public static ServicesCatalogCache newCache( - final CatalogClient catalogClient, - final CatalogOptions catalogOptions, - final QueryOptions queryOptions, - final int watchSeconds) { - + public static ServicesCatalogCache newCache(final CatalogClient catalogClient, final CatalogOptions catalogOptions, + final QueryOptions queryOptions, final int watchSeconds) { + CallbackConsumer<HttpEntity> callbackConsumer = new CallbackConsumer<HttpEntity>() { @Override public void consume(BigInteger index, ConsulResponseCallback<HttpEntity> callback) { - QueryOptions params = watchParams(index, watchSeconds, queryOptions); - catalogClient.getServices(catalogOptions, params,callback); + QueryOptions params = watchParams(index, watchSeconds, queryOptions); + catalogClient.getServices(catalogOptions, params, callback); } }; return new ServicesCatalogCache(callbackConsumer); } - + public static ServicesCatalogCache newCache(final CatalogClient catalogClient) { return newCache(catalogClient, CatalogOptions.BLANK, QueryOptions.BLANK, 10); } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java index 5730b4b..49246cf 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckServiceDataEmptyAndAutoStopWatchFilter.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -33,81 +31,69 @@ import com.orbitz.consul.model.health.ImmutableNode; -public class CheckServiceDataEmptyAndAutoStopWatchFilter implements - WatchTask.Filter<List<ServiceHealth>> { - - private final static Logger LOGGER = LoggerFactory - .getLogger(CheckServiceDataEmptyAndAutoStopWatchFilter.class); - private final String serviceName; - - public CheckServiceDataEmptyAndAutoStopWatchFilter( - final String serviceName) { - this.serviceName = serviceName; - } - - @Override - public boolean filter(ConsulResponse<List<ServiceHealth>> object) { - // TODO Auto-generated method stub - boolean result = check(object); - - if (!result) { - // create delete - writeServiceToQueue4Del(); - // stop watch - SyncDataManager.stopWatchService(serviceName); - } - - return result; - } - - // when: - // 1)service had been deleted - // 2)service Health check was not passing - // single service return [],size==0 - // stop this service watching task and create delete event - private boolean check(ConsulResponse<List<ServiceHealth>> object) { - boolean result = true; - - if (object == null || object.getResponse() == null - || object.getResponse().size() == 0) { - LOGGER.info("check service-{},its data is empty", - serviceName); - return false; - } - - return result; - } - - private void writeServiceToQueue4Del() { - ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>(); - data.setDataType(ServiceData.DataType.service); - data.setOperate(ServiceData.Operate.delete); - - // tell the subsequent operation the service name which will be deleted - Service service = ImmutableService.builder().id("").port(0).address("") - .service(serviceName).addTags("").createIndex(0).modifyIndex(0).build(); - ServiceHealth serviceHealth = ImmutableServiceHealth.builder() - .service(service) - .node(ImmutableNode.builder().node("").address("").build()) - .build(); - List<ServiceHealth> serviceHealthList = new ArrayList<ServiceHealth>(); - serviceHealthList.add(serviceHealth); - - data.setData(serviceHealthList); - - LOGGER.info("put delete service[" - + serviceName - + "] to service queue :because of deleted "); - - try { - QueueManager.getInstance().putIn(data); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - LOGGER.warn( - "put delete service[" - + serviceName - + "] to service queue interrupted because of deleted:", - e); - } - } +public class CheckServiceDataEmptyAndAutoStopWatchFilter implements WatchTask.Filter<List<ServiceHealth>> { + + private final static Logger LOGGER = LoggerFactory.getLogger(CheckServiceDataEmptyAndAutoStopWatchFilter.class); + private final String serviceName; + + public CheckServiceDataEmptyAndAutoStopWatchFilter(final String serviceName) { + this.serviceName = serviceName; + } + + @Override + public boolean filter(ConsulResponse<List<ServiceHealth>> object) { + // TODO Auto-generated method stub + boolean result = check(object); + + if (!result) { + // create delete + writeServiceToQueue4Del(); + // stop watch + SyncDataManager.stopWatchService(serviceName); + } + + return result; + } + + // when: + // 1)service had been deleted + // 2)service Health check was not passing + // single service return [],size==0 + // stop this service watching task and create delete event + private boolean check(ConsulResponse<List<ServiceHealth>> object) { + boolean result = true; + + if (object == null || object.getResponse() == null || object.getResponse().size() == 0) { + LOGGER.info("check service-{},its data is empty", serviceName); + return false; + } + + return result; + } + + private void writeServiceToQueue4Del() { + ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>(); + data.setDataType(ServiceData.DataType.service); + data.setOperate(ServiceData.Operate.delete); + + // tell the subsequent operation the service name which will be deleted + Service service = ImmutableService.builder().id("").port(0).address("").service(serviceName).addTags("") + .createIndex(0).modifyIndex(0).build(); + ServiceHealth serviceHealth = ImmutableServiceHealth.builder().service(service) + .node(ImmutableNode.builder().node("").address("").build()).build(); + List<ServiceHealth> serviceHealthList = new ArrayList<ServiceHealth>(); + serviceHealthList.add(serviceHealth); + + data.setData(serviceHealthList); + + LOGGER.info("put delete service[" + serviceName + "] to service queue :because of deleted "); + + try { + QueueManager.getInstance().putIn(data); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + LOGGER.warn("put delete service[" + serviceName + "] to service queue interrupted because of deleted:", + e); + } + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java index 6dfc86a..49f3aa4 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/CheckTagAndAutoStopWatchFilter.java @@ -1,24 +1,21 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; import java.util.ArrayList; import java.util.List; -import org.onap.msb.apiroute.SyncDataManager; import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; import org.onap.msb.apiroute.wrapper.queue.QueueManager; import org.onap.msb.apiroute.wrapper.queue.ServiceData; @@ -29,83 +26,74 @@ import org.slf4j.LoggerFactory; import com.orbitz.consul.model.ConsulResponse; -public class CheckTagAndAutoStopWatchFilter implements - WatchTask.Filter<List<ServiceHealth>> { - - private final static Logger LOGGER = LoggerFactory - .getLogger(CheckTagAndAutoStopWatchFilter.class); - - private final String serviceName; - - public CheckTagAndAutoStopWatchFilter(final String serviceName) { - this.serviceName = serviceName; - } - - // from consul,the response data:List<ServiceHealth> - // filter ServiceHealth list and find the ServiceHealths which satisfy the - // tags conditions - // 1)if all ServiceHealth don't satisfy,create delete event and stop watch - // 2)if have some ServiceHealths satisfy the tags conditions,create update - // event and send these ServiceHealths - @Override - public boolean filter(ConsulResponse<List<ServiceHealth>> object) { - // TODO Auto-generated method stub - - // find #ServiceHealth# which satisfy the tag conditions - List<ServiceHealth> satisfyList = getSatisfyList(object); - - // no satisfied ServiceHealth - if (satisfyList.isEmpty()) { - - LOGGER.info("put delete service[" - + serviceName - + "] to service queue :because of NO tag meet the conditions"); - - // create delete - writeServiceToQueue(object.getResponse(), - ServiceData.Operate.delete); - // stop watch - //SyncDataManager.stopWatchService(serviceName); - return false; - } - - LOGGER.info("put update service[" - + serviceName - + "] to service queue :which tags meet the conditions"); - - // put the satisfy list to queue - writeServiceToQueue(satisfyList, ServiceData.Operate.update); - - return true; - } - - private List<ServiceHealth> getSatisfyList( - ConsulResponse<List<ServiceHealth>> object) { - List<ServiceHealth> satisfyList = new ArrayList<ServiceHealth>(); - for (ServiceHealth health : object.getResponse()) { - - if (ServiceFilter.getInstance().isFilterCheck(health)) { - satisfyList.add(health); - } - } - - return satisfyList; - } - - private void writeServiceToQueue(List<ServiceHealth> serviceData, - Operate operate) { - ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>(); - data.setOperate(operate); - data.setDataType(ServiceData.DataType.service); - data.setData(serviceData); - - - try { - QueueManager.getInstance().putIn(data); - } catch (InterruptedException e) { - LOGGER.warn("put " + operate + " service[" + serviceName - + "] to service queue interrupted ", e); - } - - } +public class CheckTagAndAutoStopWatchFilter implements WatchTask.Filter<List<ServiceHealth>> { + + private final static Logger LOGGER = LoggerFactory.getLogger(CheckTagAndAutoStopWatchFilter.class); + + private final String serviceName; + + public CheckTagAndAutoStopWatchFilter(final String serviceName) { + this.serviceName = serviceName; + } + + // from consul,the response data:List<ServiceHealth> + // filter ServiceHealth list and find the ServiceHealths which satisfy the + // tags conditions + // 1)if all ServiceHealth don't satisfy,create delete event and stop watch + // 2)if have some ServiceHealths satisfy the tags conditions,create update + // event and send these ServiceHealths + @Override + public boolean filter(ConsulResponse<List<ServiceHealth>> object) { + // TODO Auto-generated method stub + + // find #ServiceHealth# which satisfy the tag conditions + List<ServiceHealth> satisfyList = getSatisfyList(object); + + // no satisfied ServiceHealth + if (satisfyList.isEmpty()) { + + LOGGER.info("put delete service[" + serviceName + + "] to service queue :because of NO tag meet the conditions"); + + // create delete + writeServiceToQueue(object.getResponse(), ServiceData.Operate.delete); + // stop watch + // SyncDataManager.stopWatchService(serviceName); + return false; + } + + LOGGER.info("put update service[" + serviceName + "] to service queue :which tags meet the conditions"); + + // put the satisfy list to queue + writeServiceToQueue(satisfyList, ServiceData.Operate.update); + + return true; + } + + private List<ServiceHealth> getSatisfyList(ConsulResponse<List<ServiceHealth>> object) { + List<ServiceHealth> satisfyList = new ArrayList<ServiceHealth>(); + for (ServiceHealth health : object.getResponse()) { + + if (ServiceFilter.getInstance().isFilterCheck(health)) { + satisfyList.add(health); + } + } + + return satisfyList; + } + + private void writeServiceToQueue(List<ServiceHealth> serviceData, Operate operate) { + ServiceData<List<ServiceHealth>> data = new ServiceData<List<ServiceHealth>>(); + data.setOperate(operate); + data.setDataType(ServiceData.DataType.service); + data.setData(serviceData); + + + try { + QueueManager.getInstance().putIn(data); + } catch (InterruptedException e) { + LOGGER.warn("put " + operate + " service[" + serviceName + "] to service queue interrupted ", e); + } + + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java index 08c27a7..ccba7c9 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ConsulIndexFilter.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -25,39 +23,36 @@ import com.orbitz.consul.model.ConsulResponse; public class ConsulIndexFilter<T> implements WatchTask.Filter<T> { - private final static Logger LOGGER = LoggerFactory - .getLogger(ConsulIndexFilter.class); - - private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>( - null); - - @Override - public boolean filter(final ConsulResponse<T> object) { - // TODO Auto-generated method stub - return isChanged(object); - } - - private boolean isChanged(final ConsulResponse<T> consulResponse) { - - if (consulResponse != null && consulResponse.getIndex() != null - && !consulResponse.getIndex().equals(latestIndex.get())) { - - if(LOGGER.isDebugEnabled()){ - //第一次不打印 - if (latestIndex.get()!=null) { - LOGGER.debug("consul index compare:new-" - + consulResponse.getIndex() + " old-" - + latestIndex.get()); - } - - } - - this.latestIndex.set(consulResponse.getIndex()); - return true; - } - - return false; - } - - + private final static Logger LOGGER = LoggerFactory.getLogger(ConsulIndexFilter.class); + + private final AtomicReference<BigInteger> latestIndex = new AtomicReference<BigInteger>(null); + + @Override + public boolean filter(final ConsulResponse<T> object) { + // TODO Auto-generated method stub + return isChanged(object); + } + + private boolean isChanged(final ConsulResponse<T> consulResponse) { + + if (consulResponse != null && consulResponse.getIndex() != null + && !consulResponse.getIndex().equals(latestIndex.get())) { + + if (LOGGER.isDebugEnabled()) { + // 第一次不打印 + if (latestIndex.get() != null) { + LOGGER.debug("consul index compare:new-" + consulResponse.getIndex() + " old-" + + latestIndex.get()); + } + + } + + this.latestIndex.set(consulResponse.getIndex()); + return true; + } + + return false; + } + + } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java index 6f90b80..6694b68 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/ServiceModifyIndexFilter.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -30,107 +28,104 @@ import com.orbitz.consul.model.health.HealthCheck; public class ServiceModifyIndexFilter implements WatchTask.Filter<List<ServiceHealth>> { - private final AtomicReference<ImmutableList<ServiceHealth>> lastResponse = - new AtomicReference<ImmutableList<ServiceHealth>>(ImmutableList.<ServiceHealth>of()); + private final AtomicReference<ImmutableList<ServiceHealth>> lastResponse = + new AtomicReference<ImmutableList<ServiceHealth>>(ImmutableList.<ServiceHealth>of()); - private final static Logger LOGGER = LoggerFactory.getLogger(ServiceModifyIndexFilter.class); + private final static Logger LOGGER = LoggerFactory.getLogger(ServiceModifyIndexFilter.class); - @Override - public boolean filter(ConsulResponse<List<ServiceHealth>> object) { - // TODO Auto-generated method stub + @Override + public boolean filter(ConsulResponse<List<ServiceHealth>> object) { + // TODO Auto-generated method stub - List<ServiceHealth> newList=object.getResponse(); - if(realFilter(newList)){ - lastResponse.set(ImmutableList.copyOf(newList)); - return true; - } - - return false; - } - - private boolean realFilter(List<ServiceHealth> newList) { - // 1)判断list的size,不等则改变 - if (newList.size() != lastResponse.get().size()) { - // 第一次不打印 - if (lastResponse.get().size() != 0) { - LOGGER.info(newList.get(0).getService().getService() - + " instance count is different.new_count:" + newList.size() + " old_count:" - + lastResponse.get().size()); - } - - return true; + List<ServiceHealth> newList = object.getResponse(); + if (realFilter(newList)) { + lastResponse.set(ImmutableList.copyOf(newList)); + return true; + } + + return false; } - - - // 2)循环服务实例判断服务内容和健康检查是否改变 - for (ServiceHealth newData : newList) { - ServiceHealth sameIdOldData = findSameIdInOldList(newData); - // 若在oldlist中不存在,则改变 - if (sameIdOldData == null) { - - LOGGER.info(newData.getService().getId() - + " is a new service instance.the createindex:" - + newData.getService().getCreateIndex() - + " the modifyIndex:" - + newData.getService().getModifyIndex()); - - return true; - } - - // 若在oldlist中存在,则比较ModifyIndex的值和健康检查状态.不等则改变 - if(!compareService(newData,sameIdOldData)){ - LOGGER.info(newData.getService().getId() +" instance is change because of modifyIndex or health check" ); - return true; - } + + private boolean realFilter(List<ServiceHealth> newList) { + // 1)判断list的size,不等则改变 + if (newList.size() != lastResponse.get().size()) { + // 第一次不打印 + if (lastResponse.get().size() != 0) { + LOGGER.info(newList.get(0).getService().getService() + " instance count is different.new_count:" + + newList.size() + " old_count:" + lastResponse.get().size()); + } + + return true; + } + + + // 2)循环服务实例判断服务内容和健康检查是否改变 + for (ServiceHealth newData : newList) { + ServiceHealth sameIdOldData = findSameIdInOldList(newData); + // 若在oldlist中不存在,则改变 + if (sameIdOldData == null) { + + LOGGER.info(newData.getService().getId() + " is a new service instance.the createindex:" + + newData.getService().getCreateIndex() + " the modifyIndex:" + + newData.getService().getModifyIndex()); + + return true; + } + + // 若在oldlist中存在,则比较ModifyIndex的值和健康检查状态.不等则改变 + if (!compareService(newData, sameIdOldData)) { + LOGGER.info(newData.getService().getId() + + " instance is change because of modifyIndex or health check"); + return true; + } + } + + return false; + + } - - return false; - } + private boolean compareService(ServiceHealth oldData, ServiceHealth newData) { + + return compareServiceInfo(oldData.getService(), newData.getService()) + && compareServiceHealthStatus(oldData.getChecks(), newData.getChecks()); + } - private boolean compareService(ServiceHealth oldData,ServiceHealth newData) { - - return compareServiceInfo(oldData.getService(),newData.getService()) && - compareServiceHealthStatus(oldData.getChecks(),newData.getChecks()); - - } - - private boolean compareServiceInfo(Service oldServiceInfo, Service newServiceInfo) { - if (oldServiceInfo.getModifyIndex() != newServiceInfo.getModifyIndex()) { - LOGGER.info(newServiceInfo.getId() + " new_modifyIndex:" - + newServiceInfo.getModifyIndex() + " old_modifyIndex:" - + oldServiceInfo.getModifyIndex()); - return false; + private boolean compareServiceInfo(Service oldServiceInfo, Service newServiceInfo) { + if (oldServiceInfo.getModifyIndex() != newServiceInfo.getModifyIndex()) { + LOGGER.info(newServiceInfo.getId() + " new_modifyIndex:" + newServiceInfo.getModifyIndex() + + " old_modifyIndex:" + oldServiceInfo.getModifyIndex()); + return false; + } + return true; } - return true; - } - - private boolean compareServiceHealthStatus(List<HealthCheck> oldData, List<HealthCheck> newData) { - boolean oldHealthCheck=ServiceFilter.getInstance().isFilterHealthCheck(oldData); - boolean newHealthCheck=ServiceFilter.getInstance().isFilterHealthCheck(newData); - return oldHealthCheck==newHealthCheck; - - } - - - private ServiceHealth findSameIdInOldList(ServiceHealth newData) { - for (ServiceHealth oldData : lastResponse.get()) { - if (oldData.getService().getId().equals(newData.getService().getId())) { - return oldData; - } + + private boolean compareServiceHealthStatus(List<HealthCheck> oldData, List<HealthCheck> newData) { + boolean oldHealthCheck = ServiceFilter.getInstance().isFilterHealthCheck(oldData); + boolean newHealthCheck = ServiceFilter.getInstance().isFilterHealthCheck(newData); + return oldHealthCheck == newHealthCheck; + } - return null; - } - public boolean resetModifyIndex() { - // clear last response - lastResponse.set(ImmutableList.<ServiceHealth>of()); - return true; - } + private ServiceHealth findSameIdInOldList(ServiceHealth newData) { + for (ServiceHealth oldData : lastResponse.get()) { + if (oldData.getService().getId().equals(newData.getService().getId())) { + return oldData; + } + } + + return null; + } + + public boolean resetModifyIndex() { + // clear last response + lastResponse.set(ImmutableList.<ServiceHealth>of()); + return true; + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java index 678bb87..5cf4017 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchCatalogServicesTask.java @@ -1,105 +1,88 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; import org.apache.http.HttpEntity; import org.onap.msb.apiroute.wrapper.consulextend.CatalogClient; -import org.onap.msb.apiroute.wrapper.consulextend.cache.ServicesCatalogCache; import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache.Listener; +import org.onap.msb.apiroute.wrapper.consulextend.cache.ServicesCatalogCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.orbitz.consul.option.CatalogOptions; import com.orbitz.consul.option.QueryOptions; -public class WatchCatalogServicesTask extends WatchTask<HttpEntity> { +public class WatchCatalogServicesTask extends WatchTask<HttpEntity> { + + private final static Logger LOGGER = LoggerFactory.getLogger(WatchCatalogServicesTask.class); + + private ServicesCatalogCache servicesCache = null; + + public WatchCatalogServicesTask(final CatalogClient catalogClient, final CatalogOptions catalogOptions, + final QueryOptions queryOptions, final int watchSeconds) { + initCache(catalogClient, catalogOptions, queryOptions, watchSeconds); + } + + public WatchCatalogServicesTask(final CatalogClient catalogClient, final int watchSeconds) { + initCache(catalogClient, CatalogOptions.BLANK, QueryOptions.BLANK, watchSeconds); + } + + public WatchCatalogServicesTask(final CatalogClient catalogClient) { + initCache(catalogClient, CatalogOptions.BLANK, QueryOptions.BLANK, 10); + } + + private ServicesCatalogCache initCache(final CatalogClient catalogClient, final CatalogOptions catalogOptions, + final QueryOptions queryOptions, final int watchSeconds) { + LOGGER.info("************create all services watch task*****************"); + servicesCache = ServicesCatalogCache.newCache(catalogClient, catalogOptions, queryOptions, watchSeconds); + + servicesCache.addListener((Listener<HttpEntity>) new InternalListener()); - private final static Logger LOGGER = LoggerFactory - .getLogger(WatchCatalogServicesTask.class); - - private ServicesCatalogCache servicesCache = null; - - public WatchCatalogServicesTask( - final CatalogClient catalogClient, - final CatalogOptions catalogOptions, - final QueryOptions queryOptions, - final int watchSeconds) - { - initCache(catalogClient,catalogOptions,queryOptions,watchSeconds); - } - - public WatchCatalogServicesTask( - final CatalogClient catalogClient, - final int watchSeconds) - { - initCache(catalogClient,CatalogOptions.BLANK,QueryOptions.BLANK,watchSeconds); - } - - public WatchCatalogServicesTask( - final CatalogClient catalogClient) - { - initCache(catalogClient,CatalogOptions.BLANK,QueryOptions.BLANK,10); - } - - private ServicesCatalogCache initCache(final CatalogClient catalogClient, - final CatalogOptions catalogOptions, - final QueryOptions queryOptions, - final int watchSeconds) { - LOGGER.info("************create all services watch task*****************"); - servicesCache = ServicesCatalogCache.newCache(catalogClient, - catalogOptions, queryOptions, watchSeconds); + return servicesCache; + } - servicesCache - .addListener((Listener<HttpEntity>) new InternalListener()); + @Override + public boolean startWatch() { + // TODO Auto-generated method stub + if (servicesCache != null) { + try { + servicesCache.start(); + LOGGER.info("************start all services watch task*****************"); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("start service list watch failed:", e); + } + } - return servicesCache; - } - - @Override - public boolean startWatch() { - // TODO Auto-generated method stub - if(servicesCache!=null) - { - try { - servicesCache.start(); - LOGGER.info("************start all services watch task*****************"); - return true; - } catch (Exception e) { - // TODO Auto-generated catch block - LOGGER.warn("start service list watch failed:", e); - } - } - - return false; - } + return false; + } - @Override - public boolean stopWatch() { - // TODO Auto-generated method stub - if (servicesCache != null) { - try { - servicesCache.stop(); - LOGGER.info("************stop all services watch task*****************"); - return true; - } catch (Exception e) { - // TODO Auto-generated catch block - LOGGER.warn("stop service list watch failed:", e); - } - } - return false; - } + @Override + public boolean stopWatch() { + // TODO Auto-generated method stub + if (servicesCache != null) { + try { + servicesCache.stop(); + LOGGER.info("************stop all services watch task*****************"); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("stop service list watch failed:", e); + } + } + return false; + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java index 73a5176..9fad93d 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchServiceHealthTask.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -19,8 +17,8 @@ import java.math.BigInteger; import java.util.List; import org.onap.msb.apiroute.wrapper.consulextend.HealthClient; -import org.onap.msb.apiroute.wrapper.consulextend.cache.ServiceHealthCache; import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache.Listener; +import org.onap.msb.apiroute.wrapper.consulextend.cache.ServiceHealthCache; import org.onap.msb.apiroute.wrapper.consulextend.model.health.ServiceHealth; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,115 +27,102 @@ import com.orbitz.consul.option.CatalogOptions; import com.orbitz.consul.option.QueryOptions; public class WatchServiceHealthTask extends WatchTask<List<ServiceHealth>> { - private final static Logger LOGGER = LoggerFactory - .getLogger(WatchServiceHealthTask.class); - - private ServiceHealthCache serviceHealthCache = null; - private String serviceName=""; - - public String getServiceName() { - return serviceName; - } - - public WatchServiceHealthTask(final HealthClient healthClient, - final String serviceName,final boolean passing, - final CatalogOptions catalogOptions, final int watchSeconds, - final QueryOptions queryOptions) { - initCache(healthClient, serviceName, passing, catalogOptions, - watchSeconds, queryOptions); - } - - public WatchServiceHealthTask(final HealthClient healthClient, - final String serviceName,final boolean passing, - final int watchSeconds) - - { - initCache(healthClient, serviceName, passing, CatalogOptions.BLANK, - watchSeconds, QueryOptions.BLANK); - } - - public WatchServiceHealthTask(final HealthClient healthClient, - final String serviceName, final int watchSeconds) - - { - initCache(healthClient, serviceName, true, CatalogOptions.BLANK, - watchSeconds, QueryOptions.BLANK); - } - - private ServiceHealthCache initCache(final HealthClient healthClient, - final String serviceName,final boolean passing, - final CatalogOptions catalogOptions, final int watchSeconds, - final QueryOptions queryOptions) { -// LOGGER.info("************create {} watch task*****************",serviceName); - this.serviceName = serviceName; - serviceHealthCache = ServiceHealthCache.newCache(healthClient, - serviceName, passing, catalogOptions, watchSeconds, - queryOptions); - - serviceHealthCache - .addListener((Listener<List<ServiceHealth>>) new InternalListener()); - - return serviceHealthCache; - } - - public boolean startWatch() { - - if(serviceHealthCache!=null) - { - try { - serviceHealthCache.start(); - LOGGER.info("************start {} watch task*****************",serviceName); - return true; - } catch (Exception e) { - // TODO Auto-generated catch block - LOGGER.warn("start service watch failed:", e); - } - } - - return false; - - } - - public boolean stopWatch(){ - if (serviceHealthCache != null) { - try { - serviceHealthCache.stop(); - LOGGER.info("************stop {} watch task*****************",serviceName); - return true; - } catch (Exception e) { - // TODO Auto-generated catch block - LOGGER.warn("stop service watch failed:", e); - } - } - - return false; - } - - - public boolean resetIndex() - { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("reset " + serviceName + " consul index"); - } - - //reset consul index - serviceHealthCache.updateIndex(BigInteger.valueOf(0)); - - - //reset modify index - for (WatchTask.Filter<List<ServiceHealth>> filter : getAllFilters()) { - if (filter instanceof ServiceModifyIndexFilter) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("reset " + serviceName + " modify index"); - } - return ((ServiceModifyIndexFilter) filter).resetModifyIndex(); - } - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("reset modify index.did not find filter:" + serviceName); - } - - return false; - } + private final static Logger LOGGER = LoggerFactory.getLogger(WatchServiceHealthTask.class); + + private ServiceHealthCache serviceHealthCache = null; + private String serviceName = ""; + + public String getServiceName() { + return serviceName; + } + + public WatchServiceHealthTask(final HealthClient healthClient, final String serviceName, final boolean passing, + final CatalogOptions catalogOptions, final int watchSeconds, final QueryOptions queryOptions) { + initCache(healthClient, serviceName, passing, catalogOptions, watchSeconds, queryOptions); + } + + public WatchServiceHealthTask(final HealthClient healthClient, final String serviceName, final boolean passing, + final int watchSeconds) + + { + initCache(healthClient, serviceName, passing, CatalogOptions.BLANK, watchSeconds, QueryOptions.BLANK); + } + + public WatchServiceHealthTask(final HealthClient healthClient, final String serviceName, final int watchSeconds) + + { + initCache(healthClient, serviceName, true, CatalogOptions.BLANK, watchSeconds, QueryOptions.BLANK); + } + + private ServiceHealthCache initCache(final HealthClient healthClient, final String serviceName, + final boolean passing, final CatalogOptions catalogOptions, final int watchSeconds, + final QueryOptions queryOptions) { + // LOGGER.info("************create {} watch task*****************",serviceName); + this.serviceName = serviceName; + serviceHealthCache = ServiceHealthCache.newCache(healthClient, serviceName, passing, catalogOptions, + watchSeconds, queryOptions); + + serviceHealthCache.addListener((Listener<List<ServiceHealth>>) new InternalListener()); + + return serviceHealthCache; + } + + public boolean startWatch() { + + if (serviceHealthCache != null) { + try { + serviceHealthCache.start(); + LOGGER.info("************start {} watch task*****************", serviceName); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("start service watch failed:", e); + } + } + + return false; + + } + + public boolean stopWatch() { + if (serviceHealthCache != null) { + try { + serviceHealthCache.stop(); + LOGGER.info("************stop {} watch task*****************", serviceName); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOGGER.warn("stop service watch failed:", e); + } + } + + return false; + } + + + public boolean resetIndex() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset " + serviceName + " consul index"); + } + + // reset consul index + serviceHealthCache.updateIndex(BigInteger.valueOf(0)); + + + // reset modify index + for (WatchTask.Filter<List<ServiceHealth>> filter : getAllFilters()) { + if (filter instanceof ServiceModifyIndexFilter) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset " + serviceName + " modify index"); + } + return ((ServiceModifyIndexFilter) filter).resetModifyIndex(); + } + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("reset modify index.did not find filter:" + serviceName); + } + + return false; + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java index f565335..f12f95f 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WatchTask.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -24,79 +22,77 @@ import org.slf4j.LoggerFactory; import com.orbitz.consul.model.ConsulResponse; public abstract class WatchTask<T> { - private final CopyOnWriteArrayList<Filter<T>> filters = new CopyOnWriteArrayList<Filter<T>>(); - private final CopyOnWriteArrayList<Handler<T>> handlers = new CopyOnWriteArrayList<Handler<T>>(); - private final static Logger LOGGER = LoggerFactory - .getLogger(WatchTask.class); - - //start - public abstract boolean startWatch(); - - //stop - public abstract boolean stopWatch(); - - // filters - public interface Filter<T> { - public boolean filter(final ConsulResponse<T> object); - } - - public boolean addFilter(Filter<T> filter) { - boolean added = filters.add(filter); - return added; - } - - public void removeAllFilter() { - filters.clear(); - } - - - public final CopyOnWriteArrayList<Filter<T>> getAllFilters(){ - return filters; - } - - // handlers - public interface Handler<T> { - void handle(final ConsulResponse<T> object); - } - - public boolean addHandler(Handler<T> handler) { - boolean added = handlers.add(handler); - return added; - } - - public void removeAllHandler() { - handlers.clear(); - } - - // internal listener - protected class InternalListener implements ConsulCache.Listener<T> { - @Override - public void notify(ConsulResponse<T> newValues) { - - long startTime = System.currentTimeMillis(); - - // filter - for (Filter<T> f : filters) { - // false,return - if (!f.filter(newValues)) { - return; - } - } - - // handle - for (Handler<T> h : handlers) { - h.handle(newValues); - } - - long endTime = System.currentTimeMillis(); - - if(endTime-startTime > 10*1000) - { - LOGGER.info("WatchTask THEAD WORK TIMEOUT"); - } - } - - } + private final CopyOnWriteArrayList<Filter<T>> filters = new CopyOnWriteArrayList<Filter<T>>(); + private final CopyOnWriteArrayList<Handler<T>> handlers = new CopyOnWriteArrayList<Handler<T>>(); + private final static Logger LOGGER = LoggerFactory.getLogger(WatchTask.class); + + // start + public abstract boolean startWatch(); + + // stop + public abstract boolean stopWatch(); + + // filters + public interface Filter<T> { + public boolean filter(final ConsulResponse<T> object); + } + + public boolean addFilter(Filter<T> filter) { + boolean added = filters.add(filter); + return added; + } + + public void removeAllFilter() { + filters.clear(); + } + + + public final CopyOnWriteArrayList<Filter<T>> getAllFilters() { + return filters; + } + + // handlers + public interface Handler<T> { + void handle(final ConsulResponse<T> object); + } + + public boolean addHandler(Handler<T> handler) { + boolean added = handlers.add(handler); + return added; + } + + public void removeAllHandler() { + handlers.clear(); + } + + // internal listener + protected class InternalListener implements ConsulCache.Listener<T> { + @Override + public void notify(ConsulResponse<T> newValues) { + + long startTime = System.currentTimeMillis(); + + // filter + for (Filter<T> f : filters) { + // false,return + if (!f.filter(newValues)) { + return; + } + } + + // handle + for (Handler<T> h : handlers) { + h.handle(newValues); + } + + long endTime = System.currentTimeMillis(); + + if (endTime - startTime > 10 * 1000) { + LOGGER.info("WatchTask THEAD WORK TIMEOUT"); + } + } + + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java index c4df452..517003a 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/expose/WriteBufferHandler.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.expose; @@ -24,28 +22,27 @@ import com.orbitz.consul.model.ConsulResponse; public class WriteBufferHandler<T> implements WatchTask.Handler<T> { - private static final Logger LOGGER = LoggerFactory - .getLogger(WriteBufferHandler.class); - private final ServiceData.DataType dataType; - - - public WriteBufferHandler(final ServiceData.DataType dataType) { - this.dataType =dataType; - } - - @Override - public void handle(ConsulResponse<T> object) { - // TODO Auto-generated method stub - ServiceData<T> data = new ServiceData<T>(); - data.setDataType(dataType); - data.setData(object.getResponse()); - - try { - QueueManager.getInstance().putIn(data); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - LOGGER.warn("put data to buffer interrupted:", e); - } - } + private static final Logger LOGGER = LoggerFactory.getLogger(WriteBufferHandler.class); + private final ServiceData.DataType dataType; + + + public WriteBufferHandler(final ServiceData.DataType dataType) { + this.dataType = dataType; + } + + @Override + public void handle(ConsulResponse<T> object) { + // TODO Auto-generated method stub + ServiceData<T> data = new ServiceData<T>(); + data.setDataType(dataType); + data.setData(object.getResponse()); + + try { + QueueManager.getInstance().putIn(data); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + LOGGER.warn("put data to buffer interrupted:", e); + } + } } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/model/health/Service.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/model/health/Service.java index cd85955..4d33941 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/model/health/Service.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/model/health/Service.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.model.health; @@ -30,25 +28,25 @@ import java.util.List; @JsonIgnoreProperties(ignoreUnknown = true) public abstract class Service { - @JsonProperty("ID") - public abstract String getId(); + @JsonProperty("ID") + public abstract String getId(); - @JsonProperty("Service") - public abstract String getService(); + @JsonProperty("Service") + public abstract String getService(); - @JsonProperty("Tags") - @JsonDeserialize(as = ImmutableList.class, contentAs = String.class) - public abstract List<String> getTags(); + @JsonProperty("Tags") + @JsonDeserialize(as = ImmutableList.class, contentAs = String.class) + public abstract List<String> getTags(); - @JsonProperty("Address") - public abstract String getAddress(); + @JsonProperty("Address") + public abstract String getAddress(); - @JsonProperty("Port") - public abstract int getPort(); + @JsonProperty("Port") + public abstract int getPort(); - @JsonProperty("CreateIndex") - public abstract int getCreateIndex(); + @JsonProperty("CreateIndex") + public abstract int getCreateIndex(); - @JsonProperty("ModifyIndex") - public abstract int getModifyIndex(); + @JsonProperty("ModifyIndex") + public abstract int getModifyIndex(); } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/model/health/ServiceHealth.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/model/health/ServiceHealth.java index 007836a..1819bd7 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/model/health/ServiceHealth.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/model/health/ServiceHealth.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.model.health; @@ -26,6 +24,7 @@ import com.orbitz.consul.model.health.Node; import org.immutables.value.Value; import java.util.List; + @Value.Immutable @JsonSerialize(as = ImmutableServiceHealth.class) @JsonDeserialize(as = ImmutableServiceHealth.class) @@ -41,5 +40,5 @@ public abstract class ServiceHealth { @JsonProperty("Checks") @JsonDeserialize(as = ImmutableList.class, contentAs = HealthCheck.class) public abstract List<HealthCheck> getChecks(); - + } diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/util/Http.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/util/Http.java index f6b9d6c..ce2dc7c 100644 --- a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/util/Http.java +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/util/Http.java @@ -1,17 +1,15 @@ /******************************************************************************* * Copyright 2016-2017 ZTE, Inc. and others. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. ******************************************************************************/ package org.onap.msb.apiroute.wrapper.consulextend.util; @@ -47,250 +45,215 @@ import com.orbitz.consul.option.QueryOptions; import com.orbitz.consul.util.Jackson; public class Http { - private static final Logger LOGGER = LoggerFactory.getLogger(Http.class); - - private final static CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients - .custom().setMaxConnTotal(Integer.MAX_VALUE) - .setMaxConnPerRoute(Integer.MAX_VALUE).build(); - - private static Http instance = null; - - private Http() { - } - - public static Http getInstance() { - if (instance == null) { - instance = new Http(); - httpAsyncClient.start(); - } - - return instance; - } - - // async get data from consul,and handle response immediately - public <T> void asyncGet(String requestURI, - final TypeReference<T> responseType, - final ConsulResponseCallback<T> callback, final Integer... okCodes) { - // LOGGER.info("Async request:"+requestURI); - - httpAsyncClient.execute(new HttpGet(requestURI), - new FutureCallback<HttpResponse>() { - - public void completed(final HttpResponse response) { - callback.onComplete(consulResponse(responseType, - response)); - } - - public void failed(final Exception ex) { - callback.onFailure(ex); - } - - public void cancelled() { - LOGGER.warn("cancelled async request"); - } - }); - } - - // async get data from consul,and handle response delay - public <T> void asyncGetDelayHandle(String requestURI, - final TypeReference<T> responseType, - final ConsulResponseCallback<T> callback, final Integer... okCodes) { - - httpAsyncClient.execute(new HttpGet(requestURI), - new FutureCallback<HttpResponse>() { - - public void completed(final HttpResponse response) { - OriginalConsulResponse<T> originalConsulResponse = new OriginalConsulResponse<T>( - response, responseType); - - //handle not 2xx code - if (!isSuccessful(response)) { - - LOGGER.warn("response statuscode:" - + response.getStatusLine().getStatusCode()); - - callback.onFailure(new ConsulException( - "response statuscode:" - + response.getStatusLine() - .getStatusCode())); - } else { - callback.onDelayComplete(originalConsulResponse); - } - - } - - public void failed(final Exception ex) { - callback.onFailure(ex); - } - - public void cancelled() { - LOGGER.warn("cancelled async request"); - } - }); - } - - public static ConsulResponseHeader consulResponseHeader( - HttpResponse response) { - String indexHeaderValue = response.getFirstHeader("X-Consul-Index") - .getValue(); - String lastContactHeaderValue = response.getFirstHeader( - "X-Consul-Lastcontact").getValue(); - String knownLeaderHeaderValue = response.getFirstHeader( - "X-Consul-Knownleader").getValue(); - - BigInteger index = indexHeaderValue == null ? new BigInteger("0") - : new BigInteger(indexHeaderValue); - long lastContact = lastContactHeaderValue == null ? 0 : Long - .parseLong(lastContactHeaderValue); - boolean knownLeader = knownLeaderHeaderValue == null ? false : Boolean - .parseBoolean(knownLeaderHeaderValue); - - return new ConsulResponseHeader(lastContact, knownLeader, index); - } - - public static <T> ConsulResponse<T> consulResponse( - TypeReference<T> responseType, HttpResponse response) { - - String indexHeaderValue = response.getFirstHeader("X-Consul-Index") - .getValue(); - String lastContactHeaderValue = response.getFirstHeader( - "X-Consul-Lastcontact").getValue(); - String knownLeaderHeaderValue = response.getFirstHeader( - "X-Consul-Knownleader").getValue(); - - BigInteger index = indexHeaderValue == null ? new BigInteger("0") - : new BigInteger(indexHeaderValue); - long lastContact = lastContactHeaderValue == null ? 0 : Long - .parseLong(lastContactHeaderValue); - boolean knownLeader = knownLeaderHeaderValue == null ? false : Boolean - .parseBoolean(knownLeaderHeaderValue); - - ConsulResponse<T> consulResponse = new ConsulResponse<T>(readResponse( - response, responseType), lastContact, knownLeader, index); - return consulResponse; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public static <T> T readResponse(HttpResponse response, - TypeReference<T> responseType) { - - // read streamed entity - T object; - - // HttpEntity,read original data. - Type _type = responseType.getType(); - if (_type instanceof Class - && (((Class) _type).isAssignableFrom(HttpEntity.class))) { - object = (T) response.getEntity(); - return object; - } - - // String,read original data. - if (_type instanceof Class - && (((Class) _type).isAssignableFrom(String.class))) { - - try { - - object = (T) IOUtils - .toString(response.getEntity().getContent()); - response.getEntity().getContent().close(); - - } catch (UnsupportedOperationException e) { - object = (T) ""; - LOGGER.warn("covert streamed entity to String exception:", e); - } catch (IOException e) { - object = (T) ""; - LOGGER.warn("covert streamed entity to String exception:", e); - } - - return object; - } - - // change data type - try { - object = Jackson.MAPPER.readValue( - response.getEntity().getContent(), responseType); - } catch (IOException e) { - LOGGER.warn("covert streamed entity to object exception:", e); - object = readDefaultResponse(responseType); - } - - return object; - } - - @SuppressWarnings("unchecked") - public static <T> T readDefaultResponse(TypeReference<T> responseType) { - Type _type = responseType.getType(); - if (_type instanceof ParameterizedType - && ((ParameterizedType) _type).getRawType() == List.class) { - return (T) ImmutableList.of(); - } else if (_type instanceof ParameterizedType - && ((ParameterizedType) _type).getRawType() == Map.class) { - return (T) ImmutableMap.of(); - } else { - // Not sure if this case will be reached, but if it is it'll be nice - // to know - throw new IllegalStateException( - "Cannot determine empty representation for " + _type); - } - } - - public static boolean isSuccessful(HttpResponse response, - Integer... okCodes) { - return HttpStatus.isSuccess(response.getStatusLine().getStatusCode()) - || Sets.newHashSet(okCodes).contains( - response.getStatusLine().getStatusCode()); - } - - public static String optionsFrom(CatalogOptions catalogOptions, - QueryOptions queryOptions) { - String params = ""; - - if (catalogOptions != null) { - Map<String, Object> options = catalogOptions.toQuery(); - - if (options.containsKey("dc")) { - params += "dc=" + options.get("dc"); - } - if (options.containsKey("tag")) { - params += params.isEmpty() ? "" : "&"; - params += "tag=" + options.get("tag"); - } - } - - if (queryOptions != null) { - Map<String, Object> options = queryOptions.toQuery(); - - if (options.containsKey("consistent")) { - params += params.isEmpty() ? "" : "&"; - params += "consistent=" + options.get("consistent"); - } - if (options.containsKey("stale")) { - params += params.isEmpty() ? "" : "&"; - params += "stale=" + options.get("stale"); - } - if (options.containsKey("wait")) { - params += params.isEmpty() ? "" : "&"; - params += "wait=" + options.get("wait"); - } - - if (options.containsKey("index")) { - params += params.isEmpty() ? "" : "&"; - params += "index=" + options.get("index"); - } - if (options.containsKey("token")) { - params += params.isEmpty() ? "" : "&"; - params += "token=" + options.get("token"); - } - if (options.containsKey("near")) { - params += params.isEmpty() ? "" : "&"; - params += "near=" + options.get("near"); - } - if (options.containsKey("dc")) { - params += params.isEmpty() ? "" : "&"; - params += "dc=" + options.get("dc"); - } - } - return params; - } + private static final Logger LOGGER = LoggerFactory.getLogger(Http.class); + + private final static CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients.custom() + .setMaxConnTotal(Integer.MAX_VALUE).setMaxConnPerRoute(Integer.MAX_VALUE).build(); + + private static Http instance = null; + + private Http() {} + + public static Http getInstance() { + if (instance == null) { + instance = new Http(); + httpAsyncClient.start(); + } + + return instance; + } + + // async get data from consul,and handle response immediately + public <T> void asyncGet(String requestURI, final TypeReference<T> responseType, + final ConsulResponseCallback<T> callback, final Integer... okCodes) { + // LOGGER.info("Async request:"+requestURI); + + httpAsyncClient.execute(new HttpGet(requestURI), new FutureCallback<HttpResponse>() { + + public void completed(final HttpResponse response) { + callback.onComplete(consulResponse(responseType, response)); + } + + public void failed(final Exception ex) { + callback.onFailure(ex); + } + + public void cancelled() { + LOGGER.warn("cancelled async request"); + } + }); + } + + // async get data from consul,and handle response delay + public <T> void asyncGetDelayHandle(String requestURI, final TypeReference<T> responseType, + final ConsulResponseCallback<T> callback, final Integer... okCodes) { + + httpAsyncClient.execute(new HttpGet(requestURI), new FutureCallback<HttpResponse>() { + + public void completed(final HttpResponse response) { + OriginalConsulResponse<T> originalConsulResponse = + new OriginalConsulResponse<T>(response, responseType); + + // handle not 2xx code + if (!isSuccessful(response)) { + + LOGGER.warn("response statuscode:" + response.getStatusLine().getStatusCode()); + + callback.onFailure(new ConsulException( + "response statuscode:" + response.getStatusLine().getStatusCode())); + } else { + callback.onDelayComplete(originalConsulResponse); + } + + } + + public void failed(final Exception ex) { + callback.onFailure(ex); + } + + public void cancelled() { + LOGGER.warn("cancelled async request"); + } + }); + } + + public static ConsulResponseHeader consulResponseHeader(HttpResponse response) { + String indexHeaderValue = response.getFirstHeader("X-Consul-Index").getValue(); + String lastContactHeaderValue = response.getFirstHeader("X-Consul-Lastcontact").getValue(); + String knownLeaderHeaderValue = response.getFirstHeader("X-Consul-Knownleader").getValue(); + + BigInteger index = indexHeaderValue == null ? new BigInteger("0") : new BigInteger(indexHeaderValue); + long lastContact = lastContactHeaderValue == null ? 0 : Long.parseLong(lastContactHeaderValue); + boolean knownLeader = knownLeaderHeaderValue == null ? false : Boolean.parseBoolean(knownLeaderHeaderValue); + + return new ConsulResponseHeader(lastContact, knownLeader, index); + } + + public static <T> ConsulResponse<T> consulResponse(TypeReference<T> responseType, HttpResponse response) { + + String indexHeaderValue = response.getFirstHeader("X-Consul-Index").getValue(); + String lastContactHeaderValue = response.getFirstHeader("X-Consul-Lastcontact").getValue(); + String knownLeaderHeaderValue = response.getFirstHeader("X-Consul-Knownleader").getValue(); + + BigInteger index = indexHeaderValue == null ? new BigInteger("0") : new BigInteger(indexHeaderValue); + long lastContact = lastContactHeaderValue == null ? 0 : Long.parseLong(lastContactHeaderValue); + boolean knownLeader = knownLeaderHeaderValue == null ? false : Boolean.parseBoolean(knownLeaderHeaderValue); + + ConsulResponse<T> consulResponse = + new ConsulResponse<T>(readResponse(response, responseType), lastContact, knownLeader, index); + return consulResponse; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public static <T> T readResponse(HttpResponse response, TypeReference<T> responseType) { + + // read streamed entity + T object; + + // HttpEntity,read original data. + Type _type = responseType.getType(); + if (_type instanceof Class && (((Class) _type).isAssignableFrom(HttpEntity.class))) { + object = (T) response.getEntity(); + return object; + } + + // String,read original data. + if (_type instanceof Class && (((Class) _type).isAssignableFrom(String.class))) { + + try { + + object = (T) IOUtils.toString(response.getEntity().getContent()); + response.getEntity().getContent().close(); + + } catch (UnsupportedOperationException e) { + object = (T) ""; + LOGGER.warn("covert streamed entity to String exception:", e); + } catch (IOException e) { + object = (T) ""; + LOGGER.warn("covert streamed entity to String exception:", e); + } + + return object; + } + + // change data type + try { + object = Jackson.MAPPER.readValue(response.getEntity().getContent(), responseType); + } catch (IOException e) { + LOGGER.warn("covert streamed entity to object exception:", e); + object = readDefaultResponse(responseType); + } + + return object; + } + + @SuppressWarnings("unchecked") + public static <T> T readDefaultResponse(TypeReference<T> responseType) { + Type _type = responseType.getType(); + if (_type instanceof ParameterizedType && ((ParameterizedType) _type).getRawType() == List.class) { + return (T) ImmutableList.of(); + } else if (_type instanceof ParameterizedType && ((ParameterizedType) _type).getRawType() == Map.class) { + return (T) ImmutableMap.of(); + } else { + // Not sure if this case will be reached, but if it is it'll be nice + // to know + throw new IllegalStateException("Cannot determine empty representation for " + _type); + } + } + + public static boolean isSuccessful(HttpResponse response, Integer... okCodes) { + return HttpStatus.isSuccess(response.getStatusLine().getStatusCode()) + || Sets.newHashSet(okCodes).contains(response.getStatusLine().getStatusCode()); + } + + public static String optionsFrom(CatalogOptions catalogOptions, QueryOptions queryOptions) { + String params = ""; + + if (catalogOptions != null) { + Map<String, Object> options = catalogOptions.toQuery(); + + if (options.containsKey("dc")) { + params += "dc=" + options.get("dc"); + } + if (options.containsKey("tag")) { + params += params.isEmpty() ? "" : "&"; + params += "tag=" + options.get("tag"); + } + } + + if (queryOptions != null) { + Map<String, Object> options = queryOptions.toQuery(); + + if (options.containsKey("consistent")) { + params += params.isEmpty() ? "" : "&"; + params += "consistent=" + options.get("consistent"); + } + if (options.containsKey("stale")) { + params += params.isEmpty() ? "" : "&"; + params += "stale=" + options.get("stale"); + } + if (options.containsKey("wait")) { + params += params.isEmpty() ? "" : "&"; + params += "wait=" + options.get("wait"); + } + + if (options.containsKey("index")) { + params += params.isEmpty() ? "" : "&"; + params += "index=" + options.get("index"); + } + if (options.containsKey("token")) { + params += params.isEmpty() ? "" : "&"; + params += "token=" + options.get("token"); + } + if (options.containsKey("near")) { + params += params.isEmpty() ? "" : "&"; + params += "near=" + options.get("near"); + } + if (options.containsKey("dc")) { + params += params.isEmpty() ? "" : "&"; + params += "dc=" + options.get("dc"); + } + } + return params; + } } |