diff options
author | HuabingZhao <zhao.huabing@zte.com.cn> | 2017-07-25 15:18:33 +0800 |
---|---|---|
committer | HuabingZhao <zhao.huabing@zte.com.cn> | 2017-07-25 18:11:59 +0800 |
commit | 672f3d40be83d9e380fd7be4b674d5e8d5fa36de (patch) | |
tree | 43105e1d5e2ba8e8accea8648e57e1cf87db3f00 /apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/util/Http.java | |
parent | 41d3db15a8e1a0496f9c2a5e15db2998a32bb9bf (diff) |
Divide the MSB source codes into two repos
Change-Id: Ie76d545b214a8ce5191f215350a623e1529983d9
Issue-id: MSB-5
Signed-off-by: HuabingZhao <zhao.huabing@zte.com.cn>
Diffstat (limited to 'apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/util/Http.java')
-rw-r--r-- | apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/util/Http.java | 281 |
1 files changed, 281 insertions, 0 deletions
diff --git a/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/util/Http.java b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/util/Http.java new file mode 100644 index 0000000..9f0980c --- /dev/null +++ b/apiroute/apiroute-service/src/main/java/org/onap/msb/apiroute/wrapper/consulextend/util/Http.java @@ -0,0 +1,281 @@ +package org.onap.msb.apiroute.wrapper.consulextend.util; + +import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.eclipse.jetty.http.HttpStatus; +import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseCallback; +import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseHeader; +import org.onap.msb.apiroute.wrapper.consulextend.async.OriginalConsulResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.orbitz.consul.ConsulException; +import com.orbitz.consul.model.ConsulResponse; +import com.orbitz.consul.option.CatalogOptions; +import com.orbitz.consul.option.QueryOptions; +import com.orbitz.consul.util.Jackson; + +public class Http { + private static final Logger LOGGER = LoggerFactory.getLogger(Http.class); + + private final static CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients + .custom().setMaxConnTotal(Integer.MAX_VALUE) + .setMaxConnPerRoute(Integer.MAX_VALUE).build(); + + private static Http instance = null; + + private Http() { + } + + public static Http getInstance() { + if (instance == null) { + instance = new Http(); + httpAsyncClient.start(); + } + + return instance; + } + + // async get data from consul,and handle response immediately + public <T> void asyncGet(String requestURI, + final TypeReference<T> responseType, + final ConsulResponseCallback<T> callback, final Integer... okCodes) { + // LOGGER.info("Async request:"+requestURI); + + httpAsyncClient.execute(new HttpGet(requestURI), + new FutureCallback<HttpResponse>() { + + public void completed(final HttpResponse response) { + callback.onComplete(consulResponse(responseType, + response)); + } + + public void failed(final Exception ex) { + callback.onFailure(ex); + } + + public void cancelled() { + LOGGER.warn("cancelled async request"); + } + }); + } + + // async get data from consul,and handle response delay + public <T> void asyncGetDelayHandle(String requestURI, + final TypeReference<T> responseType, + final ConsulResponseCallback<T> callback, final Integer... okCodes) { + + httpAsyncClient.execute(new HttpGet(requestURI), + new FutureCallback<HttpResponse>() { + + public void completed(final HttpResponse response) { + OriginalConsulResponse<T> originalConsulResponse = new OriginalConsulResponse<T>( + response, responseType); + + //handle not 2xx code + if (!isSuccessful(response)) { + + LOGGER.warn("response statuscode:" + + response.getStatusLine().getStatusCode()); + + callback.onFailure(new ConsulException( + "response statuscode:" + + response.getStatusLine() + .getStatusCode())); + } else { + callback.onDelayComplete(originalConsulResponse); + } + + } + + public void failed(final Exception ex) { + callback.onFailure(ex); + } + + public void cancelled() { + LOGGER.warn("cancelled async request"); + } + }); + } + + public static ConsulResponseHeader consulResponseHeader( + HttpResponse response) { + String indexHeaderValue = response.getFirstHeader("X-Consul-Index") + .getValue(); + String lastContactHeaderValue = response.getFirstHeader( + "X-Consul-Lastcontact").getValue(); + String knownLeaderHeaderValue = response.getFirstHeader( + "X-Consul-Knownleader").getValue(); + + BigInteger index = indexHeaderValue == null ? new BigInteger("0") + : new BigInteger(indexHeaderValue); + long lastContact = lastContactHeaderValue == null ? 0 : Long + .parseLong(lastContactHeaderValue); + boolean knownLeader = knownLeaderHeaderValue == null ? false : Boolean + .parseBoolean(knownLeaderHeaderValue); + + return new ConsulResponseHeader(lastContact, knownLeader, index); + } + + public static <T> ConsulResponse<T> consulResponse( + TypeReference<T> responseType, HttpResponse response) { + + String indexHeaderValue = response.getFirstHeader("X-Consul-Index") + .getValue(); + String lastContactHeaderValue = response.getFirstHeader( + "X-Consul-Lastcontact").getValue(); + String knownLeaderHeaderValue = response.getFirstHeader( + "X-Consul-Knownleader").getValue(); + + BigInteger index = indexHeaderValue == null ? new BigInteger("0") + : new BigInteger(indexHeaderValue); + long lastContact = lastContactHeaderValue == null ? 0 : Long + .parseLong(lastContactHeaderValue); + boolean knownLeader = knownLeaderHeaderValue == null ? false : Boolean + .parseBoolean(knownLeaderHeaderValue); + + ConsulResponse<T> consulResponse = new ConsulResponse<T>(readResponse( + response, responseType), lastContact, knownLeader, index); + return consulResponse; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static <T> T readResponse(HttpResponse response, + TypeReference<T> responseType) { + + // read streamed entity + T object; + + // HttpEntity,read original data. + Type _type = responseType.getType(); + if (_type instanceof Class + && (((Class) _type).isAssignableFrom(HttpEntity.class))) { + object = (T) response.getEntity(); + return object; + } + + // String,read original data. + if (_type instanceof Class + && (((Class) _type).isAssignableFrom(String.class))) { + + try { + + object = (T) IOUtils + .toString(response.getEntity().getContent()); + response.getEntity().getContent().close(); + + } catch (UnsupportedOperationException e) { + object = (T) ""; + LOGGER.warn("covert streamed entity to String exception:", e); + } catch (IOException e) { + object = (T) ""; + LOGGER.warn("covert streamed entity to String exception:", e); + } + + return object; + } + + // change data type + try { + object = Jackson.MAPPER.readValue( + response.getEntity().getContent(), responseType); + } catch (IOException e) { + LOGGER.warn("covert streamed entity to object exception:", e); + object = readDefaultResponse(responseType); + } + + return object; + } + + @SuppressWarnings("unchecked") + public static <T> T readDefaultResponse(TypeReference<T> responseType) { + Type _type = responseType.getType(); + if (_type instanceof ParameterizedType + && ((ParameterizedType) _type).getRawType() == List.class) { + return (T) ImmutableList.of(); + } else if (_type instanceof ParameterizedType + && ((ParameterizedType) _type).getRawType() == Map.class) { + return (T) ImmutableMap.of(); + } else { + // Not sure if this case will be reached, but if it is it'll be nice + // to know + throw new IllegalStateException( + "Cannot determine empty representation for " + _type); + } + } + + public static boolean isSuccessful(HttpResponse response, + Integer... okCodes) { + return HttpStatus.isSuccess(response.getStatusLine().getStatusCode()) + || Sets.newHashSet(okCodes).contains( + response.getStatusLine().getStatusCode()); + } + + public static String optionsFrom(CatalogOptions catalogOptions, + QueryOptions queryOptions) { + String params = ""; + + if (catalogOptions != null) { + Map<String, Object> options = catalogOptions.toQuery(); + + if (options.containsKey("dc")) { + params += "dc=" + options.get("dc"); + } + if (options.containsKey("tag")) { + params += params.isEmpty() ? "" : "&"; + params += "tag=" + options.get("tag"); + } + } + + if (queryOptions != null) { + Map<String, Object> options = queryOptions.toQuery(); + + if (options.containsKey("consistent")) { + params += params.isEmpty() ? "" : "&"; + params += "consistent=" + options.get("consistent"); + } + if (options.containsKey("stale")) { + params += params.isEmpty() ? "" : "&"; + params += "stale=" + options.get("stale"); + } + if (options.containsKey("wait")) { + params += params.isEmpty() ? "" : "&"; + params += "wait=" + options.get("wait"); + } + + if (options.containsKey("index")) { + params += params.isEmpty() ? "" : "&"; + params += "index=" + options.get("index"); + } + if (options.containsKey("token")) { + params += params.isEmpty() ? "" : "&"; + params += "token=" + options.get("token"); + } + if (options.containsKey("near")) { + params += params.isEmpty() ? "" : "&"; + params += "near=" + options.get("near"); + } + if (options.containsKey("dc")) { + params += params.isEmpty() ? "" : "&"; + params += "dc=" + options.get("dc"); + } + } + return params; + } +} |