diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client/impl')
10 files changed, 2229 insertions, 2275 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java b/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java index 6670399..64a9f5e 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java @@ -4,11 +4,13 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,47 +19,43 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client.impl; -public class Clock -{ - public synchronized static Clock getIt () - { - if ( sfClock == null ) - { - sfClock = new Clock (); - } - return sfClock; - } +public class Clock { + public static synchronized Clock getIt() { + if (sfClock == null) { + sfClock = new Clock(); + } + return sfClock; + } - /** - * Get the system's current time in milliseconds. - * @return the current time - */ - public static long now () - { - return getIt().nowImpl (); - } + /** + * Get the system's current time in milliseconds. + * + * @return the current time + */ + public static long now() { + return getIt().nowImpl(); + } - /** - * Get current time in milliseconds - * @return current time in ms - */ - protected long nowImpl () - { - return System.currentTimeMillis (); - } + /** + * Get current time in milliseconds. + * + * @return current time in ms + */ + protected long nowImpl() { + return System.currentTimeMillis(); + } - protected Clock () - { - } + protected Clock() { + } - private static Clock sfClock = null; + private static Clock sfClock = null; - protected synchronized static void register ( Clock testClock ) - { - sfClock = testClock; - } + protected static synchronized void register(Clock testClock) { + sfClock = testClock; + } } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java b/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java index 948ca31..5219286 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java @@ -4,6 +4,8 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -19,6 +21,7 @@ * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ + package org.onap.dmaap.mr.client.impl; import java.util.Properties; @@ -27,6 +30,7 @@ import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; + import org.glassfish.jersey.apache.connector.ApacheConnectorProvider; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; @@ -34,92 +38,96 @@ import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; public class DmaapClientUtil { - private static final String MR_AUTH_CONSTANT = "X-CambriaAuth"; - private static final String MR_DATE_CONSTANT = "X-CambriaDate"; - private static final String[] httpClientProperties = { ClientProperties.CONNECT_TIMEOUT, - ClientProperties.READ_TIMEOUT, ClientProperties.PROXY_USERNAME, ClientProperties.PROXY_PASSWORD, - ClientProperties.PROXY_URI }; + private DmaapClientUtil() { + + } + + private static final String MR_AUTH_CONSTANT = "X-CambriaAuth"; + private static final String MR_DATE_CONSTANT = "X-CambriaDate"; + private static final String[] httpClientProperties = {ClientProperties.CONNECT_TIMEOUT, + ClientProperties.READ_TIMEOUT, ClientProperties.PROXY_USERNAME, ClientProperties.PROXY_PASSWORD, + ClientProperties.PROXY_URI}; - public static ClientConfig getClientConfig(Properties properties) { - ClientConfig config = new ClientConfig(); - if (properties != null && !properties.isEmpty()) { - setHttpClientProperties(config, properties); - } - return config; - } + public static ClientConfig getClientConfig(Properties properties) { + ClientConfig config = new ClientConfig(); + if (properties != null && !properties.isEmpty()) { + setHttpClientProperties(config, properties); + } + return config; + } - private static void setHttpClientProperties(ClientConfig config, Properties properties) { - for (int i = 0; i < httpClientProperties.length; i++) { - if ((properties.getProperty(httpClientProperties[i]) != null)) { - config.property(httpClientProperties[i], properties.getProperty(httpClientProperties[i])); - } - } - if ((properties.getProperty(ClientProperties.PROXY_URI) != null) && - !(properties.getProperty(ClientProperties.PROXY_URI).isEmpty())) { - config.connectorProvider(new ApacheConnectorProvider()); - } // else the default connectorProvider (HttpConnectorProvider) will be used + private static void setHttpClientProperties(ClientConfig config, Properties properties) { + for (String httpClientProperty : httpClientProperties) { + if ((properties.getProperty(httpClientProperty) != null)) { + config.property(httpClientProperty, properties.getProperty(httpClientProperty)); + } + } + if ((properties.getProperty(ClientProperties.PROXY_URI) != null) + && !(properties.getProperty(ClientProperties.PROXY_URI).isEmpty())) { + config.connectorProvider(new ApacheConnectorProvider()); + } // else the default connectorProvider (HttpConnectorProvider) will be used - } + } - public static WebTarget getTarget(ClientConfig config, final String path, final String username, - final String password) { - Client client = null; - if (config != null) { - client = ClientBuilder.newClient(config); - } else { - client = ClientBuilder.newClient(); - } - HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password); - client.register(feature); + public static WebTarget getTarget(ClientConfig config, final String path, final String username, + final String password) { + Client client = null; + if (config != null) { + client = ClientBuilder.newClient(config); + } else { + client = ClientBuilder.newClient(); + } + HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password); + client.register(feature); - return client.target(path); - } + return client.target(path); + } - public static WebTarget getTarget(ClientConfig config, final String path) { + public static WebTarget getTarget(ClientConfig config, final String path) { - Client client = null; - if (config != null&&config.getProperties().size()>0) { - client = ClientBuilder.newClient(config); - } else { - client = ClientBuilder.newClient(); - } - return client.target(path); - } + Client client = null; + if (config != null && config.getProperties().size() > 0) { + client = ClientBuilder.newClient(config); + } else { + client = ClientBuilder.newClient(); + } + return client.target(path); + } - public static Response getResponsewtCambriaAuth(WebTarget target, String username, String password) { - return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get(); + public static Response getResponsewtCambriaAuth(WebTarget target, String username, String password) { + return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get(); - } + } - public static Response postResponsewtCambriaAuth(WebTarget target, String username, String password, byte[] data, - String contentType) { - return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password) - .post(Entity.entity(data, contentType)); + public static Response postResponsewtCambriaAuth(WebTarget target, String username, String password, byte[] data, + String contentType) { + return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password) + .post(Entity.entity(data, contentType)); - } + } - public static Response getResponsewtBasicAuth(WebTarget target, String authHeader) { + public static Response getResponsewtBasicAuth(WebTarget target, String authHeader) { - return target.request().header("Authorization", "Basic " + authHeader).get(); + return target.request().header("Authorization", "Basic " + authHeader).get(); - } + } - public static Response postResponsewtBasicAuth(WebTarget target, String authHeader, byte[] data, - String contentType) { + public static Response postResponsewtBasicAuth(WebTarget target, String authHeader, byte[] data, + String contentType) { - return target.request().header("Authorization", "Basic " + authHeader).post(Entity.entity(data, contentType)); + return target.request().header("Authorization", "Basic " + authHeader).post(Entity.entity(data, contentType)); - } + } - public static Response getResponsewtNoAuth(WebTarget target) { + public static Response getResponsewtNoAuth(WebTarget target) { - return target.request().get(); + return target.request().get(); - } + } - public static Response postResponsewtNoAuth(WebTarget target, byte[] data, String contentType) { - return target.request().post(Entity.entity(data, contentType)); + public static Response postResponsewtNoAuth(WebTarget target, byte[] data, String contentType) { + return target.request().post(Entity.entity(data, contentType)); - } + } } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java index 07cf6a7..9522c90 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java @@ -4,11 +4,13 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,387 +19,387 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client.impl; +import com.att.nsa.apiClient.http.CacheUse; +import com.att.nsa.apiClient.http.HttpClient; import java.net.MalformedURLException; import java.util.Collection; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; - import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response; import org.apache.http.HttpException; +import org.apache.http.HttpStatus; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.internal.util.Base64; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import org.json.JSONTokener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.nsa.apiClient.http.CacheUse; -import com.att.nsa.apiClient.http.HttpClient; import org.onap.dmaap.mr.client.MRClient; import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; +import org.onap.dmaap.mr.client.ProtocolType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MRBaseClient extends HttpClient implements MRClient { - private final static String HEADER_TRANSACTION_ID = "transactionid"; - - private final static String JSON_RESULT = "result"; - private final static String JSON_STATUS = "status"; - - private final static String AUTH_FAILED = "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."; - private final static String LOG_TRANSACTION_ID = "TransactionId : "; - - private ClientConfig clientConfig = null; - - protected MRBaseClient(Collection<String> hosts) throws MalformedURLException { - super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT); - - fLog = LoggerFactory.getLogger(this.getClass().getName()); - } - - protected MRBaseClient(Collection<String> hosts, int stdSvcPort) throws MalformedURLException { - super(ConnectionType.HTTP, hosts, stdSvcPort); - - fLog = LoggerFactory.getLogger(this.getClass().getName()); - } - - protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException { - super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT, clientSignature, CacheUse.NONE, 1, 1L, - TimeUnit.MILLISECONDS, 32, 32, 600000); - - fLog = LoggerFactory.getLogger(this.getClass().getName()); - } - - public ClientConfig getClientConfig1() { - return clientConfig; - } - - public void setClientConfig(ClientConfig config) { - this.clientConfig = config; - } - - @Override - public void close() { - // nothing to close - } - - protected Set<String> jsonArrayToSet(JSONArray a) { - if (a == null) - return null; - - final TreeSet<String> set = new TreeSet<>(); - for (int i = 0; i < a.length(); i++) { - set.add(a.getString(i)); - } - return set; - } - - public void logTo(Logger log) { - fLog = log; - replaceLogger(log); - } - - protected Logger getLog() { - return fLog; - } - - private Logger fLog; - - public JSONObject post(final String path, final byte[] data, final String contentType, final String username, - final String password, final String protocalFlag) throws HttpException, JSONException { - if ((null != username && null != password)) { - WebTarget target = null; - Response response = null; - target = DmaapClientUtil.getTarget(clientConfig,path, username, password); - String encoding = Base64.encodeAsString(username + ":" + password); - - response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding, data, contentType); - - return getResponseDataInJson(response); - } else { - throw new HttpException(AUTH_FAILED); - } - } - - public JSONObject postNoAuth(final String path, final byte[] data, String contentType) - throws HttpException, JSONException { - WebTarget target = null; - Response response = null; - if (contentType == null) { - contentType = "text/pain"; - } - target = DmaapClientUtil.getTarget(clientConfig,path); - - response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType); - - return getResponseDataInJson(response); - } - - public String postWithResponse(final String path, final byte[] data, final String contentType, - final String username, final String password, final String protocolFlag) - throws HttpException, JSONException { - String responseData = null; - if ((null != username && null != password)) { - WebTarget target = null; - Response response = null; - target = DmaapClientUtil.getTarget(clientConfig,path, username, password); - String encoding = Base64.encodeAsString(username + ":" + password); - - response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding, data, contentType); - - responseData = (String) response.readEntity(String.class); - return responseData; - } else { - throw new HttpException(AUTH_FAILED); - } - } - - public String postNoAuthWithResponse(final String path, final byte[] data, String contentType) - throws HttpException, JSONException { - - String responseData = null; - WebTarget target = null; - Response response = null; - if (contentType == null) { - contentType = "text/pain"; - } - target = DmaapClientUtil.getTarget(clientConfig,path); - - response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType); - responseData = (String) response.readEntity(String.class); - return responseData; - } - - public JSONObject postAuth(PostAuthDataObject postAuthDO) throws HttpException, JSONException { - if ((null != postAuthDO.getUsername() && null != postAuthDO.getPassword())) { - WebTarget target = null; - Response response = null; - target = DmaapClientUtil.getTarget(clientConfig,postAuthDO.getPath(), postAuthDO.getUsername(), - postAuthDO.getPassword()); - response = DmaapClientUtil.postResponsewtCambriaAuth(target, postAuthDO.getAuthKey(), - postAuthDO.getAuthDate(), postAuthDO.getData(), postAuthDO.getContentType()); - return getResponseDataInJson(response); - } else { - throw new HttpException(AUTH_FAILED); - } - } - - public String postAuthwithResponse(final String path, final byte[] data, final String contentType, - final String authKey, final String authDate, final String username, final String password, - final String protocolFlag) throws HttpException, JSONException { - String responseData = null; - if ((null != username && null != password)) { - WebTarget target = null; - Response response = null; - target = DmaapClientUtil.getTarget(clientConfig,path, username, password); - response = DmaapClientUtil.postResponsewtCambriaAuth(target, authKey, authDate, data, contentType); - responseData = (String) response.readEntity(String.class); - return responseData; - - } else { - throw new HttpException(AUTH_FAILED); - } - } - - public JSONObject get(final String path, final String username, final String password, final String protocolFlag) - throws HttpException, JSONException { - if (null != username && null != password) { - - WebTarget target = null; - Response response = null; - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - target = DmaapClientUtil.getTarget(clientConfig,path); - response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password); - } else { - target = DmaapClientUtil.getTarget(clientConfig,path, username, password); - String encoding = Base64.encodeAsString(username + ":" + password); - - response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding); - - } - return getResponseDataInJson(response); - } else { - throw new HttpException(AUTH_FAILED); - } - } - - public String getResponse(final String path, final String username, final String password, - final String protocolFlag) throws HttpException, JSONException { - String responseData = null; - if (null != username && null != password) { - WebTarget target = null; - Response response = null; - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - target = DmaapClientUtil.getTarget(clientConfig,path); - response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password); - } else { - target = DmaapClientUtil.getTarget(clientConfig,path, username, password); - String encoding = Base64.encodeAsString(username + ":" + password); - response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding); - } - MRClientFactory.setHTTPHeadersMap(response.getHeaders()); - - String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); - if (transactionid != null && !transactionid.equalsIgnoreCase("")) { - fLog.info(LOG_TRANSACTION_ID + transactionid); - } - - responseData = (String) response.readEntity(String.class); - return responseData; - } else { - throw new HttpException(AUTH_FAILED); - } - } - - public JSONObject getAuth(final String path, final String authKey, final String authDate, final String username, - final String password, final String protocolFlag) throws HttpException, JSONException { - if (null != username && null != password) { - WebTarget target = null; - Response response = null; - target = DmaapClientUtil.getTarget(clientConfig,path, username, password); - response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate); - - return getResponseDataInJson(response); - } else { - throw new HttpException(AUTH_FAILED); - } - } - - public JSONObject getNoAuth(final String path) throws HttpException, JSONException { - - WebTarget target = null; - Response response = null; - target = DmaapClientUtil.getTarget(clientConfig,path); - response = DmaapClientUtil.getResponsewtNoAuth(target); - - return getResponseDataInJson(response); - } - - public String getAuthResponse(final String path, final String authKey, final String authDate, final String username, - final String password, final String protocolFlag) throws HttpException, JSONException { - String responseData = null; - if (null != username && null != password) { - WebTarget target = null; - Response response = null; - target = DmaapClientUtil.getTarget(clientConfig,path, username, password); - response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate); - - MRClientFactory.setHTTPHeadersMap(response.getHeaders()); - - String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); - if (transactionid != null && !transactionid.equalsIgnoreCase("")) { - fLog.info(LOG_TRANSACTION_ID + transactionid); - } - - responseData = (String) response.readEntity(String.class); - return responseData; - } else { - throw new HttpException(AUTH_FAILED); - } - } - - public String getNoAuthResponse(String path, final String username, final String password, - final String protocolFlag) throws HttpException, JSONException { - String responseData = null; - WebTarget target = null; - Response response = null; - target = DmaapClientUtil.getTarget(clientConfig,path, username, password); - response = DmaapClientUtil.getResponsewtNoAuth(target); - - MRClientFactory.setHTTPHeadersMap(response.getHeaders()); - - String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); - if (transactionid != null && !transactionid.equalsIgnoreCase("")) { - fLog.info(LOG_TRANSACTION_ID + transactionid); - } - - responseData = (String) response.readEntity(String.class); - return responseData; - - } - - private JSONObject getResponseDataInJson(Response response) throws JSONException { - try { - MRClientFactory.setHTTPHeadersMap(response.getHeaders()); - - // MultivaluedMap<String, Object> headersMap = - // for(String key : headersMap.keySet()) { - String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); - if (transactionid != null && !transactionid.equalsIgnoreCase("")) { - fLog.info(LOG_TRANSACTION_ID + transactionid); - } - - if (response.getStatus() == 403) { - JSONObject jsonObject = null; - jsonObject = new JSONObject(); - JSONArray jsonArray = new JSONArray(); - jsonArray.put(response.getEntity()); - jsonObject.put(JSON_RESULT, jsonArray); - jsonObject.put(JSON_STATUS, response.getStatus()); - return jsonObject; - } - String responseData = (String) response.readEntity(String.class); - - JSONTokener jsonTokener = new JSONTokener(responseData); - JSONObject jsonObject = null; - final char firstChar = jsonTokener.next(); - jsonTokener.back(); - if ('[' == firstChar) { - JSONArray jsonArray = new JSONArray(jsonTokener); - jsonObject = new JSONObject(); - jsonObject.put(JSON_RESULT, jsonArray); - jsonObject.put(JSON_STATUS, response.getStatus()); - } else { - jsonObject = new JSONObject(jsonTokener); - jsonObject.put(JSON_STATUS, response.getStatus()); - } - - return jsonObject; - } catch (JSONException excp) { - fLog.error("DMAAP - Error reading response data.", excp); - return null; - } - - } - - public String getHTTPErrorResponseMessage(String responseString) { - - String response = null; - int beginIndex = 0; - int endIndex = 0; - if (responseString.contains("<body>")) { - - beginIndex = responseString.indexOf("body>") + 5; - endIndex = responseString.indexOf("</body"); - response = responseString.substring(beginIndex, endIndex); - } - - return response; - - } - - public String getHTTPErrorResponseCode(String responseString) { - - String response = null; - int beginIndex = 0; - int endIndex = 0; - if (responseString.contains("<title>")) { - beginIndex = responseString.indexOf("title>") + 6; - endIndex = responseString.indexOf("</title"); - response = responseString.substring(beginIndex, endIndex); - } - - return response; - } + private static final String HEADER_TRANSACTION_ID = "transactionid"; + + private static final String JSON_RESULT = "result"; + private static final String JSON_STATUS = "status"; + + private static final String AUTH_FAILED = "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."; + private static final String LOG_TRANSACTION_ID = "TransactionId : {}"; + + private ClientConfig clientConfig = null; + + protected MRBaseClient(Collection<String> hosts) throws MalformedURLException { + super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT); + + logger = LoggerFactory.getLogger(this.getClass().getName()); + } + + protected MRBaseClient(Collection<String> hosts, int stdSvcPort) throws MalformedURLException { + super(ConnectionType.HTTP, hosts, stdSvcPort); + + logger = LoggerFactory.getLogger(this.getClass().getName()); + } + + protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException { + super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT, clientSignature, CacheUse.NONE, 1, 1L, + TimeUnit.MILLISECONDS, 32, 32, 600000); + + logger = LoggerFactory.getLogger(this.getClass().getName()); + } + + public ClientConfig getClientConfig1() { + return clientConfig; + } + + public void setClientConfig(ClientConfig config) { + this.clientConfig = config; + } + + @Override + public void close() { + // nothing to close + } + + protected Set<String> jsonArrayToSet(JSONArray array) { + if (array == null) { + return null; + } + final TreeSet<String> set = new TreeSet<>(); + for (int i = 0; i < array.length(); i++) { + set.add(array.getString(i)); + } + return set; + } + + public void logTo(Logger log) { + logger = log; + replaceLogger(log); + } + + protected Logger getLog() { + return logger; + } + + private Logger logger; + + public JSONObject post(final String path, final byte[] data, final String contentType, final String username, + final String password, final String protocalFlag) throws HttpException, JSONException { + if ((null != username && null != password)) { + WebTarget target = null; + Response response = null; + target = DmaapClientUtil.getTarget(clientConfig, path, username, password); + String encoding = Base64.encodeAsString(username + ":" + password); + + response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding, data, contentType); + + return getResponseDataInJson(response); + } else { + throw new HttpException(AUTH_FAILED); + } + } + + public JSONObject postNoAuth(final String path, final byte[] data, String contentType) + throws HttpException, JSONException { + WebTarget target = null; + Response response = null; + if (contentType == null) { + contentType = "text/pain"; + } + target = DmaapClientUtil.getTarget(clientConfig, path); + + response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType); + + return getResponseDataInJson(response); + } + + public String postWithResponse(final String path, final byte[] data, final String contentType, + final String username, final String password, final String protocolFlag) + throws HttpException, JSONException { + String responseData = null; + if ((null != username && null != password)) { + WebTarget target = null; + Response response = null; + target = DmaapClientUtil.getTarget(clientConfig, path, username, password); + String encoding = Base64.encodeAsString(username + ":" + password); + + response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding, data, contentType); + + responseData = response.readEntity(String.class); + return responseData; + } else { + throw new HttpException(AUTH_FAILED); + } + } + + public String postNoAuthWithResponse(final String path, final byte[] data, String contentType) + throws HttpException, JSONException { + + String responseData = null; + WebTarget target = null; + Response response = null; + if (contentType == null) { + contentType = "text/pain"; + } + target = DmaapClientUtil.getTarget(clientConfig, path); + + response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType); + responseData = response.readEntity(String.class); + return responseData; + } + + public JSONObject postAuth(PostAuthDataObject postAuthDO) throws HttpException, JSONException { + if ((null != postAuthDO.getUsername() && null != postAuthDO.getPassword())) { + WebTarget target = null; + Response response = null; + target = DmaapClientUtil.getTarget(clientConfig, postAuthDO.getPath(), postAuthDO.getUsername(), + postAuthDO.getPassword()); + response = DmaapClientUtil.postResponsewtCambriaAuth(target, postAuthDO.getAuthKey(), + postAuthDO.getAuthDate(), postAuthDO.getData(), postAuthDO.getContentType()); + return getResponseDataInJson(response); + } else { + throw new HttpException(AUTH_FAILED); + } + } + + public String postAuthwithResponse(final String path, final byte[] data, final String contentType, + final String authKey, final String authDate, final String username, final String password, + final String protocolFlag) throws HttpException, JSONException { + String responseData = null; + if ((null != username && null != password)) { + WebTarget target = null; + Response response = null; + target = DmaapClientUtil.getTarget(clientConfig, path, username, password); + response = DmaapClientUtil.postResponsewtCambriaAuth(target, authKey, authDate, data, contentType); + responseData = response.readEntity(String.class); + return responseData; + + } else { + throw new HttpException(AUTH_FAILED); + } + } + + public JSONObject get(final String path, final String username, final String password, final String protocolFlag) + throws HttpException, JSONException { + if (null != username && null != password) { + + WebTarget target = null; + Response response = null; + + if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + target = DmaapClientUtil.getTarget(clientConfig, path); + response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password); + } else { + target = DmaapClientUtil.getTarget(clientConfig, path, username, password); + String encoding = Base64.encodeAsString(username + ":" + password); + + response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding); + + } + return getResponseDataInJson(response); + } else { + throw new HttpException(AUTH_FAILED); + } + } + + public String getResponse(final String path, final String username, final String password, + final String protocolFlag) throws HttpException, JSONException { + String responseData = null; + if (null != username && null != password) { + WebTarget target = null; + Response response = null; + if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + target = DmaapClientUtil.getTarget(clientConfig, path); + response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password); + } else { + target = DmaapClientUtil.getTarget(clientConfig, path, username, password); + String encoding = Base64.encodeAsString(username + ":" + password); + response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding); + } + MRClientFactory.setHTTPHeadersMap(response.getHeaders()); + + String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); + if (transactionid != null && !transactionid.equalsIgnoreCase("")) { + logger.info(LOG_TRANSACTION_ID, transactionid); + } + + responseData = response.readEntity(String.class); + return responseData; + } else { + throw new HttpException(AUTH_FAILED); + } + } + + public JSONObject getAuth(final String path, final String authKey, final String authDate, final String username, + final String password, final String protocolFlag) throws HttpException, JSONException { + if (null != username && null != password) { + WebTarget target = null; + Response response = null; + target = DmaapClientUtil.getTarget(clientConfig, path, username, password); + response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate); + + return getResponseDataInJson(response); + } else { + throw new HttpException(AUTH_FAILED); + } + } + + public JSONObject getNoAuth(final String path) throws HttpException, JSONException { + + WebTarget target = null; + Response response = null; + target = DmaapClientUtil.getTarget(clientConfig, path); + response = DmaapClientUtil.getResponsewtNoAuth(target); + + return getResponseDataInJson(response); + } + + public String getAuthResponse(final String path, final String authKey, final String authDate, final String username, + final String password, final String protocolFlag) throws HttpException, JSONException { + String responseData = null; + if (null != username && null != password) { + WebTarget target = null; + Response response = null; + target = DmaapClientUtil.getTarget(clientConfig, path, username, password); + response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate); + + MRClientFactory.setHTTPHeadersMap(response.getHeaders()); + + String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); + if (transactionid != null && !transactionid.equalsIgnoreCase("")) { + logger.info(LOG_TRANSACTION_ID, transactionid); + } + + responseData = response.readEntity(String.class); + return responseData; + } else { + throw new HttpException(AUTH_FAILED); + } + } + + public String getNoAuthResponse(String path, final String username, final String password, + final String protocolFlag) throws HttpException, JSONException { + String responseData = null; + WebTarget target = null; + Response response = null; + target = DmaapClientUtil.getTarget(clientConfig, path, username, password); + response = DmaapClientUtil.getResponsewtNoAuth(target); + + MRClientFactory.setHTTPHeadersMap(response.getHeaders()); + + String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); + if (transactionid != null && !transactionid.equalsIgnoreCase("")) { + logger.info(LOG_TRANSACTION_ID, transactionid); + } + + responseData = response.readEntity(String.class); + return responseData; + + } + + private JSONObject getResponseDataInJson(Response response) throws JSONException { + try { + MRClientFactory.setHTTPHeadersMap(response.getHeaders()); + + // MultivaluedMap<String, Object> headersMap = + // for(String key : headersMap.keySet()) { + String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); + if (transactionid != null && !transactionid.equalsIgnoreCase("")) { + logger.info(LOG_TRANSACTION_ID, transactionid); + } + + if (response.getStatus() == HttpStatus.SC_FORBIDDEN) { + JSONObject jsonObject = null; + jsonObject = new JSONObject(); + JSONArray jsonArray = new JSONArray(); + jsonArray.put(response.getEntity()); + jsonObject.put(JSON_RESULT, jsonArray); + jsonObject.put(JSON_STATUS, response.getStatus()); + return jsonObject; + } + String responseData = response.readEntity(String.class); + + JSONTokener jsonTokener = new JSONTokener(responseData); + JSONObject jsonObject = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if ('[' == firstChar) { + JSONArray jsonArray = new JSONArray(jsonTokener); + jsonObject = new JSONObject(); + jsonObject.put(JSON_RESULT, jsonArray); + jsonObject.put(JSON_STATUS, response.getStatus()); + } else { + jsonObject = new JSONObject(jsonTokener); + jsonObject.put(JSON_STATUS, response.getStatus()); + } + + return jsonObject; + } catch (JSONException excp) { + logger.error("DMAAP - Error reading response data.", excp); + return null; + } + + } + + public String getHTTPErrorResponseMessage(String responseString) { + + String response = null; + int beginIndex = 0; + int endIndex = 0; + if (responseString.contains("<body>")) { + + beginIndex = responseString.indexOf("body>") + 5; + endIndex = responseString.indexOf("</body"); + response = responseString.substring(beginIndex, endIndex); + } + + return response; + + } + + public String getHTTPErrorResponseCode(String responseString) { + + String response = null; + int beginIndex = 0; + int endIndex = 0; + if (responseString.contains("<title>")) { + beginIndex = responseString.indexOf("title>") + 6; + endIndex = responseString.indexOf("</title"); + response = responseString.substring(beginIndex, endIndex); + } + + return response; + } } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java index 5c7259c..19f5b2c 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java @@ -4,11 +4,13 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,10 +19,13 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client.impl; +import com.att.nsa.apiClient.http.HttpClient; +import com.att.nsa.apiClient.http.HttpException; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -34,454 +39,374 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.zip.GZIPOutputStream; - +import org.onap.dmaap.mr.client.MRBatchingPublisher; +import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.apiClient.http.HttpClient; -import com.att.nsa.apiClient.http.HttpException; -import org.onap.dmaap.mr.client.MRBatchingPublisher; -import org.onap.dmaap.mr.client.response.MRPublisherResponse; /** * This is a batching publisher class that allows the client to publish messages * in batches that are limited in terms of size and/or hold time. - * + * * @author author * @deprecated This class's tricky locking doesn't quite work - * */ @Deprecated -public class MRBatchPublisher implements MRBatchingPublisher -{ - public static final long MIN_MAX_AGE_MS = 1; - - /** - * Create a batch publisher. - * - * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path. - * @param topic the topic to publish to - * @param maxBatchSize the maximum size of a batch - * @param maxAgeMs the maximum age of a batch - */ - public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) - { - if ( maxAgeMs < MIN_MAX_AGE_MS) - { - fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + MIN_MAX_AGE_MS); - maxAgeMs = MIN_MAX_AGE_MS; - } - - try { - fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress ); - } catch (MalformedURLException e) { - throw new IllegalArgumentException (e); - } - - // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for - // the oldest msg to hit max age? (locking is complicated, but should be do-able) - fExec = new ScheduledThreadPoolExecutor ( 1 ); - fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS ); - } - - @Override - public void setApiCredentials ( String apiKey, String apiSecret ) - { - fSender.setApiCredentials ( apiKey, apiSecret ); - } - - @Override - public void clearApiCredentials () - { - fSender.clearApiCredentials (); - } - - /** - * Send the given message with the given partition - * @param partition - * @param msg - * @throws IOException - */ - @Override - public int send ( String partition, String msg ) throws IOException - { - return send ( new message ( partition, msg ) ); - } - @Override - public int send ( String msg ) throws IOException - { - return send ( new message ( "",msg ) ); - } - /** - * Send the given message - * @param userMsg a message - * @throws IOException - */ - @Override - public int send ( message userMsg ) throws IOException - { - final LinkedList<message> list = new LinkedList<message> (); - list.add ( userMsg ); - return send ( list ); - } - - /** - * Send the given set of messages - * @param msgs the set of messages, sent in order of iteration - * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing) - * @throws IOException - */ - @Override - public int send ( Collection<message> msgs ) throws IOException - { - if ( msgs.isEmpty() ) - { - fSender.queue ( msgs ); - } - return fSender.size (); - } - - @Override - public int getPendingMessageCount () - { - return fSender.size (); - } - - /** - * Send any pending messages and close this publisher. - * @throws IOException - * @throws InterruptedException - */ - @Override - public void close () - { - try - { - final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS ); - if ( remains.isEmpty() ) - { - fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. " - + "(Consider using the alternate close method to capture unsent messages in this case.)" ); - } - } - catch ( InterruptedException e ) - { - fLog.warn ( "Possible message loss. " + e.getMessage(), e ); - Thread.currentThread().interrupt(); - } - catch ( IOException e ) - { - fLog.warn ( "Possible message loss. " + e.getMessage(), e ); - } - } - - public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException - { - fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false ); - fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false ); - fExec.shutdown (); - - final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit ); - final long timeoutAtMs = System.currentTimeMillis () + waitInMs; - while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 ) - { - fSender.checkSend ( true ); - Thread.sleep ( 250 ); - } - - final LinkedList<message> result = new LinkedList<message> (); - fSender.drainTo ( result ); - return result; - } - - private final ScheduledThreadPoolExecutor fExec; - private final Sender fSender; - - private static class TimestampedMessage extends message - { - public TimestampedMessage ( message m ) - { - super ( m ); - timestamp = System.currentTimeMillis (); - } - public final long timestamp; - } - - private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class ); - - private class Sender extends MRBaseClient implements Runnable - { - public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException - { - super ( baseUrls ); - - fNextBatch = new LinkedList<TimestampedMessage> (); - fSendingBatch = null; - fTopic = topic; - fMaxBatchSize = maxBatch; - fMaxAgeMs = maxAgeMs; - fCompress = compress; - fLock = new ReentrantReadWriteLock (); - fWriteLock = fLock.writeLock (); - fReadLock = fLock.readLock (); - fDontSendUntilMs = 0; - } - - public void drainTo ( LinkedList<message> list ) - { - fWriteLock.lock (); - try - { - if ( fSendingBatch != null ) - { - list.addAll ( fSendingBatch ); - } - list.addAll ( fNextBatch ); - - fSendingBatch = null; - fNextBatch.clear (); - } - finally - { - fWriteLock.unlock (); - } - } - - /** - * Called periodically by the background executor. - */ - @Override - public void run () - { - try - { - checkSend ( false ); - } - catch ( IOException e ) - { - fLog.warn ( "MR background send: " + e.getMessage () ); - fLog.error( "IOException " + e ); - } - } - - public int size () - { - fReadLock.lock (); - try - { - return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () ); - } - finally - { - fReadLock.unlock (); - } - } - - /** - * Called to queue a message. - * @param msgs - * @throws IOException - */ - public void queue ( Collection<message> msgs ) throws IOException - { - fWriteLock.lock (); - try - { - for ( message userMsg : msgs ) - { - if ( userMsg != null ) - { - fNextBatch.add ( new TimestampedMessage ( userMsg ) ); - } - else - { - fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." ); - } - } - } - finally - { - fWriteLock.unlock(); - } - checkSend ( false ); - } - - /** - * Send a batch if the queue is long enough, or the first pending message is old enough. - * @param force - * @throws IOException - */ - public void checkSend ( boolean force ) throws IOException - { - // hold a read lock just long enough to evaluate whether a batch - // should be sent - boolean shouldSend = false; - fReadLock.lock (); - try - { - if ( fNextBatch.isEmpty() ) - { - final long nowMs = System.currentTimeMillis (); - shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize ); - if ( !shouldSend ) - { - final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs; - shouldSend = sendAtMs <= nowMs; - } - - // however, unless forced, wait after an error - shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs ); - } - // else: even in 'force', there's nothing to send, so shouldSend=false is fine - } - finally - { - fReadLock.unlock (); - } - - // if a send is required, acquire a write lock, swap out the next batch, - // swap in a fresh batch, and release the lock for the caller to start - // filling a batch again. After releasing the lock, send the current - // batch. (There could be more messages added between read unlock and - // write lock, but that's fine.) - if ( shouldSend ) - { - fSendingBatch = null; - - fWriteLock.lock (); - try - { - fSendingBatch = fNextBatch; - fNextBatch = new LinkedList<TimestampedMessage> (); - } - finally - { - fWriteLock.unlock (); - } - - if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) ) - { - fLog.warn ( "Send failed, rebuilding send queue." ); - - // note the time for back-off - fDontSendUntilMs = SF_WAIT_AFTER_ERROR + System.currentTimeMillis (); - - // the send failed. reconstruct the pending queue - fWriteLock.lock (); - try - { - final LinkedList<TimestampedMessage> nextGroup = fNextBatch; - fNextBatch = fSendingBatch; - fNextBatch.addAll ( nextGroup ); - fSendingBatch = null; - fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." ); - } - finally - { - fWriteLock.unlock (); - } - } - else - { - fWriteLock.lock (); - try - { - fSendingBatch = null; - } - finally - { - fWriteLock.unlock (); - } - } - } - } - - private LinkedList<TimestampedMessage> fNextBatch; - private LinkedList<TimestampedMessage> fSendingBatch; - private final String fTopic; - private final int fMaxBatchSize; - private final long fMaxAgeMs; - private final boolean fCompress; - private final ReentrantReadWriteLock fLock; - private final WriteLock fWriteLock; - private final ReadLock fReadLock; - private long fDontSendUntilMs; - private static final long SF_WAIT_AFTER_ERROR = 1000; - } - - // this is static so that it's clearly not using any mutable member data outside of a lock - private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log ) - { - // it's possible for this call to be made with an empty list. in this case, just return. - if ( toSend.isEmpty() ) - { - return true; - } - - final long nowMs = System.currentTimeMillis (); - final String url = MRConstants.makeUrl ( topic ); - - log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms" ); - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream (); - try - { - OutputStream os = baseStream; - if ( compress ) - { - os = new GZIPOutputStream ( baseStream ); - } - for ( TimestampedMessage m : toSend ) - { - os.write ( ( "" + m.fPartition.length () ).getBytes() ); - os.write ( '.' ); - os.write ( ( "" + m.fMsg.length () ).getBytes() ); - os.write ( '.' ); - os.write ( m.fPartition.getBytes() ); - os.write ( m.fMsg.getBytes() ); - os.write ( '\n' ); - } - os.close (); - } - catch ( IOException e ) - { - log.warn ( "Problem writing stream to post: " + e.getMessage (),e ); - return false; - } - - boolean result = false; - final long startMs = System.currentTimeMillis (); - try - { - client.post ( url, compress ? - MRFormat.CAMBRIA_ZIP.toString () : - MRFormat.CAMBRIA.toString (), - baseStream.toByteArray(), false ); - result = true; - } - catch ( HttpException e ) - { - log.warn ( "Problem posting to MR: " + e.getMessage(),e ); - } - catch ( IOException e ) - { - log.warn ( "Problem posting to MR: " + e.getMessage(),e ); - } - - log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): OK" ); - return result; - } - - @Override - public void logTo ( Logger log ) - { - fLog = log; - } - - @Override - public MRPublisherResponse sendBatchWithResponse() { - // TODO Auto-generated method stub - return null; - } - +public class MRBatchPublisher implements MRBatchingPublisher { + public static final long MIN_MAX_AGE_MS = 1; + + /** + * Create a batch publisher. + * + * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path. + * @param topic the topic to publish to + * @param maxBatchSize the maximum size of a batch + * @param maxAgeMs the maximum age of a batch + */ + public MRBatchPublisher(Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress) { + if (maxAgeMs < MIN_MAX_AGE_MS) { + logger.warn("Max age in ms is less than the minimum. Overriding to " + MIN_MAX_AGE_MS); + maxAgeMs = MIN_MAX_AGE_MS; + } + + try { + fSender = new Sender(baseUrls, topic, maxBatchSize, maxAgeMs, compress); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + + // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for + // the oldest msg to hit max age? (locking is complicated, but should be do-able) + fExec = new ScheduledThreadPoolExecutor(1); + fExec.scheduleAtFixedRate(fSender, 100, 50, TimeUnit.MILLISECONDS); + } + + @Override + public void setApiCredentials(String apiKey, String apiSecret) { + fSender.setApiCredentials(apiKey, apiSecret); + } + + @Override + public void clearApiCredentials() { + fSender.clearApiCredentials(); + } + + /** + * Send the given message with the given partition. + * + * @param partition + * @param msg + * @throws IOException + */ + @Override + public int send(String partition, String msg) throws IOException { + return send(new Message(partition, msg)); + } + + @Override + public int send(String msg) throws IOException { + return send(new Message("", msg)); + } + + /** + * Send the given message. + * + * @param userMsg a message + * @throws IOException + */ + @Override + public int send(Message userMsg) throws IOException { + final LinkedList<Message> list = new LinkedList<>(); + list.add(userMsg); + return send(list); + } + + /** + * Send the given set of messages. + * + * @param msgs the set of messages, sent in order of iteration + * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing) + * @throws IOException + */ + @Override + public int send(Collection<Message> msgs) throws IOException { + if (msgs.isEmpty()) { + fSender.queue(msgs); + } + return fSender.size(); + } + + @Override + public int getPendingMessageCount() { + return fSender.size(); + } + + /** + * Send any pending messages and close this publisher. + */ + @Override + public void close() { + try { + final List<Message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + if (remains.isEmpty()) { + logger.warn("Closing publisher with {} messages unsent. (Consider using the alternate close method to capture unsent messages in this case.)", remains.size()); + } + } catch (InterruptedException e) { + logger.warn("Possible message loss. " + e.getMessage(), e); + Thread.currentThread().interrupt(); + } catch (IOException e) { + logger.warn("Possible message loss. " + e.getMessage(), e); + } + } + + public List<Message> close(long time, TimeUnit unit) throws InterruptedException, IOException { + fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + fExec.shutdown(); + + final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit); + final long timeoutAtMs = System.currentTimeMillis() + waitInMs; + while (System.currentTimeMillis() < timeoutAtMs && getPendingMessageCount() > 0) { + fSender.checkSend(true); + Thread.sleep(250); + } + + final LinkedList<Message> result = new LinkedList<>(); + fSender.drainTo(result); + return result; + } + + private final ScheduledThreadPoolExecutor fExec; + private final Sender fSender; + + private static class TimestampedMessage extends Message { + public TimestampedMessage(Message m) { + super(m); + timestamp = System.currentTimeMillis(); + } + + public final long timestamp; + } + + private Logger logger = LoggerFactory.getLogger(MRBatchPublisher.class); + + private class Sender extends MRBaseClient implements Runnable { + public Sender(Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress) throws MalformedURLException { + super(baseUrls); + + fNextBatch = new LinkedList<>(); + fSendingBatch = null; + fTopic = topic; + fMaxBatchSize = maxBatch; + fMaxAgeMs = maxAgeMs; + fCompress = compress; + fLock = new ReentrantReadWriteLock(); + fWriteLock = fLock.writeLock(); + fReadLock = fLock.readLock(); + fDontSendUntilMs = 0; + } + + public void drainTo(List<Message> list) { + fWriteLock.lock(); + try { + if (fSendingBatch != null) { + list.addAll(fSendingBatch); + } + list.addAll(fNextBatch); + + fSendingBatch = null; + fNextBatch.clear(); + } finally { + fWriteLock.unlock(); + } + } + + /** + * Called periodically by the background executor. + */ + @Override + public void run() { + try { + checkSend(false); + } catch (Exception e) { + logger.warn("MR background send: {}", e.getMessage()); + logger.error("IOException {}", e.getMessage()); + } + } + + public int size() { + fReadLock.lock(); + try { + return fNextBatch.size() + (fSendingBatch == null ? 0 : fSendingBatch.size()); + } finally { + fReadLock.unlock(); + } + } + + /** + * Called to queue a message. + * + * @param msgs + * @throws IOException + */ + public void queue(Collection<Message> msgs) { + fWriteLock.lock(); + try { + for (Message userMsg : msgs) { + if (userMsg != null) { + fNextBatch.add(new TimestampedMessage(userMsg)); + } else { + logger.warn("MRBatchPublisher::Sender::queue received a null message."); + } + } + } finally { + fWriteLock.unlock(); + } + checkSend(false); + } + + /** + * Send a batch if the queue is long enough, or the first pending message is old enough. + * + * @param force + */ + public void checkSend(boolean force) { + // hold a read lock just long enough to evaluate whether a batch + // should be sent + boolean shouldSend = false; + fReadLock.lock(); + try { + if (fNextBatch.isEmpty()) { + final long nowMs = System.currentTimeMillis(); + shouldSend = (force || fNextBatch.size() >= fMaxBatchSize); + if (!shouldSend) { + final long sendAtMs = fNextBatch.getFirst().timestamp + fMaxAgeMs; + shouldSend = sendAtMs <= nowMs; + } + + // however, unless forced, wait after an error + shouldSend = force || (shouldSend && nowMs >= fDontSendUntilMs); + } + // else: even in 'force', there's nothing to send, so shouldSend=false is fine + } finally { + fReadLock.unlock(); + } + + // if a send is required, acquire a write lock, swap out the next batch, + // swap in a fresh batch, and release the lock for the caller to start + // filling a batch again. After releasing the lock, send the current + // batch. (There could be more messages added between read unlock and + // write lock, but that's fine.) + if (shouldSend) { + fSendingBatch = null; + + fWriteLock.lock(); + try { + fSendingBatch = fNextBatch; + fNextBatch = new LinkedList<>(); + } finally { + fWriteLock.unlock(); + } + + if (!doSend(fSendingBatch, this, fTopic, fCompress, logger)) { + logger.warn("Send failed, rebuilding send queue."); + + // note the time for back-off + fDontSendUntilMs = SF_WAIT_AFTER_ERROR + System.currentTimeMillis(); + + // the send failed. reconstruct the pending queue + fWriteLock.lock(); + try { + final LinkedList<TimestampedMessage> nextGroup = fNextBatch; + fNextBatch = fSendingBatch; + fNextBatch.addAll(nextGroup); + fSendingBatch = null; + logger.info("Send queue rebuilt; {} messages to send.", fNextBatch.size()); + } finally { + fWriteLock.unlock(); + } + } else { + fWriteLock.lock(); + try { + fSendingBatch = null; + } finally { + fWriteLock.unlock(); + } + } + } + } + + private LinkedList<TimestampedMessage> fNextBatch; + private LinkedList<TimestampedMessage> fSendingBatch; + private final String fTopic; + private final int fMaxBatchSize; + private final long fMaxAgeMs; + private final boolean fCompress; + private final ReentrantReadWriteLock fLock; + private final WriteLock fWriteLock; + private final ReadLock fReadLock; + private long fDontSendUntilMs; + private static final long SF_WAIT_AFTER_ERROR = 1000; + } + + // this is static so that it's clearly not using any mutable member data outside of a lock + private static boolean doSend(LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log) { + // it's possible for this call to be made with an empty list. in this case, just return. + if (toSend.isEmpty()) { + return true; + } + + final long nowMs = System.currentTimeMillis(); + final String url = MRConstants.makeUrl(topic); + + log.info("sending {} msgs to {}. Oldest: {} ms", toSend.size(), url, (nowMs - toSend.getFirst().timestamp)); + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); + try { + OutputStream os = baseStream; + if (compress) { + os = new GZIPOutputStream(baseStream); + } + for (TimestampedMessage m : toSend) { + os.write(("" + m.fPartition.length()).getBytes()); + os.write('.'); + os.write(("" + m.fMsg.length()).getBytes()); + os.write('.'); + os.write(m.fPartition.getBytes()); + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + os.close(); + } catch (IOException e) { + log.warn("Problem writing stream to post: " + e.getMessage(), e); + return false; + } + + boolean result = false; + final long startMs = System.currentTimeMillis(); + try { + client.post(url, + compress ? MRFormat.CAMBRIA_ZIP.toString() : MRFormat.CAMBRIA.toString(), + baseStream.toByteArray(), false); + result = true; + } catch (HttpException | IOException e) { + log.warn("Problem posting to MR: " + e.getMessage(), e); + } + + log.info("MR response ({} ms): OK", (System.currentTimeMillis() - startMs)); + return result; + } + + @Override + public void logTo(Logger log) { + logger = log; + } + + @Override + public MRPublisherResponse sendBatchWithResponse() { + // Auto-generated method stub + return null; + } + } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java index 6a13910..0507eee 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java @@ -4,11 +4,13 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,37 +19,37 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client.impl; import java.io.IOException; import java.io.InputStream; import java.util.Properties; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MRClientVersionInfo -{ - private static final Logger logger = LoggerFactory.getLogger(MRClientVersionInfo.class); - public static String getVersion () - { - return version; - } +public class MRClientVersionInfo { + private static final Logger logger = LoggerFactory.getLogger(MRClientVersionInfo.class); + + public static String getVersion() { + return VERSION; + } + + private static final Properties PROPS = new Properties(); + private static final String VERSION; - private static final Properties props = new Properties(); - private static final String version; - static { - String use = null; - try (InputStream is = MRClientVersionInfo.class.getResourceAsStream("/MRClientVersion.properties" )) { - if (is != null) { - props.load(is); - use = props.getProperty ( "MRClientVersion", null ); - } - } catch ( IOException e ) { - logger.error("exception: ", e); - } - version = use; - } + static { + String use = null; + try (InputStream is = MRClientVersionInfo.class.getResourceAsStream("/MRClientVersion.properties")) { + if (is != null) { + PROPS.load(is); + use = PROPS.getProperty("MRClientVersion", null); + } + } catch (IOException e) { + logger.error("exception: ", e); + } + VERSION = use; + } } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java index dbf6b4d..b05d839 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java @@ -4,11 +4,13 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,162 +19,155 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client.impl; -import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.List; - import org.apache.http.HttpHost; -class MRConstants -{ - private static final String PROTOCOL = "http"; - public static final String CONTEXT = "/"; - public static final String BASE_PATH = "events/"; - public static final int STD_MR_SERVICE_PORT = 8080; - - public static String escape ( String s ) - { - try - { - return URLEncoder.encode ( s, "UTF-8"); - } - catch ( UnsupportedEncodingException e ) - { - throw new IllegalArgumentException(e); - } - } - - public static String makeUrl ( String rawTopic ) - { - final String cleanTopic = escape ( rawTopic ); - - final StringBuffer url = new StringBuffer(). - append ( MRConstants.CONTEXT). - append ( MRConstants.BASE_PATH). - append ( cleanTopic ); - return url.toString (); - } - - public static String makeUrl ( final String host, final String rawTopic ) - { - final String cleanTopic = escape ( rawTopic ); - final StringBuffer url = new StringBuffer(); - - if (!host.startsWith("http") && !host.startsWith("https") ) { - url.append( PROTOCOL + "://" ); - } - url.append(host); - url.append ( MRConstants.CONTEXT); - url.append ( MRConstants.BASE_PATH); - url.append ( cleanTopic ); - return url.toString (); - } - - public static String makeUrl ( final String host, final String rawTopic, final String transferprotocol,final String parttion ) - { - final String cleanTopic = escape ( rawTopic ); - - final StringBuffer url = new StringBuffer(); - - if (transferprotocol !=null && !transferprotocol.equals("")) { - url.append( transferprotocol + "://" ); - }else{ - url.append( PROTOCOL + "://" ); - } - url.append(host); - url.append ( MRConstants.CONTEXT); - url.append ( MRConstants.BASE_PATH); - url.append ( cleanTopic ); - if(parttion!=null && !parttion.equalsIgnoreCase("")) - url.append("?partitionKey=").append(parttion); - return url.toString (); - } - public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId ) - { - final String cleanConsumerGroup = escape ( rawConsumerGroup ); - final String cleanConsumerId = escape ( rawConsumerId ); - return MRConstants.CONTEXT + MRConstants.BASE_PATH + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId; - } - - /** - * Create a list of HttpHosts from an input list of strings. Input strings have - * host[:port] as format. If the port section is not provided, the default port is used. - * - * @param hosts - * @return a list of hosts - */ - public static List<HttpHost> createHostsList(Collection<String> hosts) - { - final ArrayList<HttpHost> convertedHosts = new ArrayList<> (); - for ( String host : hosts ) - { - if ( host.length () == 0 ) continue; - convertedHosts.add ( hostForString ( host ) ); - } - return convertedHosts; - } - - /** - * Return an HttpHost from an input string. Input string has - * host[:port] as format. If the port section is not provided, the default port is used. - * - * @param host - * @return a list of hosts - */ - public static HttpHost hostForString ( String host ) - { - if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." ); - - String hostPart = host; - int port = STD_MR_SERVICE_PORT; - - final int colon = host.indexOf ( ':' ); - if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." ); - if ( colon > 0 ) - { - hostPart = host.substring ( 0, colon ).trim(); - - final String portPart = host.substring ( colon + 1 ).trim(); - if ( portPart.length () > 0 ) - { - try - { - port = Integer.parseInt ( portPart ); - } - catch ( NumberFormatException x ) - { - throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x ); - } - } - // else: use default port on "foo:" - } - - return new HttpHost ( hostPart, port ); - } - - public static String makeConsumerUrl(String host, String fTopic, String fGroup, String fId,final String transferprotocol) { - final String cleanConsumerGroup = escape ( fGroup ); - final String cleanConsumerId = escape ( fId ); - - StringBuffer url = new StringBuffer(); - - if (transferprotocol !=null && !transferprotocol.equals("")) { - url.append( transferprotocol + "://" ); - }else{ - url.append( PROTOCOL + "://" ); - } - - url.append(host); - url.append(CONTEXT); - url.append(BASE_PATH); - url.append(fTopic + "/" + cleanConsumerGroup + "/" + cleanConsumerId); - - return url.toString(); - } +class MRConstants { + + private MRConstants() { + + } + + private static final String PROTOCOL = "http"; + public static final String CONTEXT = "/"; + public static final String BASE_PATH = "events/"; + public static final int STD_MR_SERVICE_PORT = 8080; + + public static String escape(String url) { + return URLEncoder.encode(url, StandardCharsets.UTF_8); + } + + public static String makeUrl(String rawTopic) { + final String cleanTopic = escape(rawTopic); + + return new StringBuilder() + .append(MRConstants.CONTEXT) + .append(MRConstants.BASE_PATH) + .append(cleanTopic).toString(); + } + + public static String makeUrl(final String host, final String rawTopic) { + final String cleanTopic = escape(rawTopic); + final StringBuilder url = new StringBuilder(); + + if (!host.startsWith("http") && !host.startsWith("https")) { + url.append(PROTOCOL).append("://"); + } + url.append(host); + url.append(MRConstants.CONTEXT); + url.append(MRConstants.BASE_PATH); + url.append(cleanTopic); + return url.toString(); + } + + public static String makeUrl(final String host, final String rawTopic, final String transferProtocol, final String partition) { + final String cleanTopic = escape(rawTopic); + + final StringBuilder url = new StringBuilder(); + + if (transferProtocol != null && !transferProtocol.isEmpty()) { + url.append(transferProtocol).append("://"); + } else { + url.append(PROTOCOL).append("://"); + } + url.append(host); + url.append(MRConstants.CONTEXT); + url.append(MRConstants.BASE_PATH); + url.append(cleanTopic); + if (partition != null && !partition.isEmpty()) { + url.append("?partitionKey=").append(partition); + } + return url.toString(); + } + + public static String makeConsumerUrl(String topic, String rawConsumerGroup, String rawConsumerId) { + final String cleanConsumerGroup = escape(rawConsumerGroup); + final String cleanConsumerId = escape(rawConsumerId); + return MRConstants.CONTEXT + MRConstants.BASE_PATH + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId; + } + + /** + * Create a list of HttpHosts from an input list of strings. Input strings have + * host[:port] as format. If the port section is not provided, the default port is used. + * + * @param hosts + * @return a list of hosts + */ + public static List<HttpHost> createHostsList(Collection<String> hosts) { + final ArrayList<HttpHost> convertedHosts = new ArrayList<>(); + for (String host : hosts) { + if (host.length() == 0) { + continue; + } + convertedHosts.add(hostForString(host)); + } + return convertedHosts; + } + + /** + * Return an HttpHost from an input string. Input string has + * host[:port] as format. If the port section is not provided, the default port is used. + * + * @param host + * @return a list of hosts + */ + public static HttpHost hostForString(String host) { + if (host.length() < 1) { + throw new IllegalArgumentException("An empty host entry is invalid."); + } + + String hostPart = host; + int port = STD_MR_SERVICE_PORT; + + final int colon = host.indexOf(':'); + if (colon == 0) { + throw new IllegalArgumentException("Host entry '" + host + "' is invalid."); + } + if (colon > 0) { + hostPart = host.substring(0, colon).trim(); + + final String portPart = host.substring(colon + 1).trim(); + if (portPart.length() > 0) { + try { + port = Integer.parseInt(portPart); + } catch (NumberFormatException x) { + throw new IllegalArgumentException("Host entry '" + host + "' is invalid.", x); + } + } + // else: use default port on "foo:" + } + + return new HttpHost(hostPart, port); + } + + public static String makeConsumerUrl(String host, String topic, String group, String id, final String transferprotocol) { + final String cleanConsumerGroup = escape(group); + final String cleanConsumerId = escape(id); + + StringBuilder url = new StringBuilder(); + + if (transferprotocol != null && !transferprotocol.equals("")) { + url.append(transferprotocol).append("://"); + } else { + url.append(PROTOCOL).append("://"); + } + + url.append(host) + .append(CONTEXT) + .append(BASE_PATH) + .append(topic) + .append("/").append(cleanConsumerGroup) + .append("/").append(cleanConsumerId); + + return url.toString(); + } } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java index 6c67313..57ae3ee 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java @@ -4,11 +4,13 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,24 +19,26 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client.impl; import com.att.aft.dme2.api.DME2Client; import com.att.aft.dme2.api.DME2Exception; -import org.onap.dmaap.mr.client.HostSelector; -import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.client.MRConsumer; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; - -import java.io.*; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.http.HttpException; import org.apache.http.HttpStatus; @@ -42,20 +46,24 @@ import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import org.json.JSONTokener; +import org.onap.dmaap.mr.client.HostSelector; +import org.onap.dmaap.mr.client.MRClientFactory; +import org.onap.dmaap.mr.client.MRConsumer; +import org.onap.dmaap.mr.client.ProtocolType; +import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MRConsumerImpl extends MRBaseClient implements MRConsumer { - private Logger log = LoggerFactory.getLogger(this.getClass().getName()); + private static final Logger logger = LoggerFactory.getLogger(MRConsumerImpl.class); public static final String ROUTER_FILE_PATH = null; - public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); + public String protocolFlag = ProtocolType.DME2.getValue(); public String consumerFilePath; private static final String JSON_RESULT = "result"; - private static final String PROPS_PROTOCOL = "Protocol"; private static final String EXECPTION_MESSAGE = "exception: "; private static final String SUCCESS_MESSAGE = "Success"; @@ -70,6 +78,29 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { private static final String URL_PARAM_LIMIT = "limit"; private static final String URL_PARAM_TIMEOUT = "timeout"; + private static final String USERNAME = "username"; + private static final String SERVICE_NAME = "ServiceName"; + private static final String PARTNER = "Partner"; + private static final String ROUTE_OFFER = "routeOffer"; + private static final String PROTOCOL = "Protocol"; + private static final String METHOD_TYPE = "MethodType"; + private static final String CONTENT_TYPE = "contenttype"; + private static final String LATITUDE = "Latitude"; + private static final String LONGITUDE = "Longitude"; + private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT"; + private static final String VERSION = "Version"; + private static final String ENVIRONMENT = "Environment"; + private static final String SUB_CONTEXT_PATH = "SubContextPath"; + private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired"; + private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS"; + private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT"; + private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS"; + private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS"; + private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON"; + private static final String DME2_PER_HANDLER_TIMEOUT_MS = "DME2_PER_HANDLER_TIMEOUT_MS"; + private static final String DME2_REPLY_HANDLER_TIMEOUT_MS = "DME2_REPLY_HANDLER_TIMEOUT_MS"; + private final String fTopic; private final String fGroup; private final String fId; @@ -192,8 +223,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { public Iterable<String> fetch(int timeoutMs, int limit) throws Exception { final LinkedList<String> msgs = new LinkedList<>(); - ProtocolTypeConstants protocolFlagEnum = null; - for(ProtocolTypeConstants type : ProtocolTypeConstants.values()) { + ProtocolType protocolFlagEnum = null; + for (ProtocolType type : ProtocolType.values()) { if (type.getValue().equalsIgnoreCase(protocolFlag)) { protocolFlagEnum = type; } @@ -211,27 +242,27 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { break; case AAF_AUTH: String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); + fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit); final JSONObject o = get(urlAuthPath, username, password, protocolFlag); readJsonData(msgs, o); break; case AUTH_KEY: final String urlKeyPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)), + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit); final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag); readJsonData(msgs, authObject); break; case HTTPNOAUTH: final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); + fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit); readJsonData(msgs, getNoAuth(urlNoAuthPath)); break; } } catch (JSONException e) { // unexpected response reportProblemWithResponse(); - log.error(EXECPTION_MESSAGE, e); + logger.error(EXECPTION_MESSAGE, e); } catch (HttpException e) { throw new IOException(e); } @@ -244,10 +275,11 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { final JSONArray a = o.getJSONArray(JSON_RESULT); if (a != null) { for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) + if (a.get(i) instanceof String) { msgs.add(a.getString(i)); - else + } else { msgs.add(a.getJSONObject(i).toString()); + } } } } @@ -264,7 +296,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { final LinkedList<String> msgs = new LinkedList<>(); MRConsumerResponse mrConsumerResponse = new MRConsumerResponse(); try { - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { + if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) { dmeConfigure(timeoutMs, limit); long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs @@ -277,9 +309,9 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { createMRConsumerResponse(reply, mrConsumerResponse); } - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); + fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit); String response = getResponse(urlPath, username, password, protocolFlag); final JSONObject o = getResponseDataInJsonWithResponseReturned(response); @@ -287,9 +319,9 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { createMRConsumerResponse(response, mrConsumerResponse); } - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)), + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit); String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag); @@ -298,9 +330,9 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { createMRConsumerResponse(response, mrConsumerResponse); } - if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { + if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); + fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit); String response = getNoAuthResponse(urlPath, username, password, protocolFlag); final JSONObject o = getResponseDataInJsonWithResponseReturned(response); @@ -311,19 +343,19 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } catch (JSONException e) { mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error("json exception: ", e); + logger.error("json exception: ", e); } catch (HttpException e) { mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error("http exception: ", e); + logger.error("http exception: ", e); } catch (DME2Exception e) { mrConsumerResponse.setResponseCode(e.getErrorCode()); mrConsumerResponse.setResponseMessage(e.getErrorMessage()); - log.error("DME2 exception: ", e); + logger.error("DME2 exception: ", e); } catch (Exception e) { mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error(EXECPTION_MESSAGE, e); + logger.error(EXECPTION_MESSAGE, e); } mrConsumerResponse.setActualMessages(msgs); return mrConsumerResponse; @@ -331,16 +363,16 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { @Override protected void reportProblemWithResponse() { - log.warn("There was a problem with the server response. Blacklisting for 3 minutes."); + logger.warn("There was a problem with the server response. Blacklisting for 3 minutes."); super.reportProblemWithResponse(); fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES); } private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) { if (reply.startsWith("{")) { - JSONObject jObject = new JSONObject(reply); - String message = jObject.getString("message"); - int status = jObject.getInt("status"); + JSONObject jsonObject = new JSONObject(reply); + String message = jsonObject.getString("message"); + int status = jsonObject.getInt("status"); mrConsumerResponse.setResponseCode(Integer.toString(status)); @@ -372,7 +404,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { return jsonObject; } catch (JSONException excp) { - log.error("DMAAP - Error reading response data.", excp); + logger.error("DMAAP - Error reading response data.", excp); return null; } } @@ -403,22 +435,22 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException { this.longPollingMs = timeoutMs; - String latitude = props.getProperty("Latitude"); - String longitude = props.getProperty("Longitude"); - String version = props.getProperty("Version"); - String serviceName = props.getProperty("ServiceName"); - String env = props.getProperty("Environment"); - String partner = props.getProperty("Partner"); - String routeOffer = props.getProperty("routeOffer"); - String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId; - String protocol = props.getProperty(PROPS_PROTOCOL); - String methodType = props.getProperty("MethodType"); - String dmeuser = props.getProperty("username"); - String dmepassword = props.getProperty("password"); - String contenttype = props.getProperty("contenttype"); - String handlers = props.getProperty("sessionstickinessrequired"); - - /** + String latitude = props.getProperty(LATITUDE); + String longitude = props.getProperty(LONGITUDE); + String version = props.getProperty(VERSION); + String serviceName = props.getProperty(SERVICE_NAME); + String env = props.getProperty(ENVIRONMENT); + String partner = props.getProperty(PARTNER); + String routeOffer = props.getProperty(ROUTE_OFFER); + String subContextPath = props.getProperty(SUB_CONTEXT_PATH) + fTopic + "/" + fGroup + "/" + fId; + String protocol = props.getProperty(PROTOCOL); + String methodType = props.getProperty(METHOD_TYPE); + String dmeuser = props.getProperty(USERNAME); + String dmepassword = props.getProperty(USERNAME); + String contenttype = props.getProperty(CONTENT_TYPE); + String handlers = props.getProperty(SESSION_STICKINESS_REQUIRED); + + /* * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not * provided use the routeOffer value for auto failover within a cluster */ @@ -458,13 +490,13 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { url = contextUrl.toString(); DMETimeOuts = new HashMap<>(); - DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); + DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS)); + DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(AFT_DME2_ROUNDTRIP_TIMEOUT_MS)); + DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(AFT_DME2_EP_CONN_TIMEOUT)); DMETimeOuts.put("Content-Type", contenttype); System.setProperty("AFT_LATITUDE", latitude); System.setProperty("AFT_LONGITUDE", longitude); - System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT")); + System.setProperty("AFT_ENVIRONMENT", props.getProperty(AFT_ENVIRONMENT)); // SSL changes System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2"); @@ -474,7 +506,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { long dme2PerEndPointTimeoutMs; try { - dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS")); + dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty(DME2_PER_HANDLER_TIMEOUT_MS)); // backward compatibility if (dme2PerEndPointTimeoutMs <= 0) { dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS; @@ -483,15 +515,15 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { // backward compatibility dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS; getLog().debug( - "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS); + DME2_PER_HANDLER_TIMEOUT_MS + " not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS); } try { - dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS")); + dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty(DME2_REPLY_HANDLER_TIMEOUT_MS)); } catch (NumberFormatException nfe) { try { - long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); - long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); + long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS)); + long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty(AFT_DME2_EP_CONN_TIMEOUT)); dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs; getLog().debug( "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT " @@ -518,9 +550,9 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { sender.setPayload(""); if (handlers != null && handlers.equalsIgnoreCase("yes")) { sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", - props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); - sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); + props.getProperty(AFT_DME2_EXCHANGE_REQUEST_HANDLERS)); + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty(AFT_DME2_EXCHANGE_REPLY_HANDLERS)); + sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty(AFT_DME2_REQ_TRACE_ON)); } else { sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); } @@ -548,7 +580,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8")); } catch (UnsupportedEncodingException e) { - log.error("exception at createUrlPath () : ", e); + logger.error("exception at createUrlPath () : ", e); } } @@ -560,10 +592,10 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } private String readRoute(String routeKey) { - try(InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) { + try (InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) { MRClientFactory.prop.load(input); } catch (Exception ex) { - log.error("Reply Router Error " + ex); + logger.error("Reply Router Error " + ex); } return MRClientFactory.prop.getProperty(routeKey); } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java index 538f1e3..0ebebe5 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java @@ -4,11 +4,13 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,36 +19,39 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client.impl; -enum MRFormat -{ - /** - * Messages are sent using MR's message format. - */ - CAMBRIA - { - @Override - public String toString() { return "application/cambria"; } - }, +enum MRFormat { + /** + * Messages are sent using MR's message format. + */ + CAMBRIA { + @Override + public String toString() { + return "application/cambria"; + } + }, - /** - * Messages are sent using MR's message format with compression. - */ - CAMBRIA_ZIP - { - @Override - public String toString() { return "application/cambria-zip"; } - }, + /** + * Messages are sent using MR's message format with compression. + */ + CAMBRIA_ZIP { + @Override + public String toString() { + return "application/cambria-zip"; + } + }, - /** - * messages are sent as simple JSON objects. - */ - JSON - { - @Override - public String toString() { return "application/json"; } - } + /** + * messages are sent as simple JSON objects. + */ + JSON { + @Override + public String toString() { + return "application/json"; + } + } } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java index c1e2d12..414178d 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java @@ -4,11 +4,13 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,256 +19,212 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client.impl; +import com.att.nsa.apiClient.credentials.ApiCredential; +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.apiClient.http.HttpObjectNotFoundException; import java.io.IOException; import java.net.MalformedURLException; import java.util.Collection; import java.util.Set; import java.util.TreeSet; - import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.att.nsa.apiClient.credentials.ApiCredential; -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.apiClient.http.HttpObjectNotFoundException; import org.onap.dmaap.mr.client.MRIdentityManager; import org.onap.dmaap.mr.client.MRTopicManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager -{ - private static final String BASE_URI_TOPIC = "/topics"; - private static final String BASE_URI_APIKEY = "/apiKeys"; - - private static final String PARAM_DESCRIPTION = "description"; - private static final String PARAM_EMAIL = "email"; - - private static final Logger logger = LoggerFactory.getLogger(MRMetaClient.class); - public MRMetaClient ( Collection<String> baseUrls ) throws MalformedURLException - { - super ( baseUrls ); - } - - @Override - public Set<String> getTopics () throws IOException - { - final TreeSet<String> set = new TreeSet<> (); - try - { - final JSONObject topicSet = get ( BASE_URI_TOPIC ); - final JSONArray a = topicSet.getJSONArray ( "topics" ); - for ( int i=0; i<a.length (); i++ ) - { - set.add ( a.getString ( i ) ); - } - } - catch ( HttpObjectNotFoundException e ) - { - getLog().warn ( "No /topics endpoint on service." ); - logger.error("HttpObjectNotFoundException: ", e); - } - catch ( JSONException e ) - { - getLog().warn ( "Bad /topics result from service." ); - logger.error("JSONException: ", e); - } - catch ( HttpException e ) - { - throw new IOException ( e ); - } - return set; - } - - @Override - public TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException - { - try - { - final JSONObject topicData = get ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) ); - return new TopicInfo () - { - @Override - public String getOwner () - { - return topicData.optString ( "owner", null ); - } - - @Override - public String getDescription () - { - return topicData.optString ( PARAM_DESCRIPTION, null ); - } - - @Override - public Set<String> getAllowedProducers () - { - final JSONObject acl = topicData.optJSONObject ( "writerAcl" ); - if ( acl != null && acl.optBoolean ( "enabled", true ) ) - { - return jsonArrayToSet ( acl.optJSONArray ( "users" ) ); - } - return null; - } - - @Override - public Set<String> getAllowedConsumers () - { - final JSONObject acl = topicData.optJSONObject ( "readerAcl" ); - if ( acl != null && acl.optBoolean ( "enabled", true ) ) - { - return jsonArrayToSet ( acl.optJSONArray ( "users" ) ); - } - return null; - } - }; - } - catch ( JSONException e ) - { - throw new IOException ( e ); - } - catch ( HttpException e ) - { - throw new IOException ( e ); - } - } - - @Override - public void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException - { - final JSONObject o = new JSONObject (); - o.put ( "topicName", topicName ); - o.put ( "topicDescription", topicDescription ); - o.put ( "partitionCount", partitionCount ); - o.put ( "replicationCount", replicationCount ); - post ( BASE_URI_TOPIC + "/create", o, false ); - } - - @Override - public void deleteTopic ( String topic ) throws HttpException, IOException - { - delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) ); - } - - @Override - public boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException - { - return null == getAllowedProducers ( topic ); - } - - @Override - public Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException - { - return getTopicMetadata ( topic ).getAllowedProducers (); - } - - @Override - public void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException - { - put ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() ); - } - - @Override - public void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException - { - delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) ); - } - - @Override - public boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException - { - return null == getAllowedConsumers ( topic ); - } - - @Override - public Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException - { - return getTopicMetadata ( topic ).getAllowedConsumers (); - } - - @Override - public void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException - { - put ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() ); - } - - @Override - public void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException - { - delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) ); - } - - @Override - public ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException - { - try - { - final JSONObject o = new JSONObject (); - o.put ( PARAM_EMAIL, email ); - o.put ( PARAM_DESCRIPTION, description ); - final JSONObject reply = post ( BASE_URI_APIKEY + "/create", o, true ); - return new ApiCredential ( reply.getString ( "key" ), reply.getString ( "secret" ) ); - } - catch ( JSONException e ) - { - // the response doesn't meet our expectation - throw new MRApiException ( "The API key response is incomplete.", e ); - } - } - - @Override - public ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException - { - final JSONObject keyEntry = get ( BASE_URI_APIKEY + "/" + MRConstants.escape ( apiKey ) ); - if ( keyEntry == null ) - { - return null; - } - - return new ApiKey () - { - @Override - public String getEmail () - { - final JSONObject aux = keyEntry.optJSONObject ( "aux" ); - if ( aux != null ) - { - return aux.optString ( PARAM_EMAIL ); - } - return null; - } - - @Override - public String getDescription () - { - final JSONObject aux = keyEntry.optJSONObject ( "aux" ); - if ( aux != null ) - { - return aux.optString ( PARAM_DESCRIPTION ); - } - return null; - } - }; - } - - @Override - public void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException - { - final JSONObject o = new JSONObject (); - if ( email != null ) o.put ( PARAM_EMAIL, email ); - if ( description != null ) o.put ( PARAM_DESCRIPTION, description ); - patch ( BASE_URI_APIKEY + "/" + MRConstants.escape ( getCurrentApiKey() ), o ); - } - - @Override - public void deleteCurrentApiKey () throws HttpException, IOException - { - delete ( BASE_URI_APIKEY + "/" + MRConstants.escape ( getCurrentApiKey() ) ); - } +public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager { + private static final String BASE_URI_TOPIC = "/topics"; + private static final String BASE_URI_APIKEY = "/apiKeys"; + + private static final String PARAM_DESCRIPTION = "description"; + private static final String PARAM_EMAIL = "email"; + + private static final Logger logger = LoggerFactory.getLogger(MRMetaClient.class); + + public MRMetaClient(Collection<String> baseUrls) throws MalformedURLException { + super(baseUrls); + } + + @Override + public Set<String> getTopics() throws IOException { + final TreeSet<String> set = new TreeSet<>(); + try { + final JSONObject topicSet = get(BASE_URI_TOPIC); + final JSONArray a = topicSet.getJSONArray("topics"); + for (int i = 0; i < a.length(); i++) { + set.add(a.getString(i)); + } + } catch (HttpObjectNotFoundException e) { + getLog().warn("No /topics endpoint on service."); + logger.error("HttpObjectNotFoundException: ", e); + } catch (JSONException e) { + getLog().warn("Bad /topics result from service."); + logger.error("JSONException: ", e); + } catch (HttpException e) { + throw new IOException(e); + } + return set; + } + + @Override + public TopicInfo getTopicMetadata(String topic) throws HttpObjectNotFoundException, IOException { + try { + final JSONObject topicData = get(BASE_URI_TOPIC + "/" + MRConstants.escape(topic)); + return new TopicInfo() { + @Override + public String getOwner() { + return topicData.optString("owner", null); + } + + @Override + public String getDescription() { + return topicData.optString(PARAM_DESCRIPTION, null); + } + + @Override + public Set<String> getAllowedProducers() { + final JSONObject acl = topicData.optJSONObject("writerAcl"); + if (acl != null && acl.optBoolean("enabled", true)) { + return jsonArrayToSet(acl.optJSONArray("users")); + } + return null; + } + + @Override + public Set<String> getAllowedConsumers() { + final JSONObject acl = topicData.optJSONObject("readerAcl"); + if (acl != null && acl.optBoolean("enabled", true)) { + return jsonArrayToSet(acl.optJSONArray("users")); + } + return null; + } + }; + } catch (JSONException e) { + throw new IOException(e); + } catch (HttpException e) { + throw new IOException(e); + } + } + + @Override + public void createTopic(String topicName, String topicDescription, int partitionCount, int replicationCount) throws HttpException, IOException { + final JSONObject o = new JSONObject(); + o.put("topicName", topicName); + o.put("topicDescription", topicDescription); + o.put("partitionCount", partitionCount); + o.put("replicationCount", replicationCount); + post(BASE_URI_TOPIC + "/create", o, false); + } + + @Override + public void deleteTopic(String topic) throws HttpException, IOException { + delete(BASE_URI_TOPIC + "/" + MRConstants.escape(topic)); + } + + @Override + public boolean isOpenForProducing(String topic) throws HttpObjectNotFoundException, IOException { + return null == getAllowedProducers(topic); + } + + @Override + public Set<String> getAllowedProducers(String topic) throws HttpObjectNotFoundException, IOException { + return getTopicMetadata(topic).getAllowedProducers(); + } + + @Override + public void allowProducer(String topic, String apiKey) throws HttpObjectNotFoundException, HttpException, IOException { + put(BASE_URI_TOPIC + "/" + MRConstants.escape(topic) + "/producers/" + MRConstants.escape(apiKey), new JSONObject()); + } + + @Override + public void revokeProducer(String topic, String apiKey) throws HttpException, IOException { + delete(BASE_URI_TOPIC + "/" + MRConstants.escape(topic) + "/producers/" + MRConstants.escape(apiKey)); + } + + @Override + public boolean isOpenForConsuming(String topic) throws HttpObjectNotFoundException, IOException { + return null == getAllowedConsumers(topic); + } + + @Override + public Set<String> getAllowedConsumers(String topic) throws HttpObjectNotFoundException, IOException { + return getTopicMetadata(topic).getAllowedConsumers(); + } + + @Override + public void allowConsumer(String topic, String apiKey) throws HttpObjectNotFoundException, HttpException, IOException { + put(BASE_URI_TOPIC + "/" + MRConstants.escape(topic) + "/consumers/" + MRConstants.escape(apiKey), new JSONObject()); + } + + @Override + public void revokeConsumer(String topic, String apiKey) throws HttpException, IOException { + delete(BASE_URI_TOPIC + "/" + MRConstants.escape(topic) + "/consumers/" + MRConstants.escape(apiKey)); + } + + @Override + public ApiCredential createApiKey(String email, String description) throws HttpException, MRApiException, IOException { + try { + final JSONObject o = new JSONObject(); + o.put(PARAM_EMAIL, email); + o.put(PARAM_DESCRIPTION, description); + final JSONObject reply = post(BASE_URI_APIKEY + "/create", o, true); + return new ApiCredential(reply.getString("key"), reply.getString("secret")); + } catch (JSONException e) { + // the response doesn't meet our expectation + throw new MRApiException("The API key response is incomplete.", e); + } + } + + @Override + public ApiKey getApiKey(String apiKey) throws HttpObjectNotFoundException, HttpException, IOException { + final JSONObject keyEntry = get(BASE_URI_APIKEY + "/" + MRConstants.escape(apiKey)); + if (keyEntry == null) { + return null; + } + + return new ApiKey() { + @Override + public String getEmail() { + final JSONObject aux = keyEntry.optJSONObject("aux"); + if (aux != null) { + return aux.optString(PARAM_EMAIL); + } + return null; + } + + @Override + public String getDescription() { + final JSONObject aux = keyEntry.optJSONObject("aux"); + if (aux != null) { + return aux.optString(PARAM_DESCRIPTION); + } + return null; + } + }; + } + + @Override + public void updateCurrentApiKey(String email, String description) throws HttpObjectNotFoundException, HttpException, IOException { + final JSONObject o = new JSONObject(); + if (email != null) { + o.put(PARAM_EMAIL, email); + } + if (description != null) { + o.put(PARAM_DESCRIPTION, description); + } + patch(BASE_URI_APIKEY + "/" + MRConstants.escape(getCurrentApiKey()), o); + } + + @Override + public void deleteCurrentApiKey() throws HttpException, IOException { + delete(BASE_URI_APIKEY + "/" + MRConstants.escape(getCurrentApiKey())); + } } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java index bd140cd..74fec8a 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java @@ -4,11 +4,13 @@ * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2021 Orange. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,10 +19,13 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ + package org.onap.dmaap.mr.client.impl; +import com.att.aft.dme2.api.DME2Client; +import com.att.aft.dme2.api.DME2Exception; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -33,342 +38,353 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; + import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.zip.GZIPOutputStream; - import javax.ws.rs.core.MultivaluedMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.http.HttpException; import org.apache.http.HttpStatus; import org.json.JSONArray; import org.json.JSONObject; import org.json.JSONTokener; - -import com.att.aft.dme2.api.DME2Client; -import com.att.aft.dme2.api.DME2Exception; import org.onap.dmaap.mr.client.HostSelector; import org.onap.dmaap.mr.client.MRBatchingPublisher; +import org.onap.dmaap.mr.client.ProtocolType; import org.onap.dmaap.mr.client.response.MRPublisherResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher { - private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class); - - private static final String PROPS_PROTOCOL = "Protocol"; - private static final String PROPS_PARTITION = "partition"; - private static final String PROPS_CONTENT_TYPE = "contenttype"; - - private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip"; - private static final String CONTENT_TYPE_CAMBRIA = "application/cambria"; - private static final String CONTENT_TYPE_JSON = "application/json"; - private static final String CONTENT_TYPE_TEXT = "text/plain"; - - private static final String JSON_STATUS = "status"; - - public static class Builder { - - public Builder againstUrls(Collection<String> baseUrls) { - fUrls = baseUrls; - return this; - } - - public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype ) - { - fUrls = baseUrls; - fServiceName = serviceName; - fTransportype = transportype; - return this; - } - - public Builder onTopic(String topic) { - fTopic = topic; - return this; - } - - public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) { - fMaxBatchSize = maxBatchSize; - fMaxBatchAgeMs = maxBatchAgeMs; - return this; - } - - public Builder compress(boolean compress) { - fCompress = compress; - return this; - } - - public Builder httpThreadTime(int threadOccuranceTime) { - this.threadOccuranceTime = threadOccuranceTime; - return this; - } - - public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) { - fAllowSelfSignedCerts = allowSelfSignedCerts; - return this; - } - - public Builder withResponse(boolean withResponse) { - fWithResponse = withResponse; - return this; - } - - public MRSimplerBatchPublisher build() { - if (!fWithResponse) { - try { - return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, - fAllowSelfSignedCerts, threadOccuranceTime); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - } else { - try { - return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, - fAllowSelfSignedCerts, fMaxBatchSize); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); - } - } - - } - - private Collection<String> fUrls; - private Collection<String> fServiceName; - private String fTransportype; - private String fTopic; - private int fMaxBatchSize = 100; - - private long fMaxBatchAgeMs = 1000; - private boolean fCompress = false; - private int threadOccuranceTime = 50; - private boolean fAllowSelfSignedCerts = false; - private boolean fWithResponse = false; - - }; - - @Override - public int send(String partition, String msg) { - return send(new message(partition, msg)); - } - - @Override - public int send(String msg) { - return send(new message(null, msg)); - } - - @Override - public int send(message msg) { - final LinkedList<message> list = new LinkedList<>(); - list.add(msg); - return send(list); - } - - @Override - public synchronized int send(Collection<message> msgs) { - if (fClosed) { - throw new IllegalStateException("The publisher was closed."); - } - - for (message userMsg : msgs) { - fPending.add(new TimestampedMessage(userMsg)); - } - return getPendingMessageCount(); - } - - @Override - public synchronized int getPendingMessageCount() { - return fPending.size(); - } - - @Override - public void close() { - try { - final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - if (remains.isEmpty()) { - getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.", - remains.size()); - } - } catch (InterruptedException e) { - getLog().warn("Possible message loss. " + e.getMessage(), e); - Thread.currentThread().interrupt(); - } catch (IOException e) { - getLog().warn("Possible message loss. " + e.getMessage(), e); - } - } - - @Override - public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException { - synchronized (this) { - fClosed = true; - - // stop the background sender - fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - fExec.shutdown(); - } - - final long now = Clock.now(); - final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit); - final long timeoutAtMs = now + waitInMs; - - while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) { - send(true); - Thread.sleep(250); - } - - synchronized (this) { - final LinkedList<message> result = new LinkedList<>(); - fPending.drainTo(result); - return result; - } - } - - /** - * Possibly send a batch to the MR server. This is called by the background - * thread and the close() method - * - * @param force - */ - private synchronized void send(boolean force) { - if ((force || shouldSendNow()) && !sendBatch()) { - getLog().warn("Send failed, " + fPending.size() + " message to send."); - - // note the time for back-off - fDontSendUntilMs = sfWaitAfterError + Clock.now(); - } - } - - private synchronized boolean shouldSendNow() { - boolean shouldSend = false; - if (fPending.size()>0) { - final long nowMs = Clock.now(); - - shouldSend = (fPending.size() >= fMaxBatchSize); - if (!shouldSend) { - final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; - shouldSend = sendAtMs <= nowMs; - } - - // however, wait after an error - shouldSend = shouldSend && nowMs >= fDontSendUntilMs; - } - return shouldSend; - } - - /** - * Method to parse published JSON Objects and Arrays - * - * @return JSONArray - */ - private JSONArray parseJSON() { - JSONArray jsonArray = new JSONArray(); - for (TimestampedMessage m : fPending) { - JSONTokener jsonTokener = new JSONTokener(m.fMsg); - JSONObject jsonObject = null; - JSONArray tempjsonArray = null; - final char firstChar = jsonTokener.next(); - jsonTokener.back(); - if ('[' == firstChar) { - tempjsonArray = new JSONArray(jsonTokener); - if (null != tempjsonArray) { - for (int i = 0; i < tempjsonArray.length(); i++) { - jsonArray.put(tempjsonArray.getJSONObject(i)); - } - } - } else { - jsonObject = new JSONObject(jsonTokener); - jsonArray.put(jsonObject); - } - - } - return jsonArray; - } - - private void logTime(long startMs, String dmeResponse) { - if (getLog().isInfoEnabled()) { - getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse); - } - } - - private synchronized boolean sendBatch() { - // it's possible for this call to be made with an empty list. in this - // case, just return. - if (fPending.isEmpty()) { - return true; - } - - final long nowMs = Clock.now(); - - if (this.fHostSelector != null) { - host = this.fHostSelector.selectBaseHost(); - } - - final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL), - props.getProperty(PROPS_PARTITION)); - - try { - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); - OutputStream os = baseStream; - final String contentType = props.getProperty(PROPS_CONTENT_TYPE); - if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) { - JSONArray jsonArray = parseJSON(); - os.write(jsonArray.toString().getBytes()); - os.close(); - - } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { - for (TimestampedMessage m : fPending) { - os.write(m.fMsg.getBytes()); - os.write('\n'); - } - os.close(); - } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA) - || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) { - if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) { - os = new GZIPOutputStream(baseStream); - } - for (TimestampedMessage m : fPending) { - - os.write(("" + m.fPartition.length()).getBytes()); - os.write('.'); - os.write(("" + m.fMsg.length()).getBytes()); - os.write('.'); - os.write(m.fPartition.getBytes()); - os.write(m.fMsg.getBytes()); - os.write('\n'); - } - os.close(); - } else { - for (TimestampedMessage m : fPending) { - os.write(m.fMsg.getBytes()); - - } - os.close(); - } - - final long startMs = Clock.now(); - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - - DME2Configue(); - - this.wait(5); - getLog().info(String - .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath, - nowMs - fPending.peek().timestamp)); - sender.setPayload(os.toString()); - String dmeResponse = sender.sendAndWait(5000L); - - logTime(startMs, dmeResponse); - fPending.clear(); - return true; - } - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, - nowMs - fPending.peek().timestamp); + private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class); + + private static final String PASSWORD = "password"; + private static final String USERNAME = "username"; + private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath"; + private static final String SERVICE_NAME = "ServiceName"; + private static final String PARTNER = "Partner"; + private static final String ROUTE_OFFER = "routeOffer"; + private static final String PROTOCOL = "Protocol"; + private static final String METHOD_TYPE = "MethodType"; + private static final String CONTENT_TYPE = "contenttype"; + private static final String LATITUDE = "Latitude"; + private static final String LONGITUDE = "Longitude"; + private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT"; + private static final String VERSION = "Version"; + private static final String ENVIRONMENT = "Environment"; + private static final String SUB_CONTEXT_PATH = "SubContextPath"; + private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired"; + private static final String PARTITION = "partition"; + private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS"; + private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT"; + private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS"; + private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS"; + private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON"; + + private static final String CONTENT_TYPE_TEXT = "text/plain"; + + private static final String JSON_STATUS = "status"; + + public static class Builder { + + public Builder againstUrls(Collection<String> baseUrls) { + fUrls = baseUrls; + return this; + } + + public Builder againstUrlsOrServiceName(Collection<String> baseUrls, Collection<String> serviceName, String transportype) { + fUrls = baseUrls; + return this; + } + + public Builder onTopic(String topic) { + fTopic = topic; + return this; + } + + public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) { + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + return this; + } + + public Builder compress(boolean compress) { + fCompress = compress; + return this; + } + + public Builder httpThreadTime(int threadOccurrenceTime) { + this.threadOccurrenceTime = threadOccurrenceTime; + return this; + } + + public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) { + fAllowSelfSignedCerts = allowSelfSignedCerts; + return this; + } + + public Builder withResponse(boolean withResponse) { + fWithResponse = withResponse; + return this; + } + + public MRSimplerBatchPublisher build() { + if (!fWithResponse) { + try { + return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, + fAllowSelfSignedCerts, threadOccurrenceTime); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } else { + try { + return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, + fAllowSelfSignedCerts, fMaxBatchSize); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } + + } + + private Collection<String> fUrls; + private String fTopic; + private int fMaxBatchSize = 100; + + private long fMaxBatchAgeMs = 1000; + private boolean fCompress = false; + private int threadOccurrenceTime = 50; + private boolean fAllowSelfSignedCerts = false; + private boolean fWithResponse = false; + + } + + @Override + public int send(String partition, String msg) { + return send(new Message(partition, msg)); + } + + @Override + public int send(String msg) { + return send(new Message(null, msg)); + } + + @Override + public int send(Message msg) { + final LinkedList<Message> list = new LinkedList<>(); + list.add(msg); + return send(list); + } + + @Override + public synchronized int send(Collection<Message> msgs) { + if (fClosed) { + throw new IllegalStateException("The publisher was closed."); + } + + for (Message userMsg : msgs) { + fPending.add(new TimestampedMessage(userMsg)); + } + return getPendingMessageCount(); + } + + @Override + public synchronized int getPendingMessageCount() { + return fPending.size(); + } + + @Override + public void close() { + try { + final List<Message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + if (remains.isEmpty()) { + getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.", + remains.size()); + } + } catch (InterruptedException e) { + getLog().warn("Possible message loss. " + e.getMessage(), e); + Thread.currentThread().interrupt(); + } catch (IOException e) { + getLog().warn("Possible message loss. " + e.getMessage(), e); + } + } + + @Override + public List<Message> close(long time, TimeUnit unit) throws IOException, InterruptedException { + synchronized (this) { + fClosed = true; + + // stop the background sender + fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + fExec.shutdown(); + } + + final long now = Clock.now(); + final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit); + final long timeoutAtMs = now + waitInMs; + + while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) { + send(true); + Thread.sleep(250); + } + + synchronized (this) { + final LinkedList<Message> result = new LinkedList<>(); + fPending.drainTo(result); + return result; + } + } + + /** + * Possibly send a batch to the MR server. This is called by the background + * thread and the close() method + * + * @param force + */ + private synchronized void send(boolean force) { + if ((force || shouldSendNow()) && !sendBatch()) { + getLog().warn("Send failed, {} message to send.", fPending.size()); + // note the time for back-off + fDontSendUntilMs = SF_WAIT_AFTER_ERROR + Clock.now(); + } + } + + private synchronized boolean shouldSendNow() { + boolean shouldSend = false; + if (!fPending.isEmpty()) { + final long nowMs = Clock.now(); + + shouldSend = (fPending.size() >= fMaxBatchSize); + if (!shouldSend) { + final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; + shouldSend = sendAtMs <= nowMs; + } + + // however, wait after an error + shouldSend = shouldSend && nowMs >= fDontSendUntilMs; + } + return shouldSend; + } + + /** + * Method to parse published JSON Objects and Arrays. + * + * @return JSONArray + */ + private JSONArray parseJSON() { + JSONArray jsonArray = new JSONArray(); + for (TimestampedMessage m : fPending) { + JSONTokener jsonTokener = new JSONTokener(m.fMsg); + JSONObject jsonObject = null; + JSONArray tempjsonArray = null; + final char firstChar = jsonTokener.next(); + jsonTokener.back(); + if ('[' == firstChar) { + tempjsonArray = new JSONArray(jsonTokener); + for (int i = 0; i < tempjsonArray.length(); i++) { + jsonArray.put(tempjsonArray.getJSONObject(i)); + } + } else { + jsonObject = new JSONObject(jsonTokener); + jsonArray.put(jsonObject); + } + + } + return jsonArray; + } + + private void logTime(long startMs, String dmeResponse) { + if (getLog().isInfoEnabled()) { + getLog().info("MR reply ok ({} ms):{}", (Clock.now() - startMs), dmeResponse); + } + } + + private void logSendMessage(int nbMessage, String dest, long time) { + if (getLog().isInfoEnabled()) { + getLog().info("sending {} msgs to {}. Oldest: {} ms", nbMessage, dest, time); + } + } + + private synchronized boolean sendBatch() { + // it's possible for this call to be made with an empty list. in this + // case, just return. + if (fPending.isEmpty()) { + return true; + } + + final long nowMs = Clock.now(); + + if (this.fHostSelector != null) { + host = this.fHostSelector.selectBaseHost(); + } + + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROTOCOL), + props.getProperty(PARTITION)); + + try { + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); + OutputStream os = baseStream; + final String contentType = props.getProperty(CONTENT_TYPE); + if (contentType.equalsIgnoreCase(MRFormat.JSON.toString())) { + JSONArray jsonArray = parseJSON(); + os.write(jsonArray.toString().getBytes()); + os.close(); + + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + os.close(); + } else if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString()) + || (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) { + if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) { + os = new GZIPOutputStream(baseStream); + } + for (TimestampedMessage m : fPending) { + + os.write(("" + m.fPartition.length()).getBytes()); + os.write('.'); + os.write(("" + m.fMsg.length()).getBytes()); + os.write('.'); + os.write(m.fPartition.getBytes()); + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + os.close(); + } else { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + + } + os.close(); + } + + final long startMs = Clock.now(); + if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) { + + configureDME2(); + + this.wait(5); + if (fPending.peek() != null) { + logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp); + } + sender.setPayload(os.toString()); + String dmeResponse = sender.sendAndWait(5000L); + + logTime(startMs, dmeResponse); + fPending.clear(); + return true; + } + + if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + if (fPending.peek() != null) { + logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp); + } final JSONObject result = postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray()) .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate) @@ -379,550 +395,563 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - logTime(startMs, result.toString()); + logTime(startMs, result.toString()); fPending.clear(); return true; } - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, - nowMs - fPending.peek().timestamp); - final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, - protocolFlag); - - // Here we are checking for error response. If HTTP status - // code is not within the http success response code - // then we consider this as error and return false - if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { - return false; - } - logTime(startMs, result.toString()); - fPending.clear(); - return true; - } - - if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, - nowMs - fPending.peek().timestamp); - final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType); - - // Here we are checking for error response. If HTTP status - // code is not within the http success response code - // then we consider this as error and return false - if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { - return false; - } - logTime(startMs, result.toString()); - fPending.clear(); - return true; - } - } catch (Exception x) { - getLog().warn(x.getMessage(), x); - } - return false; - } - - public synchronized MRPublisherResponse sendBatchWithResponse() { - // it's possible for this call to be made with an empty list. in this - // case, just return. - if (fPending.isEmpty()) { - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - pubResponse.setResponseMessage("No Messages to send"); - return pubResponse; - } - - final long nowMs = Clock.now(); - - host = this.fHostSelector.selectBaseHost(); - - final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL), - props.getProperty(PROPS_PARTITION)); - OutputStream os = null; - try { - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); - os = baseStream; - final String contentType = props.getProperty(PROPS_CONTENT_TYPE); - if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) { - JSONArray jsonArray = parseJSON(); - os.write(jsonArray.toString().getBytes()); - } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { - for (TimestampedMessage m : fPending) { - os.write(m.fMsg.getBytes()); - os.write('\n'); - } - } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA) - || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) { - if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) { - os = new GZIPOutputStream(baseStream); - } - for (TimestampedMessage m : fPending) { - - os.write(("" + m.fPartition.length()).getBytes()); - os.write('.'); - os.write(("" + m.fMsg.length()).getBytes()); - os.write('.'); - os.write(m.fPartition.getBytes()); - os.write(m.fMsg.getBytes()); - os.write('\n'); - } - os.close(); - } else { - for (TimestampedMessage m : fPending) { - os.write(m.fMsg.getBytes()); - - } - } - - final long startMs = Clock.now(); - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - - try { - DME2Configue(); - - this.wait(5); - getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath, - nowMs - fPending.peek().timestamp); - sender.setPayload(os.toString()); - - String dmeResponse = sender.sendAndWait(5000L); - - pubResponse = createMRPublisherResponse(dmeResponse, pubResponse); - - if (Integer.valueOf(pubResponse.getResponseCode()) < 200 - || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - - return pubResponse; - } - final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString(); - getLog().info(logLine); - fPending.clear(); - - } catch (DME2Exception x) { - getLog().warn(x.getMessage(), x); - pubResponse.setResponseCode(x.getErrorCode()); - pubResponse.setResponseMessage(x.getErrorMessage()); - } catch (URISyntaxException x) { - - getLog().warn(x.getMessage(), x); - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - pubResponse.setResponseMessage(x.getMessage()); - } catch (Exception x) { - - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - pubResponse.setResponseMessage(x.getMessage()); - logger.error("exception: ", x); - - } - - return pubResponse; - } - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, - nowMs - fPending.peek().timestamp); - final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, - authDate, username, password, protocolFlag); - // Here we are checking for error response. If HTTP status - // code is not within the http success response code - // then we consider this as error and return false - - pubResponse = createMRPublisherResponse(result, pubResponse); - - if (Integer.valueOf(pubResponse.getResponseCode()) < 200 - || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - - return pubResponse; - } - - logTime(startMs, result); - fPending.clear(); - return pubResponse; - } - - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, - nowMs - fPending.peek().timestamp); - final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, - password, protocolFlag); - - // Here we are checking for error response. If HTTP status - // code is not within the http success response code - // then we consider this as error and return false - pubResponse = createMRPublisherResponse(result, pubResponse); - - if (Integer.valueOf(pubResponse.getResponseCode()) < 200 - || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - - return pubResponse; - } - - final String logLine = String.valueOf((Clock.now() - startMs)); - getLog().info(logLine); - fPending.clear(); - return pubResponse; - } - - if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, - nowMs - fPending.peek().timestamp); - final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType); - - // Here we are checking for error response. If HTTP status - // code is not within the http success response code - // then we consider this as error and return false - pubResponse = createMRPublisherResponse(result, pubResponse); - - if (Integer.valueOf(pubResponse.getResponseCode()) < 200 - || Integer.valueOf(pubResponse.getResponseCode()) > 299) { - - return pubResponse; - } - - final String logLine = String.valueOf((Clock.now() - startMs)); - getLog().info(logLine); - fPending.clear(); - return pubResponse; - } - } catch (IllegalArgumentException | HttpException x) { - getLog().warn(x.getMessage(), x); - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - pubResponse.setResponseMessage(x.getMessage()); - - } catch (IOException x) { - getLog().warn(x.getMessage(), x); - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - pubResponse.setResponseMessage(x.getMessage()); - } catch (Exception x) { - getLog().warn(x.getMessage(), x); - - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - pubResponse.setResponseMessage(x.getMessage()); - - } - - finally { - if (!fPending.isEmpty()) { - getLog().warn("Send failed, " + fPending.size() + " message to send."); - pubResponse.setPendingMsgs(fPending.size()); - } - if (os != null) { - try { - os.close(); - } catch (Exception x) { - getLog().warn(x.getMessage(), x); - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); - pubResponse.setResponseMessage("Error in closing Output Stream"); - } - } - } - - return pubResponse; - } - - public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) { - - if (reply.isEmpty()) { - - mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - mrPubResponse.setResponseMessage("Please verify the Producer properties"); - } else if (reply.startsWith("{")) { - JSONObject jObject = new JSONObject(reply); - if (jObject.has("message") && jObject.has(JSON_STATUS)) { - String message = jObject.getString("message"); - if (null != message) { - mrPubResponse.setResponseMessage(message); - } - mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS))); - } else { - mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); - mrPubResponse.setResponseMessage(reply); - } - } else if (reply.startsWith("<")) { - String responseCode = getHTTPErrorResponseCode(reply); - if (responseCode.contains("403")) { - responseCode = "403"; - } - mrPubResponse.setResponseCode(responseCode); - mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); - } - - return mrPubResponse; - } - - private final String fTopic; - private final int fMaxBatchSize; - private final long fMaxBatchAgeMs; - private final boolean fCompress; - private int threadOccuranceTime; - private boolean fClosed; - private String username; - private String password; - private String host; - - // host selector - private HostSelector fHostSelector = null; - - private final LinkedBlockingQueue<TimestampedMessage> fPending; - private long fDontSendUntilMs; - private final ScheduledThreadPoolExecutor fExec; - - private String latitude; - private String longitude; - private String version; - private String serviceName; - private String env; - private String partner; - private String routeOffer; - private String subContextPath; - private String protocol; - private String methodType; - private String url; - private String dmeuser; - private String dmepassword; - private String contentType; - private static final long sfWaitAfterError = 10000; - private HashMap<String, String> DMETimeOuts; - private DME2Client sender; - public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); - private String authKey; - private String authDate; - private String handlers; - private Properties props; - public static String routerFilePath; - protected static final Map<String, String> headers = new HashMap<String, String>(); - public static MultivaluedMap<String, Object> headersMap; - - private MRPublisherResponse pubResponse; - - public MRPublisherResponse getPubResponse() { - return pubResponse; - } - - public void setPubResponse(MRPublisherResponse pubResponse) { - this.pubResponse = pubResponse; - } - - public static String getRouterFilePath() { - return routerFilePath; - } - - public static void setRouterFilePath(String routerFilePath) { - MRSimplerBatchPublisher.routerFilePath = routerFilePath; - } - - public Properties getProps() { - return props; - } - - public void setProps(Properties props) { - this.props = props; - setClientConfig(DmaapClientUtil.getClientConfig(props)); - } - - public String getProtocolFlag() { - return protocolFlag; - } - - public void setProtocolFlag(String protocolFlag) { - this.protocolFlag = protocolFlag; - } - - private void DME2Configue() throws Exception { - try { - - latitude = props.getProperty("Latitude"); - longitude = props.getProperty("Longitude"); - version = props.getProperty("Version"); - serviceName = props.getProperty("ServiceName"); - env = props.getProperty("Environment"); - partner = props.getProperty("Partner"); - routeOffer = props.getProperty("routeOffer"); - subContextPath = props.getProperty("SubContextPath") + fTopic; - - protocol = props.getProperty(PROPS_PROTOCOL); - methodType = props.getProperty("MethodType"); - dmeuser = props.getProperty("username"); - dmepassword = props.getProperty("password"); - contentType = props.getProperty(PROPS_CONTENT_TYPE); - handlers = props.getProperty("sessionstickinessrequired"); - routerFilePath = props.getProperty("DME2preferredRouterFilePath"); - - /** - * Changes to DME2Client url to use Partner for auto failover - * between data centers When Partner value is not provided use the - * routeOffer value for auto failover within a cluster - */ - - String partitionKey = props.getProperty(PROPS_PARTITION); - - if (partner != null && !partner.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" - + partner; - if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) { - url = url + "&partitionKey=" + partitionKey; - } - } else if (routeOffer != null && !routeOffer.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer=" - + routeOffer; - if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) { - url = url + "&partitionKey=" + partitionKey; - } - } - - DMETimeOuts = new HashMap<String, String>(); - DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS")); - DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); - DMETimeOuts.put("Content-Type", contentType); - System.setProperty("AFT_LATITUDE", latitude); - System.setProperty("AFT_LONGITUDE", longitude); - System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT")); - // System.setProperty("DME2.DEBUG", "true"); - - // SSL changes - // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", - - System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2"); - System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); - System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); - - // SSL changes - - sender = new DME2Client(new URI(url), 5000L); - - sender.setAllowAllHttpReturnCodes(true); - sender.setMethod(methodType); - sender.setSubContext(subContextPath); - sender.setCredentials(dmeuser, dmepassword); - sender.setHeaders(DMETimeOuts); - if (handlers != null &&handlers.equalsIgnoreCase("yes")) { - sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", - props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", - props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); - sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON")); - } else { - sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); - } - } catch (DME2Exception x) { - getLog().warn(x.getMessage(), x); - throw new DME2Exception(x.getErrorCode(), x.getErrorMessage()); - } catch (URISyntaxException x) { - - getLog().warn(x.getMessage(), x); - throw new URISyntaxException(url, x.getMessage()); - } catch (Exception x) { - - getLog().warn(x.getMessage(), x); - throw new IllegalArgumentException(x.getMessage()); - } - } - - private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, - boolean compress) throws MalformedURLException { - super(hosts); - - if (topic == null || topic.length() < 1) { - throw new IllegalArgumentException("A topic must be provided."); - } - - fHostSelector = new HostSelector(hosts, null); - fClosed = false; - fTopic = topic; - fMaxBatchSize = maxBatchSize; - fMaxBatchAgeMs = maxBatchAgeMs; - fCompress = compress; - - fPending = new LinkedBlockingQueue<>(); - fDontSendUntilMs = 0; - fExec = new ScheduledThreadPoolExecutor(1); - pubResponse = new MRPublisherResponse(); - - } - - private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, - boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException { - super(hosts); - - if (topic == null || topic.length() < 1) { - throw new IllegalArgumentException("A topic must be provided."); - } - - fHostSelector = new HostSelector(hosts, null); - fClosed = false; - fTopic = topic; - fMaxBatchSize = maxBatchSize; - fMaxBatchAgeMs = maxBatchAgeMs; - fCompress = compress; - threadOccuranceTime = httpThreadOccurnace; - fPending = new LinkedBlockingQueue<>(); - fDontSendUntilMs = 0; - fExec = new ScheduledThreadPoolExecutor(1); - fExec.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - send(false); - } - }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS); - pubResponse = new MRPublisherResponse(); - } - - private static class TimestampedMessage extends message { - public TimestampedMessage(message m) { - super(m); - timestamp = Clock.now(); - } - - public final long timestamp; - } - - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public String getContentType() { - return contentType; - } - - public void setContentType(String contentType) { - this.contentType = contentType; - } - - public String getAuthKey() { - return authKey; - } - - public void setAuthKey(String authKey) { - this.authKey = authKey; - } - - public String getAuthDate() { - return authDate; - } - - public void setAuthDate(String authDate) { - this.authDate = authDate; - } + if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + if (fPending.peek() != null) { + logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp); + } + final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, + protocolFlag); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { + return false; + } + logTime(startMs, result.toString()); + fPending.clear(); + return true; + } + + if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { + if (fPending.peek() != null) { + logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp); + } + final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { + return false; + } + logTime(startMs, result.toString()); + fPending.clear(); + return true; + } + } catch (InterruptedException e) { + getLog().warn("Interrupted!", e); + // Restore interrupted state... + Thread.currentThread().interrupt(); + } catch (Exception x) { + getLog().warn(x.getMessage(), x); + } + return false; + } + + public synchronized MRPublisherResponse sendBatchWithResponse() { + // it's possible for this call to be made with an empty list. in this + // case, just return. + if (fPending.isEmpty()) { + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage("No Messages to send"); + return pubResponse; + } + + final long nowMs = Clock.now(); + + host = this.fHostSelector.selectBaseHost(); + + final String httpUrl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROTOCOL), + props.getProperty(PARTITION)); + OutputStream os = null; + try (ByteArrayOutputStream baseStream = new ByteArrayOutputStream()) { + os = baseStream; + final String propsContentType = props.getProperty(CONTENT_TYPE); + if (propsContentType.equalsIgnoreCase(MRFormat.JSON.toString())) { + JSONArray jsonArray = parseJSON(); + os.write(jsonArray.toString().getBytes()); + } else if (propsContentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + } else if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString()) + || (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) { + if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) { + os = new GZIPOutputStream(baseStream); + } + for (TimestampedMessage m : fPending) { + os.write(("" + m.fPartition.length()).getBytes()); + os.write('.'); + os.write(("" + m.fMsg.length()).getBytes()); + os.write('.'); + os.write(m.fPartition.getBytes()); + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + os.close(); + } else { + for (TimestampedMessage m : fPending) { + os.write(m.fMsg.getBytes()); + + } + } + + final long startMs = Clock.now(); + if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) { + + try { + configureDME2(); + + this.wait(5); + + if (fPending.peek() != null) { + logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp); + } + sender.setPayload(os.toString()); + + String dmeResponse = sender.sendAndWait(5000L); + + pubResponse = createMRPublisherResponse(dmeResponse, pubResponse); + + if (Integer.parseInt(pubResponse.getResponseCode()) < 200 + || Integer.parseInt(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString(); + getLog().info(logLine); + fPending.clear(); + + } catch (DME2Exception x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(x.getErrorCode()); + pubResponse.setResponseMessage(x.getErrorMessage()); + } catch (URISyntaxException x) { + + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage(x.getMessage()); + } catch (InterruptedException e) { + throw e; + } catch (Exception x) { + + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage(x.getMessage()); + logger.error("exception: ", x); + + } + + return pubResponse; + } + + if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { + if (fPending.peek() != null) { + logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp); + } + final String result = postAuthwithResponse(httpUrl, baseStream.toByteArray(), contentType, authKey, + authDate, username, password, protocolFlag); + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + + pubResponse = createMRPublisherResponse(result, pubResponse); + + if (Integer.parseInt(pubResponse.getResponseCode()) < 200 + || Integer.parseInt(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + + logTime(startMs, result); + fPending.clear(); + return pubResponse; + } + + if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { + if (fPending.peek() != null) { + logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp); + } + final String result = postWithResponse(httpUrl, baseStream.toByteArray(), contentType, username, + password, protocolFlag); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + pubResponse = createMRPublisherResponse(result, pubResponse); + + if (Integer.parseInt(pubResponse.getResponseCode()) < 200 + || Integer.parseInt(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + + final String logLine = String.valueOf((Clock.now() - startMs)); + getLog().info(logLine); + fPending.clear(); + return pubResponse; + } + + if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { + if (fPending.peek() != null) { + logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp); + } + final String result = postNoAuthWithResponse(httpUrl, baseStream.toByteArray(), contentType); + + // Here we are checking for error response. If HTTP status + // code is not within the http success response code + // then we consider this as error and return false + pubResponse = createMRPublisherResponse(result, pubResponse); + + if (Integer.parseInt(pubResponse.getResponseCode()) < 200 + || Integer.parseInt(pubResponse.getResponseCode()) > 299) { + + return pubResponse; + } + + final String logLine = String.valueOf((Clock.now() - startMs)); + getLog().info(logLine); + fPending.clear(); + return pubResponse; + } + } catch (IllegalArgumentException | HttpException x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + pubResponse.setResponseMessage(x.getMessage()); + + } catch (IOException x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage(x.getMessage()); + } catch (InterruptedException e) { + getLog().warn("Interrupted!", e); + // Restore interrupted state... + Thread.currentThread().interrupt(); + } catch (Exception x) { + getLog().warn(x.getMessage(), x); + + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage(x.getMessage()); + + } finally { + if (!fPending.isEmpty()) { + getLog().warn("Send failed, " + fPending.size() + " message to send."); + pubResponse.setPendingMsgs(fPending.size()); + } + if (os != null) { + try { + os.close(); + } catch (Exception x) { + getLog().warn(x.getMessage(), x); + pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + pubResponse.setResponseMessage("Error in closing Output Stream"); + } + } + } + + return pubResponse; + } + + public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) { + + if (reply.isEmpty()) { + + mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); + mrPubResponse.setResponseMessage("Please verify the Producer properties"); + } else if (reply.startsWith("{")) { + JSONObject jObject = new JSONObject(reply); + if (jObject.has("message") && jObject.has(JSON_STATUS)) { + String message = jObject.getString("message"); + if (null != message) { + mrPubResponse.setResponseMessage(message); + } + mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS))); + } else { + mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); + mrPubResponse.setResponseMessage(reply); + } + } else if (reply.startsWith("<")) { + String responseCode = getHTTPErrorResponseCode(reply); + if (responseCode.contains("403")) { + responseCode = "403"; + } + mrPubResponse.setResponseCode(responseCode); + mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply)); + } + + return mrPubResponse; + } + + private final String fTopic; + private final int fMaxBatchSize; + private final long fMaxBatchAgeMs; + private final boolean fCompress; + private int threadOccurrenceTime; + private boolean fClosed; + private String username; + private String password; + private String host; + + // host selector + private HostSelector fHostSelector = null; + + private final LinkedBlockingQueue<TimestampedMessage> fPending; + private long fDontSendUntilMs; + private final ScheduledThreadPoolExecutor fExec; + + private String latitude; + private String longitude; + private String version; + private String serviceName; + private String env; + private String partner; + private String routeOffer; + private String subContextPath; + private String protocol; + private String methodType; + private String url; + private String dmeuser; + private String dmepassword; + private String contentType; + private static final long SF_WAIT_AFTER_ERROR = 10000; + private HashMap<String, String> DMETimeOuts; + private DME2Client sender; + public String protocolFlag = ProtocolType.DME2.getValue(); + private String authKey; + private String authDate; + private String handlers; + private Properties props; + public static String routerFilePath; + protected static final Map<String, String> headers = new HashMap<String, String>(); + public static MultivaluedMap<String, Object> headersMap; + + private MRPublisherResponse pubResponse; + + public MRPublisherResponse getPubResponse() { + return pubResponse; + } + + public void setPubResponse(MRPublisherResponse pubResponse) { + this.pubResponse = pubResponse; + } + + public static String getRouterFilePath() { + return routerFilePath; + } + + public static void setRouterFilePath(String routerFilePath) { + MRSimplerBatchPublisher.routerFilePath = routerFilePath; + } + + public Properties getProps() { + return props; + } + + public void setProps(Properties props) { + this.props = props; + setClientConfig(DmaapClientUtil.getClientConfig(props)); + } + + public String getProtocolFlag() { + return protocolFlag; + } + + public void setProtocolFlag(String protocolFlag) { + this.protocolFlag = protocolFlag; + } + + private void configureDME2() throws Exception { + try { + + latitude = props.getProperty(LATITUDE); + longitude = props.getProperty(LONGITUDE); + version = props.getProperty(VERSION); + serviceName = props.getProperty(SERVICE_NAME); + env = props.getProperty(ENVIRONMENT); + partner = props.getProperty(PARTNER); + routeOffer = props.getProperty(ROUTE_OFFER); + subContextPath = props.getProperty(SUB_CONTEXT_PATH) + fTopic; + + protocol = props.getProperty(PROTOCOL); + methodType = props.getProperty(METHOD_TYPE); + dmeuser = props.getProperty(USERNAME); + dmepassword = props.getProperty(PASSWORD); + contentType = props.getProperty(CONTENT_TYPE); + handlers = props.getProperty(SESSION_STICKINESS_REQUIRED); + + MRSimplerBatchPublisher.routerFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH); + + /* + * Changes to DME2Client url to use Partner for auto failover + * between data centers When Partner value is not provided use the + * routeOffer value for auto failover within a cluster + */ + + String partitionKey = props.getProperty(PARTITION); + + if (partner != null && !partner.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + + partner; + if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) { + url = url + "&partitionKey=" + partitionKey; + } + } else if (routeOffer != null && !routeOffer.isEmpty()) { + url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer=" + + routeOffer; + if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) { + url = url + "&partitionKey=" + partitionKey; + } + } + + DMETimeOuts = new HashMap<>(); + DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS)); + DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(AFT_DME2_ROUNDTRIP_TIMEOUT_MS)); + DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(AFT_DME2_EP_CONN_TIMEOUT)); + DMETimeOuts.put("Content-Type", contentType); + System.setProperty("AFT_LATITUDE", latitude); + System.setProperty("AFT_LONGITUDE", longitude); + System.setProperty("AFT_ENVIRONMENT", props.getProperty(AFT_ENVIRONMENT)); + // System.setProperty("DME2.DEBUG", "true"); + + // SSL changes + // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", + + System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2"); + System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false"); + System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); + + // SSL changes + + sender = new DME2Client(new URI(url), 5000L); + + sender.setAllowAllHttpReturnCodes(true); + sender.setMethod(methodType); + sender.setSubContext(subContextPath); + sender.setCredentials(dmeuser, dmepassword); + sender.setHeaders(DMETimeOuts); + if ("yes".equalsIgnoreCase(handlers)) { + sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", + props.getProperty(AFT_DME2_EXCHANGE_REQUEST_HANDLERS)); + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", + props.getProperty(AFT_DME2_EXCHANGE_REPLY_HANDLERS)); + sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty(AFT_DME2_REQ_TRACE_ON)); + } else { + sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler"); + } + } catch (DME2Exception x) { + getLog().warn(x.getMessage(), x); + throw new DME2Exception(x.getErrorCode(), x.getErrorMessage()); + } catch (URISyntaxException x) { + + getLog().warn(x.getMessage(), x); + throw new URISyntaxException(url, x.getMessage()); + } catch (Exception x) { + + getLog().warn(x.getMessage(), x); + throw new IllegalArgumentException(x.getMessage()); + } + } + + private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, + boolean compress) throws MalformedURLException { + super(hosts); + + if (topic == null || topic.length() < 1) { + throw new IllegalArgumentException("A topic must be provided."); + } + + fHostSelector = new HostSelector(hosts, null); + fClosed = false; + fTopic = topic; + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + fCompress = compress; + + fPending = new LinkedBlockingQueue<>(); + fDontSendUntilMs = 0; + fExec = new ScheduledThreadPoolExecutor(1); + pubResponse = new MRPublisherResponse(); + + } + + private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, + boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurrence) throws MalformedURLException { + super(hosts); + + if (topic == null || topic.length() < 1) { + throw new IllegalArgumentException("A topic must be provided."); + } + + fHostSelector = new HostSelector(hosts, null); + fClosed = false; + fTopic = topic; + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + fCompress = compress; + threadOccurrenceTime = httpThreadOccurrence; + fPending = new LinkedBlockingQueue<>(); + fDontSendUntilMs = 0; + fExec = new ScheduledThreadPoolExecutor(1); + fExec.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + send(false); + } + }, 100, threadOccurrenceTime, TimeUnit.MILLISECONDS); + pubResponse = new MRPublisherResponse(); + } + + private static class TimestampedMessage extends Message { + public TimestampedMessage(Message message) { + super(message); + timestamp = Clock.now(); + } + + public final long timestamp; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + public String getAuthKey() { + return authKey; + } + + public void setAuthKey(String authKey) { + this.authKey = authKey; + } + + public String getAuthDate() { + return authDate; + } + + public void setAuthDate(String authDate) { + this.authDate = authDate; + } } |