aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/mr/client/impl
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client/impl')
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/Clock.java70
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java138
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java722
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java807
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java50
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java295
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java168
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java61
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java436
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java1757
10 files changed, 2229 insertions, 2275 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java b/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java
index 6670399..64a9f5e 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/Clock.java
@@ -4,11 +4,13 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,47 +19,43 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client.impl;
-public class Clock
-{
- public synchronized static Clock getIt ()
- {
- if ( sfClock == null )
- {
- sfClock = new Clock ();
- }
- return sfClock;
- }
+public class Clock {
+ public static synchronized Clock getIt() {
+ if (sfClock == null) {
+ sfClock = new Clock();
+ }
+ return sfClock;
+ }
- /**
- * Get the system's current time in milliseconds.
- * @return the current time
- */
- public static long now ()
- {
- return getIt().nowImpl ();
- }
+ /**
+ * Get the system's current time in milliseconds.
+ *
+ * @return the current time
+ */
+ public static long now() {
+ return getIt().nowImpl();
+ }
- /**
- * Get current time in milliseconds
- * @return current time in ms
- */
- protected long nowImpl ()
- {
- return System.currentTimeMillis ();
- }
+ /**
+ * Get current time in milliseconds.
+ *
+ * @return current time in ms
+ */
+ protected long nowImpl() {
+ return System.currentTimeMillis();
+ }
- protected Clock ()
- {
- }
+ protected Clock() {
+ }
- private static Clock sfClock = null;
+ private static Clock sfClock = null;
- protected synchronized static void register ( Clock testClock )
- {
- sfClock = testClock;
- }
+ protected static synchronized void register(Clock testClock) {
+ sfClock = testClock;
+ }
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java b/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java
index 948ca31..5219286 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/DmaapClientUtil.java
@@ -4,6 +4,8 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -19,6 +21,7 @@
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*
*******************************************************************************/
+
package org.onap.dmaap.mr.client.impl;
import java.util.Properties;
@@ -27,6 +30,7 @@ import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
+
import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
@@ -34,92 +38,96 @@ import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
public class DmaapClientUtil {
- private static final String MR_AUTH_CONSTANT = "X-CambriaAuth";
- private static final String MR_DATE_CONSTANT = "X-CambriaDate";
- private static final String[] httpClientProperties = { ClientProperties.CONNECT_TIMEOUT,
- ClientProperties.READ_TIMEOUT, ClientProperties.PROXY_USERNAME, ClientProperties.PROXY_PASSWORD,
- ClientProperties.PROXY_URI };
+ private DmaapClientUtil() {
+
+ }
+
+ private static final String MR_AUTH_CONSTANT = "X-CambriaAuth";
+ private static final String MR_DATE_CONSTANT = "X-CambriaDate";
+ private static final String[] httpClientProperties = {ClientProperties.CONNECT_TIMEOUT,
+ ClientProperties.READ_TIMEOUT, ClientProperties.PROXY_USERNAME, ClientProperties.PROXY_PASSWORD,
+ ClientProperties.PROXY_URI};
- public static ClientConfig getClientConfig(Properties properties) {
- ClientConfig config = new ClientConfig();
- if (properties != null && !properties.isEmpty()) {
- setHttpClientProperties(config, properties);
- }
- return config;
- }
+ public static ClientConfig getClientConfig(Properties properties) {
+ ClientConfig config = new ClientConfig();
+ if (properties != null && !properties.isEmpty()) {
+ setHttpClientProperties(config, properties);
+ }
+ return config;
+ }
- private static void setHttpClientProperties(ClientConfig config, Properties properties) {
- for (int i = 0; i < httpClientProperties.length; i++) {
- if ((properties.getProperty(httpClientProperties[i]) != null)) {
- config.property(httpClientProperties[i], properties.getProperty(httpClientProperties[i]));
- }
- }
- if ((properties.getProperty(ClientProperties.PROXY_URI) != null) &&
- !(properties.getProperty(ClientProperties.PROXY_URI).isEmpty())) {
- config.connectorProvider(new ApacheConnectorProvider());
- } // else the default connectorProvider (HttpConnectorProvider) will be used
+ private static void setHttpClientProperties(ClientConfig config, Properties properties) {
+ for (String httpClientProperty : httpClientProperties) {
+ if ((properties.getProperty(httpClientProperty) != null)) {
+ config.property(httpClientProperty, properties.getProperty(httpClientProperty));
+ }
+ }
+ if ((properties.getProperty(ClientProperties.PROXY_URI) != null)
+ && !(properties.getProperty(ClientProperties.PROXY_URI).isEmpty())) {
+ config.connectorProvider(new ApacheConnectorProvider());
+ } // else the default connectorProvider (HttpConnectorProvider) will be used
- }
+ }
- public static WebTarget getTarget(ClientConfig config, final String path, final String username,
- final String password) {
- Client client = null;
- if (config != null) {
- client = ClientBuilder.newClient(config);
- } else {
- client = ClientBuilder.newClient();
- }
- HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password);
- client.register(feature);
+ public static WebTarget getTarget(ClientConfig config, final String path, final String username,
+ final String password) {
+ Client client = null;
+ if (config != null) {
+ client = ClientBuilder.newClient(config);
+ } else {
+ client = ClientBuilder.newClient();
+ }
+ HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password);
+ client.register(feature);
- return client.target(path);
- }
+ return client.target(path);
+ }
- public static WebTarget getTarget(ClientConfig config, final String path) {
+ public static WebTarget getTarget(ClientConfig config, final String path) {
- Client client = null;
- if (config != null&&config.getProperties().size()>0) {
- client = ClientBuilder.newClient(config);
- } else {
- client = ClientBuilder.newClient();
- }
- return client.target(path);
- }
+ Client client = null;
+ if (config != null && config.getProperties().size() > 0) {
+ client = ClientBuilder.newClient(config);
+ } else {
+ client = ClientBuilder.newClient();
+ }
+ return client.target(path);
+ }
- public static Response getResponsewtCambriaAuth(WebTarget target, String username, String password) {
- return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get();
+ public static Response getResponsewtCambriaAuth(WebTarget target, String username, String password) {
+ return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get();
- }
+ }
- public static Response postResponsewtCambriaAuth(WebTarget target, String username, String password, byte[] data,
- String contentType) {
- return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password)
- .post(Entity.entity(data, contentType));
+ public static Response postResponsewtCambriaAuth(WebTarget target, String username, String password, byte[] data,
+ String contentType) {
+ return target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password)
+ .post(Entity.entity(data, contentType));
- }
+ }
- public static Response getResponsewtBasicAuth(WebTarget target, String authHeader) {
+ public static Response getResponsewtBasicAuth(WebTarget target, String authHeader) {
- return target.request().header("Authorization", "Basic " + authHeader).get();
+ return target.request().header("Authorization", "Basic " + authHeader).get();
- }
+ }
- public static Response postResponsewtBasicAuth(WebTarget target, String authHeader, byte[] data,
- String contentType) {
+ public static Response postResponsewtBasicAuth(WebTarget target, String authHeader, byte[] data,
+ String contentType) {
- return target.request().header("Authorization", "Basic " + authHeader).post(Entity.entity(data, contentType));
+ return target.request().header("Authorization", "Basic " + authHeader).post(Entity.entity(data, contentType));
- }
+ }
- public static Response getResponsewtNoAuth(WebTarget target) {
+ public static Response getResponsewtNoAuth(WebTarget target) {
- return target.request().get();
+ return target.request().get();
- }
+ }
- public static Response postResponsewtNoAuth(WebTarget target, byte[] data, String contentType) {
- return target.request().post(Entity.entity(data, contentType));
+ public static Response postResponsewtNoAuth(WebTarget target, byte[] data, String contentType) {
+ return target.request().post(Entity.entity(data, contentType));
- }
+ }
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java
index 07cf6a7..9522c90 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java
@@ -4,11 +4,13 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,387 +19,387 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client.impl;
+import com.att.nsa.apiClient.http.CacheUse;
+import com.att.nsa.apiClient.http.HttpClient;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
-
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.apache.http.HttpException;
+import org.apache.http.HttpStatus;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.internal.util.Base64;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiClient.http.CacheUse;
-import com.att.nsa.apiClient.http.HttpClient;
import org.onap.dmaap.mr.client.MRClient;
import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+import org.onap.dmaap.mr.client.ProtocolType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MRBaseClient extends HttpClient implements MRClient {
- private final static String HEADER_TRANSACTION_ID = "transactionid";
-
- private final static String JSON_RESULT = "result";
- private final static String JSON_STATUS = "status";
-
- private final static String AUTH_FAILED = "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.";
- private final static String LOG_TRANSACTION_ID = "TransactionId : ";
-
- private ClientConfig clientConfig = null;
-
- protected MRBaseClient(Collection<String> hosts) throws MalformedURLException {
- super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT);
-
- fLog = LoggerFactory.getLogger(this.getClass().getName());
- }
-
- protected MRBaseClient(Collection<String> hosts, int stdSvcPort) throws MalformedURLException {
- super(ConnectionType.HTTP, hosts, stdSvcPort);
-
- fLog = LoggerFactory.getLogger(this.getClass().getName());
- }
-
- protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException {
- super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT, clientSignature, CacheUse.NONE, 1, 1L,
- TimeUnit.MILLISECONDS, 32, 32, 600000);
-
- fLog = LoggerFactory.getLogger(this.getClass().getName());
- }
-
- public ClientConfig getClientConfig1() {
- return clientConfig;
- }
-
- public void setClientConfig(ClientConfig config) {
- this.clientConfig = config;
- }
-
- @Override
- public void close() {
- // nothing to close
- }
-
- protected Set<String> jsonArrayToSet(JSONArray a) {
- if (a == null)
- return null;
-
- final TreeSet<String> set = new TreeSet<>();
- for (int i = 0; i < a.length(); i++) {
- set.add(a.getString(i));
- }
- return set;
- }
-
- public void logTo(Logger log) {
- fLog = log;
- replaceLogger(log);
- }
-
- protected Logger getLog() {
- return fLog;
- }
-
- private Logger fLog;
-
- public JSONObject post(final String path, final byte[] data, final String contentType, final String username,
- final String password, final String protocalFlag) throws HttpException, JSONException {
- if ((null != username && null != password)) {
- WebTarget target = null;
- Response response = null;
- target = DmaapClientUtil.getTarget(clientConfig,path, username, password);
- String encoding = Base64.encodeAsString(username + ":" + password);
-
- response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding, data, contentType);
-
- return getResponseDataInJson(response);
- } else {
- throw new HttpException(AUTH_FAILED);
- }
- }
-
- public JSONObject postNoAuth(final String path, final byte[] data, String contentType)
- throws HttpException, JSONException {
- WebTarget target = null;
- Response response = null;
- if (contentType == null) {
- contentType = "text/pain";
- }
- target = DmaapClientUtil.getTarget(clientConfig,path);
-
- response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType);
-
- return getResponseDataInJson(response);
- }
-
- public String postWithResponse(final String path, final byte[] data, final String contentType,
- final String username, final String password, final String protocolFlag)
- throws HttpException, JSONException {
- String responseData = null;
- if ((null != username && null != password)) {
- WebTarget target = null;
- Response response = null;
- target = DmaapClientUtil.getTarget(clientConfig,path, username, password);
- String encoding = Base64.encodeAsString(username + ":" + password);
-
- response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding, data, contentType);
-
- responseData = (String) response.readEntity(String.class);
- return responseData;
- } else {
- throw new HttpException(AUTH_FAILED);
- }
- }
-
- public String postNoAuthWithResponse(final String path, final byte[] data, String contentType)
- throws HttpException, JSONException {
-
- String responseData = null;
- WebTarget target = null;
- Response response = null;
- if (contentType == null) {
- contentType = "text/pain";
- }
- target = DmaapClientUtil.getTarget(clientConfig,path);
-
- response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType);
- responseData = (String) response.readEntity(String.class);
- return responseData;
- }
-
- public JSONObject postAuth(PostAuthDataObject postAuthDO) throws HttpException, JSONException {
- if ((null != postAuthDO.getUsername() && null != postAuthDO.getPassword())) {
- WebTarget target = null;
- Response response = null;
- target = DmaapClientUtil.getTarget(clientConfig,postAuthDO.getPath(), postAuthDO.getUsername(),
- postAuthDO.getPassword());
- response = DmaapClientUtil.postResponsewtCambriaAuth(target, postAuthDO.getAuthKey(),
- postAuthDO.getAuthDate(), postAuthDO.getData(), postAuthDO.getContentType());
- return getResponseDataInJson(response);
- } else {
- throw new HttpException(AUTH_FAILED);
- }
- }
-
- public String postAuthwithResponse(final String path, final byte[] data, final String contentType,
- final String authKey, final String authDate, final String username, final String password,
- final String protocolFlag) throws HttpException, JSONException {
- String responseData = null;
- if ((null != username && null != password)) {
- WebTarget target = null;
- Response response = null;
- target = DmaapClientUtil.getTarget(clientConfig,path, username, password);
- response = DmaapClientUtil.postResponsewtCambriaAuth(target, authKey, authDate, data, contentType);
- responseData = (String) response.readEntity(String.class);
- return responseData;
-
- } else {
- throw new HttpException(AUTH_FAILED);
- }
- }
-
- public JSONObject get(final String path, final String username, final String password, final String protocolFlag)
- throws HttpException, JSONException {
- if (null != username && null != password) {
-
- WebTarget target = null;
- Response response = null;
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- target = DmaapClientUtil.getTarget(clientConfig,path);
- response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password);
- } else {
- target = DmaapClientUtil.getTarget(clientConfig,path, username, password);
- String encoding = Base64.encodeAsString(username + ":" + password);
-
- response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding);
-
- }
- return getResponseDataInJson(response);
- } else {
- throw new HttpException(AUTH_FAILED);
- }
- }
-
- public String getResponse(final String path, final String username, final String password,
- final String protocolFlag) throws HttpException, JSONException {
- String responseData = null;
- if (null != username && null != password) {
- WebTarget target = null;
- Response response = null;
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- target = DmaapClientUtil.getTarget(clientConfig,path);
- response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password);
- } else {
- target = DmaapClientUtil.getTarget(clientConfig,path, username, password);
- String encoding = Base64.encodeAsString(username + ":" + password);
- response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding);
- }
- MRClientFactory.setHTTPHeadersMap(response.getHeaders());
-
- String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
- if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info(LOG_TRANSACTION_ID + transactionid);
- }
-
- responseData = (String) response.readEntity(String.class);
- return responseData;
- } else {
- throw new HttpException(AUTH_FAILED);
- }
- }
-
- public JSONObject getAuth(final String path, final String authKey, final String authDate, final String username,
- final String password, final String protocolFlag) throws HttpException, JSONException {
- if (null != username && null != password) {
- WebTarget target = null;
- Response response = null;
- target = DmaapClientUtil.getTarget(clientConfig,path, username, password);
- response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate);
-
- return getResponseDataInJson(response);
- } else {
- throw new HttpException(AUTH_FAILED);
- }
- }
-
- public JSONObject getNoAuth(final String path) throws HttpException, JSONException {
-
- WebTarget target = null;
- Response response = null;
- target = DmaapClientUtil.getTarget(clientConfig,path);
- response = DmaapClientUtil.getResponsewtNoAuth(target);
-
- return getResponseDataInJson(response);
- }
-
- public String getAuthResponse(final String path, final String authKey, final String authDate, final String username,
- final String password, final String protocolFlag) throws HttpException, JSONException {
- String responseData = null;
- if (null != username && null != password) {
- WebTarget target = null;
- Response response = null;
- target = DmaapClientUtil.getTarget(clientConfig,path, username, password);
- response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate);
-
- MRClientFactory.setHTTPHeadersMap(response.getHeaders());
-
- String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
- if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info(LOG_TRANSACTION_ID + transactionid);
- }
-
- responseData = (String) response.readEntity(String.class);
- return responseData;
- } else {
- throw new HttpException(AUTH_FAILED);
- }
- }
-
- public String getNoAuthResponse(String path, final String username, final String password,
- final String protocolFlag) throws HttpException, JSONException {
- String responseData = null;
- WebTarget target = null;
- Response response = null;
- target = DmaapClientUtil.getTarget(clientConfig,path, username, password);
- response = DmaapClientUtil.getResponsewtNoAuth(target);
-
- MRClientFactory.setHTTPHeadersMap(response.getHeaders());
-
- String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
- if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info(LOG_TRANSACTION_ID + transactionid);
- }
-
- responseData = (String) response.readEntity(String.class);
- return responseData;
-
- }
-
- private JSONObject getResponseDataInJson(Response response) throws JSONException {
- try {
- MRClientFactory.setHTTPHeadersMap(response.getHeaders());
-
- // MultivaluedMap<String, Object> headersMap =
- // for(String key : headersMap.keySet()) {
- String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
- if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
- fLog.info(LOG_TRANSACTION_ID + transactionid);
- }
-
- if (response.getStatus() == 403) {
- JSONObject jsonObject = null;
- jsonObject = new JSONObject();
- JSONArray jsonArray = new JSONArray();
- jsonArray.put(response.getEntity());
- jsonObject.put(JSON_RESULT, jsonArray);
- jsonObject.put(JSON_STATUS, response.getStatus());
- return jsonObject;
- }
- String responseData = (String) response.readEntity(String.class);
-
- JSONTokener jsonTokener = new JSONTokener(responseData);
- JSONObject jsonObject = null;
- final char firstChar = jsonTokener.next();
- jsonTokener.back();
- if ('[' == firstChar) {
- JSONArray jsonArray = new JSONArray(jsonTokener);
- jsonObject = new JSONObject();
- jsonObject.put(JSON_RESULT, jsonArray);
- jsonObject.put(JSON_STATUS, response.getStatus());
- } else {
- jsonObject = new JSONObject(jsonTokener);
- jsonObject.put(JSON_STATUS, response.getStatus());
- }
-
- return jsonObject;
- } catch (JSONException excp) {
- fLog.error("DMAAP - Error reading response data.", excp);
- return null;
- }
-
- }
-
- public String getHTTPErrorResponseMessage(String responseString) {
-
- String response = null;
- int beginIndex = 0;
- int endIndex = 0;
- if (responseString.contains("<body>")) {
-
- beginIndex = responseString.indexOf("body>") + 5;
- endIndex = responseString.indexOf("</body");
- response = responseString.substring(beginIndex, endIndex);
- }
-
- return response;
-
- }
-
- public String getHTTPErrorResponseCode(String responseString) {
-
- String response = null;
- int beginIndex = 0;
- int endIndex = 0;
- if (responseString.contains("<title>")) {
- beginIndex = responseString.indexOf("title>") + 6;
- endIndex = responseString.indexOf("</title");
- response = responseString.substring(beginIndex, endIndex);
- }
-
- return response;
- }
+ private static final String HEADER_TRANSACTION_ID = "transactionid";
+
+ private static final String JSON_RESULT = "result";
+ private static final String JSON_STATUS = "status";
+
+ private static final String AUTH_FAILED = "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.";
+ private static final String LOG_TRANSACTION_ID = "TransactionId : {}";
+
+ private ClientConfig clientConfig = null;
+
+ protected MRBaseClient(Collection<String> hosts) throws MalformedURLException {
+ super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT);
+
+ logger = LoggerFactory.getLogger(this.getClass().getName());
+ }
+
+ protected MRBaseClient(Collection<String> hosts, int stdSvcPort) throws MalformedURLException {
+ super(ConnectionType.HTTP, hosts, stdSvcPort);
+
+ logger = LoggerFactory.getLogger(this.getClass().getName());
+ }
+
+ protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException {
+ super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT, clientSignature, CacheUse.NONE, 1, 1L,
+ TimeUnit.MILLISECONDS, 32, 32, 600000);
+
+ logger = LoggerFactory.getLogger(this.getClass().getName());
+ }
+
+ public ClientConfig getClientConfig1() {
+ return clientConfig;
+ }
+
+ public void setClientConfig(ClientConfig config) {
+ this.clientConfig = config;
+ }
+
+ @Override
+ public void close() {
+ // nothing to close
+ }
+
+ protected Set<String> jsonArrayToSet(JSONArray array) {
+ if (array == null) {
+ return null;
+ }
+ final TreeSet<String> set = new TreeSet<>();
+ for (int i = 0; i < array.length(); i++) {
+ set.add(array.getString(i));
+ }
+ return set;
+ }
+
+ public void logTo(Logger log) {
+ logger = log;
+ replaceLogger(log);
+ }
+
+ protected Logger getLog() {
+ return logger;
+ }
+
+ private Logger logger;
+
+ public JSONObject post(final String path, final byte[] data, final String contentType, final String username,
+ final String password, final String protocalFlag) throws HttpException, JSONException {
+ if ((null != username && null != password)) {
+ WebTarget target = null;
+ Response response = null;
+ target = DmaapClientUtil.getTarget(clientConfig, path, username, password);
+ String encoding = Base64.encodeAsString(username + ":" + password);
+
+ response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding, data, contentType);
+
+ return getResponseDataInJson(response);
+ } else {
+ throw new HttpException(AUTH_FAILED);
+ }
+ }
+
+ public JSONObject postNoAuth(final String path, final byte[] data, String contentType)
+ throws HttpException, JSONException {
+ WebTarget target = null;
+ Response response = null;
+ if (contentType == null) {
+ contentType = "text/pain";
+ }
+ target = DmaapClientUtil.getTarget(clientConfig, path);
+
+ response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType);
+
+ return getResponseDataInJson(response);
+ }
+
+ public String postWithResponse(final String path, final byte[] data, final String contentType,
+ final String username, final String password, final String protocolFlag)
+ throws HttpException, JSONException {
+ String responseData = null;
+ if ((null != username && null != password)) {
+ WebTarget target = null;
+ Response response = null;
+ target = DmaapClientUtil.getTarget(clientConfig, path, username, password);
+ String encoding = Base64.encodeAsString(username + ":" + password);
+
+ response = DmaapClientUtil.postResponsewtBasicAuth(target, encoding, data, contentType);
+
+ responseData = response.readEntity(String.class);
+ return responseData;
+ } else {
+ throw new HttpException(AUTH_FAILED);
+ }
+ }
+
+ public String postNoAuthWithResponse(final String path, final byte[] data, String contentType)
+ throws HttpException, JSONException {
+
+ String responseData = null;
+ WebTarget target = null;
+ Response response = null;
+ if (contentType == null) {
+ contentType = "text/pain";
+ }
+ target = DmaapClientUtil.getTarget(clientConfig, path);
+
+ response = DmaapClientUtil.postResponsewtNoAuth(target, data, contentType);
+ responseData = response.readEntity(String.class);
+ return responseData;
+ }
+
+ public JSONObject postAuth(PostAuthDataObject postAuthDO) throws HttpException, JSONException {
+ if ((null != postAuthDO.getUsername() && null != postAuthDO.getPassword())) {
+ WebTarget target = null;
+ Response response = null;
+ target = DmaapClientUtil.getTarget(clientConfig, postAuthDO.getPath(), postAuthDO.getUsername(),
+ postAuthDO.getPassword());
+ response = DmaapClientUtil.postResponsewtCambriaAuth(target, postAuthDO.getAuthKey(),
+ postAuthDO.getAuthDate(), postAuthDO.getData(), postAuthDO.getContentType());
+ return getResponseDataInJson(response);
+ } else {
+ throw new HttpException(AUTH_FAILED);
+ }
+ }
+
+ public String postAuthwithResponse(final String path, final byte[] data, final String contentType,
+ final String authKey, final String authDate, final String username, final String password,
+ final String protocolFlag) throws HttpException, JSONException {
+ String responseData = null;
+ if ((null != username && null != password)) {
+ WebTarget target = null;
+ Response response = null;
+ target = DmaapClientUtil.getTarget(clientConfig, path, username, password);
+ response = DmaapClientUtil.postResponsewtCambriaAuth(target, authKey, authDate, data, contentType);
+ responseData = response.readEntity(String.class);
+ return responseData;
+
+ } else {
+ throw new HttpException(AUTH_FAILED);
+ }
+ }
+
+ public JSONObject get(final String path, final String username, final String password, final String protocolFlag)
+ throws HttpException, JSONException {
+ if (null != username && null != password) {
+
+ WebTarget target = null;
+ Response response = null;
+
+ if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ target = DmaapClientUtil.getTarget(clientConfig, path);
+ response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password);
+ } else {
+ target = DmaapClientUtil.getTarget(clientConfig, path, username, password);
+ String encoding = Base64.encodeAsString(username + ":" + password);
+
+ response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding);
+
+ }
+ return getResponseDataInJson(response);
+ } else {
+ throw new HttpException(AUTH_FAILED);
+ }
+ }
+
+ public String getResponse(final String path, final String username, final String password,
+ final String protocolFlag) throws HttpException, JSONException {
+ String responseData = null;
+ if (null != username && null != password) {
+ WebTarget target = null;
+ Response response = null;
+ if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ target = DmaapClientUtil.getTarget(clientConfig, path);
+ response = DmaapClientUtil.getResponsewtCambriaAuth(target, username, password);
+ } else {
+ target = DmaapClientUtil.getTarget(clientConfig, path, username, password);
+ String encoding = Base64.encodeAsString(username + ":" + password);
+ response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding);
+ }
+ MRClientFactory.setHTTPHeadersMap(response.getHeaders());
+
+ String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ logger.info(LOG_TRANSACTION_ID, transactionid);
+ }
+
+ responseData = response.readEntity(String.class);
+ return responseData;
+ } else {
+ throw new HttpException(AUTH_FAILED);
+ }
+ }
+
+ public JSONObject getAuth(final String path, final String authKey, final String authDate, final String username,
+ final String password, final String protocolFlag) throws HttpException, JSONException {
+ if (null != username && null != password) {
+ WebTarget target = null;
+ Response response = null;
+ target = DmaapClientUtil.getTarget(clientConfig, path, username, password);
+ response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate);
+
+ return getResponseDataInJson(response);
+ } else {
+ throw new HttpException(AUTH_FAILED);
+ }
+ }
+
+ public JSONObject getNoAuth(final String path) throws HttpException, JSONException {
+
+ WebTarget target = null;
+ Response response = null;
+ target = DmaapClientUtil.getTarget(clientConfig, path);
+ response = DmaapClientUtil.getResponsewtNoAuth(target);
+
+ return getResponseDataInJson(response);
+ }
+
+ public String getAuthResponse(final String path, final String authKey, final String authDate, final String username,
+ final String password, final String protocolFlag) throws HttpException, JSONException {
+ String responseData = null;
+ if (null != username && null != password) {
+ WebTarget target = null;
+ Response response = null;
+ target = DmaapClientUtil.getTarget(clientConfig, path, username, password);
+ response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate);
+
+ MRClientFactory.setHTTPHeadersMap(response.getHeaders());
+
+ String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ logger.info(LOG_TRANSACTION_ID, transactionid);
+ }
+
+ responseData = response.readEntity(String.class);
+ return responseData;
+ } else {
+ throw new HttpException(AUTH_FAILED);
+ }
+ }
+
+ public String getNoAuthResponse(String path, final String username, final String password,
+ final String protocolFlag) throws HttpException, JSONException {
+ String responseData = null;
+ WebTarget target = null;
+ Response response = null;
+ target = DmaapClientUtil.getTarget(clientConfig, path, username, password);
+ response = DmaapClientUtil.getResponsewtNoAuth(target);
+
+ MRClientFactory.setHTTPHeadersMap(response.getHeaders());
+
+ String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ logger.info(LOG_TRANSACTION_ID, transactionid);
+ }
+
+ responseData = response.readEntity(String.class);
+ return responseData;
+
+ }
+
+ private JSONObject getResponseDataInJson(Response response) throws JSONException {
+ try {
+ MRClientFactory.setHTTPHeadersMap(response.getHeaders());
+
+ // MultivaluedMap<String, Object> headersMap =
+ // for(String key : headersMap.keySet()) {
+ String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID);
+ if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+ logger.info(LOG_TRANSACTION_ID, transactionid);
+ }
+
+ if (response.getStatus() == HttpStatus.SC_FORBIDDEN) {
+ JSONObject jsonObject = null;
+ jsonObject = new JSONObject();
+ JSONArray jsonArray = new JSONArray();
+ jsonArray.put(response.getEntity());
+ jsonObject.put(JSON_RESULT, jsonArray);
+ jsonObject.put(JSON_STATUS, response.getStatus());
+ return jsonObject;
+ }
+ String responseData = response.readEntity(String.class);
+
+ JSONTokener jsonTokener = new JSONTokener(responseData);
+ JSONObject jsonObject = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if ('[' == firstChar) {
+ JSONArray jsonArray = new JSONArray(jsonTokener);
+ jsonObject = new JSONObject();
+ jsonObject.put(JSON_RESULT, jsonArray);
+ jsonObject.put(JSON_STATUS, response.getStatus());
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ jsonObject.put(JSON_STATUS, response.getStatus());
+ }
+
+ return jsonObject;
+ } catch (JSONException excp) {
+ logger.error("DMAAP - Error reading response data.", excp);
+ return null;
+ }
+
+ }
+
+ public String getHTTPErrorResponseMessage(String responseString) {
+
+ String response = null;
+ int beginIndex = 0;
+ int endIndex = 0;
+ if (responseString.contains("<body>")) {
+
+ beginIndex = responseString.indexOf("body>") + 5;
+ endIndex = responseString.indexOf("</body");
+ response = responseString.substring(beginIndex, endIndex);
+ }
+
+ return response;
+
+ }
+
+ public String getHTTPErrorResponseCode(String responseString) {
+
+ String response = null;
+ int beginIndex = 0;
+ int endIndex = 0;
+ if (responseString.contains("<title>")) {
+ beginIndex = responseString.indexOf("title>") + 6;
+ endIndex = responseString.indexOf("</title");
+ response = responseString.substring(beginIndex, endIndex);
+ }
+
+ return response;
+ }
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java
index 5c7259c..19f5b2c 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java
@@ -4,11 +4,13 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,10 +19,13 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client.impl;
+import com.att.nsa.apiClient.http.HttpClient;
+import com.att.nsa.apiClient.http.HttpException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -34,454 +39,374 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.zip.GZIPOutputStream;
-
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.response.MRPublisherResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.apiClient.http.HttpClient;
-import com.att.nsa.apiClient.http.HttpException;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
/**
* This is a batching publisher class that allows the client to publish messages
* in batches that are limited in terms of size and/or hold time.
- *
+ *
* @author author
* @deprecated This class's tricky locking doesn't quite work
- *
*/
@Deprecated
-public class MRBatchPublisher implements MRBatchingPublisher
-{
- public static final long MIN_MAX_AGE_MS = 1;
-
- /**
- * Create a batch publisher.
- *
- * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path.
- * @param topic the topic to publish to
- * @param maxBatchSize the maximum size of a batch
- * @param maxAgeMs the maximum age of a batch
- */
- public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
- {
- if ( maxAgeMs < MIN_MAX_AGE_MS)
- {
- fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + MIN_MAX_AGE_MS);
- maxAgeMs = MIN_MAX_AGE_MS;
- }
-
- try {
- fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress );
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException (e);
- }
-
- // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
- // the oldest msg to hit max age? (locking is complicated, but should be do-able)
- fExec = new ScheduledThreadPoolExecutor ( 1 );
- fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS );
- }
-
- @Override
- public void setApiCredentials ( String apiKey, String apiSecret )
- {
- fSender.setApiCredentials ( apiKey, apiSecret );
- }
-
- @Override
- public void clearApiCredentials ()
- {
- fSender.clearApiCredentials ();
- }
-
- /**
- * Send the given message with the given partition
- * @param partition
- * @param msg
- * @throws IOException
- */
- @Override
- public int send ( String partition, String msg ) throws IOException
- {
- return send ( new message ( partition, msg ) );
- }
- @Override
- public int send ( String msg ) throws IOException
- {
- return send ( new message ( "",msg ) );
- }
- /**
- * Send the given message
- * @param userMsg a message
- * @throws IOException
- */
- @Override
- public int send ( message userMsg ) throws IOException
- {
- final LinkedList<message> list = new LinkedList<message> ();
- list.add ( userMsg );
- return send ( list );
- }
-
- /**
- * Send the given set of messages
- * @param msgs the set of messages, sent in order of iteration
- * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
- * @throws IOException
- */
- @Override
- public int send ( Collection<message> msgs ) throws IOException
- {
- if ( msgs.isEmpty() )
- {
- fSender.queue ( msgs );
- }
- return fSender.size ();
- }
-
- @Override
- public int getPendingMessageCount ()
- {
- return fSender.size ();
- }
-
- /**
- * Send any pending messages and close this publisher.
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- public void close ()
- {
- try
- {
- final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
- if ( remains.isEmpty() )
- {
- fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. "
- + "(Consider using the alternate close method to capture unsent messages in this case.)" );
- }
- }
- catch ( InterruptedException e )
- {
- fLog.warn ( "Possible message loss. " + e.getMessage(), e );
- Thread.currentThread().interrupt();
- }
- catch ( IOException e )
- {
- fLog.warn ( "Possible message loss. " + e.getMessage(), e );
- }
- }
-
- public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException
- {
- fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
- fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
- fExec.shutdown ();
-
- final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
- final long timeoutAtMs = System.currentTimeMillis () + waitInMs;
- while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 )
- {
- fSender.checkSend ( true );
- Thread.sleep ( 250 );
- }
-
- final LinkedList<message> result = new LinkedList<message> ();
- fSender.drainTo ( result );
- return result;
- }
-
- private final ScheduledThreadPoolExecutor fExec;
- private final Sender fSender;
-
- private static class TimestampedMessage extends message
- {
- public TimestampedMessage ( message m )
- {
- super ( m );
- timestamp = System.currentTimeMillis ();
- }
- public final long timestamp;
- }
-
- private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class );
-
- private class Sender extends MRBaseClient implements Runnable
- {
- public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException
- {
- super ( baseUrls );
-
- fNextBatch = new LinkedList<TimestampedMessage> ();
- fSendingBatch = null;
- fTopic = topic;
- fMaxBatchSize = maxBatch;
- fMaxAgeMs = maxAgeMs;
- fCompress = compress;
- fLock = new ReentrantReadWriteLock ();
- fWriteLock = fLock.writeLock ();
- fReadLock = fLock.readLock ();
- fDontSendUntilMs = 0;
- }
-
- public void drainTo ( LinkedList<message> list )
- {
- fWriteLock.lock ();
- try
- {
- if ( fSendingBatch != null )
- {
- list.addAll ( fSendingBatch );
- }
- list.addAll ( fNextBatch );
-
- fSendingBatch = null;
- fNextBatch.clear ();
- }
- finally
- {
- fWriteLock.unlock ();
- }
- }
-
- /**
- * Called periodically by the background executor.
- */
- @Override
- public void run ()
- {
- try
- {
- checkSend ( false );
- }
- catch ( IOException e )
- {
- fLog.warn ( "MR background send: " + e.getMessage () );
- fLog.error( "IOException " + e );
- }
- }
-
- public int size ()
- {
- fReadLock.lock ();
- try
- {
- return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () );
- }
- finally
- {
- fReadLock.unlock ();
- }
- }
-
- /**
- * Called to queue a message.
- * @param msgs
- * @throws IOException
- */
- public void queue ( Collection<message> msgs ) throws IOException
- {
- fWriteLock.lock ();
- try
- {
- for ( message userMsg : msgs )
- {
- if ( userMsg != null )
- {
- fNextBatch.add ( new TimestampedMessage ( userMsg ) );
- }
- else
- {
- fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." );
- }
- }
- }
- finally
- {
- fWriteLock.unlock();
- }
- checkSend ( false );
- }
-
- /**
- * Send a batch if the queue is long enough, or the first pending message is old enough.
- * @param force
- * @throws IOException
- */
- public void checkSend ( boolean force ) throws IOException
- {
- // hold a read lock just long enough to evaluate whether a batch
- // should be sent
- boolean shouldSend = false;
- fReadLock.lock ();
- try
- {
- if ( fNextBatch.isEmpty() )
- {
- final long nowMs = System.currentTimeMillis ();
- shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize );
- if ( !shouldSend )
- {
- final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs;
- shouldSend = sendAtMs <= nowMs;
- }
-
- // however, unless forced, wait after an error
- shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs );
- }
- // else: even in 'force', there's nothing to send, so shouldSend=false is fine
- }
- finally
- {
- fReadLock.unlock ();
- }
-
- // if a send is required, acquire a write lock, swap out the next batch,
- // swap in a fresh batch, and release the lock for the caller to start
- // filling a batch again. After releasing the lock, send the current
- // batch. (There could be more messages added between read unlock and
- // write lock, but that's fine.)
- if ( shouldSend )
- {
- fSendingBatch = null;
-
- fWriteLock.lock ();
- try
- {
- fSendingBatch = fNextBatch;
- fNextBatch = new LinkedList<TimestampedMessage> ();
- }
- finally
- {
- fWriteLock.unlock ();
- }
-
- if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) )
- {
- fLog.warn ( "Send failed, rebuilding send queue." );
-
- // note the time for back-off
- fDontSendUntilMs = SF_WAIT_AFTER_ERROR + System.currentTimeMillis ();
-
- // the send failed. reconstruct the pending queue
- fWriteLock.lock ();
- try
- {
- final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
- fNextBatch = fSendingBatch;
- fNextBatch.addAll ( nextGroup );
- fSendingBatch = null;
- fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." );
- }
- finally
- {
- fWriteLock.unlock ();
- }
- }
- else
- {
- fWriteLock.lock ();
- try
- {
- fSendingBatch = null;
- }
- finally
- {
- fWriteLock.unlock ();
- }
- }
- }
- }
-
- private LinkedList<TimestampedMessage> fNextBatch;
- private LinkedList<TimestampedMessage> fSendingBatch;
- private final String fTopic;
- private final int fMaxBatchSize;
- private final long fMaxAgeMs;
- private final boolean fCompress;
- private final ReentrantReadWriteLock fLock;
- private final WriteLock fWriteLock;
- private final ReadLock fReadLock;
- private long fDontSendUntilMs;
- private static final long SF_WAIT_AFTER_ERROR = 1000;
- }
-
- // this is static so that it's clearly not using any mutable member data outside of a lock
- private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log )
- {
- // it's possible for this call to be made with an empty list. in this case, just return.
- if ( toSend.isEmpty() )
- {
- return true;
- }
-
- final long nowMs = System.currentTimeMillis ();
- final String url = MRConstants.makeUrl ( topic );
-
- log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms" );
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
- try
- {
- OutputStream os = baseStream;
- if ( compress )
- {
- os = new GZIPOutputStream ( baseStream );
- }
- for ( TimestampedMessage m : toSend )
- {
- os.write ( ( "" + m.fPartition.length () ).getBytes() );
- os.write ( '.' );
- os.write ( ( "" + m.fMsg.length () ).getBytes() );
- os.write ( '.' );
- os.write ( m.fPartition.getBytes() );
- os.write ( m.fMsg.getBytes() );
- os.write ( '\n' );
- }
- os.close ();
- }
- catch ( IOException e )
- {
- log.warn ( "Problem writing stream to post: " + e.getMessage (),e );
- return false;
- }
-
- boolean result = false;
- final long startMs = System.currentTimeMillis ();
- try
- {
- client.post ( url, compress ?
- MRFormat.CAMBRIA_ZIP.toString () :
- MRFormat.CAMBRIA.toString (),
- baseStream.toByteArray(), false );
- result = true;
- }
- catch ( HttpException e )
- {
- log.warn ( "Problem posting to MR: " + e.getMessage(),e );
- }
- catch ( IOException e )
- {
- log.warn ( "Problem posting to MR: " + e.getMessage(),e );
- }
-
- log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): OK" );
- return result;
- }
-
- @Override
- public void logTo ( Logger log )
- {
- fLog = log;
- }
-
- @Override
- public MRPublisherResponse sendBatchWithResponse() {
- // TODO Auto-generated method stub
- return null;
- }
-
+public class MRBatchPublisher implements MRBatchingPublisher {
+ public static final long MIN_MAX_AGE_MS = 1;
+
+ /**
+ * Create a batch publisher.
+ *
+ * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path.
+ * @param topic the topic to publish to
+ * @param maxBatchSize the maximum size of a batch
+ * @param maxAgeMs the maximum age of a batch
+ */
+ public MRBatchPublisher(Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress) {
+ if (maxAgeMs < MIN_MAX_AGE_MS) {
+ logger.warn("Max age in ms is less than the minimum. Overriding to " + MIN_MAX_AGE_MS);
+ maxAgeMs = MIN_MAX_AGE_MS;
+ }
+
+ try {
+ fSender = new Sender(baseUrls, topic, maxBatchSize, maxAgeMs, compress);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
+ // the oldest msg to hit max age? (locking is complicated, but should be do-able)
+ fExec = new ScheduledThreadPoolExecutor(1);
+ fExec.scheduleAtFixedRate(fSender, 100, 50, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void setApiCredentials(String apiKey, String apiSecret) {
+ fSender.setApiCredentials(apiKey, apiSecret);
+ }
+
+ @Override
+ public void clearApiCredentials() {
+ fSender.clearApiCredentials();
+ }
+
+ /**
+ * Send the given message with the given partition.
+ *
+ * @param partition
+ * @param msg
+ * @throws IOException
+ */
+ @Override
+ public int send(String partition, String msg) throws IOException {
+ return send(new Message(partition, msg));
+ }
+
+ @Override
+ public int send(String msg) throws IOException {
+ return send(new Message("", msg));
+ }
+
+ /**
+ * Send the given message.
+ *
+ * @param userMsg a message
+ * @throws IOException
+ */
+ @Override
+ public int send(Message userMsg) throws IOException {
+ final LinkedList<Message> list = new LinkedList<>();
+ list.add(userMsg);
+ return send(list);
+ }
+
+ /**
+ * Send the given set of messages.
+ *
+ * @param msgs the set of messages, sent in order of iteration
+ * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
+ * @throws IOException
+ */
+ @Override
+ public int send(Collection<Message> msgs) throws IOException {
+ if (msgs.isEmpty()) {
+ fSender.queue(msgs);
+ }
+ return fSender.size();
+ }
+
+ @Override
+ public int getPendingMessageCount() {
+ return fSender.size();
+ }
+
+ /**
+ * Send any pending messages and close this publisher.
+ */
+ @Override
+ public void close() {
+ try {
+ final List<Message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ if (remains.isEmpty()) {
+ logger.warn("Closing publisher with {} messages unsent. (Consider using the alternate close method to capture unsent messages in this case.)", remains.size());
+ }
+ } catch (InterruptedException e) {
+ logger.warn("Possible message loss. " + e.getMessage(), e);
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ logger.warn("Possible message loss. " + e.getMessage(), e);
+ }
+ }
+
+ public List<Message> close(long time, TimeUnit unit) throws InterruptedException, IOException {
+ fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ fExec.shutdown();
+
+ final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
+ final long timeoutAtMs = System.currentTimeMillis() + waitInMs;
+ while (System.currentTimeMillis() < timeoutAtMs && getPendingMessageCount() > 0) {
+ fSender.checkSend(true);
+ Thread.sleep(250);
+ }
+
+ final LinkedList<Message> result = new LinkedList<>();
+ fSender.drainTo(result);
+ return result;
+ }
+
+ private final ScheduledThreadPoolExecutor fExec;
+ private final Sender fSender;
+
+ private static class TimestampedMessage extends Message {
+ public TimestampedMessage(Message m) {
+ super(m);
+ timestamp = System.currentTimeMillis();
+ }
+
+ public final long timestamp;
+ }
+
+ private Logger logger = LoggerFactory.getLogger(MRBatchPublisher.class);
+
+ private class Sender extends MRBaseClient implements Runnable {
+ public Sender(Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress) throws MalformedURLException {
+ super(baseUrls);
+
+ fNextBatch = new LinkedList<>();
+ fSendingBatch = null;
+ fTopic = topic;
+ fMaxBatchSize = maxBatch;
+ fMaxAgeMs = maxAgeMs;
+ fCompress = compress;
+ fLock = new ReentrantReadWriteLock();
+ fWriteLock = fLock.writeLock();
+ fReadLock = fLock.readLock();
+ fDontSendUntilMs = 0;
+ }
+
+ public void drainTo(List<Message> list) {
+ fWriteLock.lock();
+ try {
+ if (fSendingBatch != null) {
+ list.addAll(fSendingBatch);
+ }
+ list.addAll(fNextBatch);
+
+ fSendingBatch = null;
+ fNextBatch.clear();
+ } finally {
+ fWriteLock.unlock();
+ }
+ }
+
+ /**
+ * Called periodically by the background executor.
+ */
+ @Override
+ public void run() {
+ try {
+ checkSend(false);
+ } catch (Exception e) {
+ logger.warn("MR background send: {}", e.getMessage());
+ logger.error("IOException {}", e.getMessage());
+ }
+ }
+
+ public int size() {
+ fReadLock.lock();
+ try {
+ return fNextBatch.size() + (fSendingBatch == null ? 0 : fSendingBatch.size());
+ } finally {
+ fReadLock.unlock();
+ }
+ }
+
+ /**
+ * Called to queue a message.
+ *
+ * @param msgs
+ * @throws IOException
+ */
+ public void queue(Collection<Message> msgs) {
+ fWriteLock.lock();
+ try {
+ for (Message userMsg : msgs) {
+ if (userMsg != null) {
+ fNextBatch.add(new TimestampedMessage(userMsg));
+ } else {
+ logger.warn("MRBatchPublisher::Sender::queue received a null message.");
+ }
+ }
+ } finally {
+ fWriteLock.unlock();
+ }
+ checkSend(false);
+ }
+
+ /**
+ * Send a batch if the queue is long enough, or the first pending message is old enough.
+ *
+ * @param force
+ */
+ public void checkSend(boolean force) {
+ // hold a read lock just long enough to evaluate whether a batch
+ // should be sent
+ boolean shouldSend = false;
+ fReadLock.lock();
+ try {
+ if (fNextBatch.isEmpty()) {
+ final long nowMs = System.currentTimeMillis();
+ shouldSend = (force || fNextBatch.size() >= fMaxBatchSize);
+ if (!shouldSend) {
+ final long sendAtMs = fNextBatch.getFirst().timestamp + fMaxAgeMs;
+ shouldSend = sendAtMs <= nowMs;
+ }
+
+ // however, unless forced, wait after an error
+ shouldSend = force || (shouldSend && nowMs >= fDontSendUntilMs);
+ }
+ // else: even in 'force', there's nothing to send, so shouldSend=false is fine
+ } finally {
+ fReadLock.unlock();
+ }
+
+ // if a send is required, acquire a write lock, swap out the next batch,
+ // swap in a fresh batch, and release the lock for the caller to start
+ // filling a batch again. After releasing the lock, send the current
+ // batch. (There could be more messages added between read unlock and
+ // write lock, but that's fine.)
+ if (shouldSend) {
+ fSendingBatch = null;
+
+ fWriteLock.lock();
+ try {
+ fSendingBatch = fNextBatch;
+ fNextBatch = new LinkedList<>();
+ } finally {
+ fWriteLock.unlock();
+ }
+
+ if (!doSend(fSendingBatch, this, fTopic, fCompress, logger)) {
+ logger.warn("Send failed, rebuilding send queue.");
+
+ // note the time for back-off
+ fDontSendUntilMs = SF_WAIT_AFTER_ERROR + System.currentTimeMillis();
+
+ // the send failed. reconstruct the pending queue
+ fWriteLock.lock();
+ try {
+ final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
+ fNextBatch = fSendingBatch;
+ fNextBatch.addAll(nextGroup);
+ fSendingBatch = null;
+ logger.info("Send queue rebuilt; {} messages to send.", fNextBatch.size());
+ } finally {
+ fWriteLock.unlock();
+ }
+ } else {
+ fWriteLock.lock();
+ try {
+ fSendingBatch = null;
+ } finally {
+ fWriteLock.unlock();
+ }
+ }
+ }
+ }
+
+ private LinkedList<TimestampedMessage> fNextBatch;
+ private LinkedList<TimestampedMessage> fSendingBatch;
+ private final String fTopic;
+ private final int fMaxBatchSize;
+ private final long fMaxAgeMs;
+ private final boolean fCompress;
+ private final ReentrantReadWriteLock fLock;
+ private final WriteLock fWriteLock;
+ private final ReadLock fReadLock;
+ private long fDontSendUntilMs;
+ private static final long SF_WAIT_AFTER_ERROR = 1000;
+ }
+
+ // this is static so that it's clearly not using any mutable member data outside of a lock
+ private static boolean doSend(LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log) {
+ // it's possible for this call to be made with an empty list. in this case, just return.
+ if (toSend.isEmpty()) {
+ return true;
+ }
+
+ final long nowMs = System.currentTimeMillis();
+ final String url = MRConstants.makeUrl(topic);
+
+ log.info("sending {} msgs to {}. Oldest: {} ms", toSend.size(), url, (nowMs - toSend.getFirst().timestamp));
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+ try {
+ OutputStream os = baseStream;
+ if (compress) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : toSend) {
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+ } catch (IOException e) {
+ log.warn("Problem writing stream to post: " + e.getMessage(), e);
+ return false;
+ }
+
+ boolean result = false;
+ final long startMs = System.currentTimeMillis();
+ try {
+ client.post(url,
+ compress ? MRFormat.CAMBRIA_ZIP.toString() : MRFormat.CAMBRIA.toString(),
+ baseStream.toByteArray(), false);
+ result = true;
+ } catch (HttpException | IOException e) {
+ log.warn("Problem posting to MR: " + e.getMessage(), e);
+ }
+
+ log.info("MR response ({} ms): OK", (System.currentTimeMillis() - startMs));
+ return result;
+ }
+
+ @Override
+ public void logTo(Logger log) {
+ logger = log;
+ }
+
+ @Override
+ public MRPublisherResponse sendBatchWithResponse() {
+ // Auto-generated method stub
+ return null;
+ }
+
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java
index 6a13910..0507eee 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java
@@ -4,11 +4,13 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,37 +19,37 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client.impl;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MRClientVersionInfo
-{
- private static final Logger logger = LoggerFactory.getLogger(MRClientVersionInfo.class);
- public static String getVersion ()
- {
- return version;
- }
+public class MRClientVersionInfo {
+ private static final Logger logger = LoggerFactory.getLogger(MRClientVersionInfo.class);
+
+ public static String getVersion() {
+ return VERSION;
+ }
+
+ private static final Properties PROPS = new Properties();
+ private static final String VERSION;
- private static final Properties props = new Properties();
- private static final String version;
- static {
- String use = null;
- try (InputStream is = MRClientVersionInfo.class.getResourceAsStream("/MRClientVersion.properties" )) {
- if (is != null) {
- props.load(is);
- use = props.getProperty ( "MRClientVersion", null );
- }
- } catch ( IOException e ) {
- logger.error("exception: ", e);
- }
- version = use;
- }
+ static {
+ String use = null;
+ try (InputStream is = MRClientVersionInfo.class.getResourceAsStream("/MRClientVersion.properties")) {
+ if (is != null) {
+ PROPS.load(is);
+ use = PROPS.getProperty("MRClientVersion", null);
+ }
+ } catch (IOException e) {
+ logger.error("exception: ", e);
+ }
+ VERSION = use;
+ }
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java
index dbf6b4d..b05d839 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java
@@ -4,11 +4,13 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,162 +19,155 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client.impl;
-import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-
import org.apache.http.HttpHost;
-class MRConstants
-{
- private static final String PROTOCOL = "http";
- public static final String CONTEXT = "/";
- public static final String BASE_PATH = "events/";
- public static final int STD_MR_SERVICE_PORT = 8080;
-
- public static String escape ( String s )
- {
- try
- {
- return URLEncoder.encode ( s, "UTF-8");
- }
- catch ( UnsupportedEncodingException e )
- {
- throw new IllegalArgumentException(e);
- }
- }
-
- public static String makeUrl ( String rawTopic )
- {
- final String cleanTopic = escape ( rawTopic );
-
- final StringBuffer url = new StringBuffer().
- append ( MRConstants.CONTEXT).
- append ( MRConstants.BASE_PATH).
- append ( cleanTopic );
- return url.toString ();
- }
-
- public static String makeUrl ( final String host, final String rawTopic )
- {
- final String cleanTopic = escape ( rawTopic );
- final StringBuffer url = new StringBuffer();
-
- if (!host.startsWith("http") && !host.startsWith("https") ) {
- url.append( PROTOCOL + "://" );
- }
- url.append(host);
- url.append ( MRConstants.CONTEXT);
- url.append ( MRConstants.BASE_PATH);
- url.append ( cleanTopic );
- return url.toString ();
- }
-
- public static String makeUrl ( final String host, final String rawTopic, final String transferprotocol,final String parttion )
- {
- final String cleanTopic = escape ( rawTopic );
-
- final StringBuffer url = new StringBuffer();
-
- if (transferprotocol !=null && !transferprotocol.equals("")) {
- url.append( transferprotocol + "://" );
- }else{
- url.append( PROTOCOL + "://" );
- }
- url.append(host);
- url.append ( MRConstants.CONTEXT);
- url.append ( MRConstants.BASE_PATH);
- url.append ( cleanTopic );
- if(parttion!=null && !parttion.equalsIgnoreCase(""))
- url.append("?partitionKey=").append(parttion);
- return url.toString ();
- }
- public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId )
- {
- final String cleanConsumerGroup = escape ( rawConsumerGroup );
- final String cleanConsumerId = escape ( rawConsumerId );
- return MRConstants.CONTEXT + MRConstants.BASE_PATH + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId;
- }
-
- /**
- * Create a list of HttpHosts from an input list of strings. Input strings have
- * host[:port] as format. If the port section is not provided, the default port is used.
- *
- * @param hosts
- * @return a list of hosts
- */
- public static List<HttpHost> createHostsList(Collection<String> hosts)
- {
- final ArrayList<HttpHost> convertedHosts = new ArrayList<> ();
- for ( String host : hosts )
- {
- if ( host.length () == 0 ) continue;
- convertedHosts.add ( hostForString ( host ) );
- }
- return convertedHosts;
- }
-
- /**
- * Return an HttpHost from an input string. Input string has
- * host[:port] as format. If the port section is not provided, the default port is used.
- *
- * @param host
- * @return a list of hosts
- */
- public static HttpHost hostForString ( String host )
- {
- if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." );
-
- String hostPart = host;
- int port = STD_MR_SERVICE_PORT;
-
- final int colon = host.indexOf ( ':' );
- if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." );
- if ( colon > 0 )
- {
- hostPart = host.substring ( 0, colon ).trim();
-
- final String portPart = host.substring ( colon + 1 ).trim();
- if ( portPart.length () > 0 )
- {
- try
- {
- port = Integer.parseInt ( portPart );
- }
- catch ( NumberFormatException x )
- {
- throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x );
- }
- }
- // else: use default port on "foo:"
- }
-
- return new HttpHost ( hostPart, port );
- }
-
- public static String makeConsumerUrl(String host, String fTopic, String fGroup, String fId,final String transferprotocol) {
- final String cleanConsumerGroup = escape ( fGroup );
- final String cleanConsumerId = escape ( fId );
-
- StringBuffer url = new StringBuffer();
-
- if (transferprotocol !=null && !transferprotocol.equals("")) {
- url.append( transferprotocol + "://" );
- }else{
- url.append( PROTOCOL + "://" );
- }
-
- url.append(host);
- url.append(CONTEXT);
- url.append(BASE_PATH);
- url.append(fTopic + "/" + cleanConsumerGroup + "/" + cleanConsumerId);
-
- return url.toString();
- }
+class MRConstants {
+
+ private MRConstants() {
+
+ }
+
+ private static final String PROTOCOL = "http";
+ public static final String CONTEXT = "/";
+ public static final String BASE_PATH = "events/";
+ public static final int STD_MR_SERVICE_PORT = 8080;
+
+ public static String escape(String url) {
+ return URLEncoder.encode(url, StandardCharsets.UTF_8);
+ }
+
+ public static String makeUrl(String rawTopic) {
+ final String cleanTopic = escape(rawTopic);
+
+ return new StringBuilder()
+ .append(MRConstants.CONTEXT)
+ .append(MRConstants.BASE_PATH)
+ .append(cleanTopic).toString();
+ }
+
+ public static String makeUrl(final String host, final String rawTopic) {
+ final String cleanTopic = escape(rawTopic);
+ final StringBuilder url = new StringBuilder();
+
+ if (!host.startsWith("http") && !host.startsWith("https")) {
+ url.append(PROTOCOL).append("://");
+ }
+ url.append(host);
+ url.append(MRConstants.CONTEXT);
+ url.append(MRConstants.BASE_PATH);
+ url.append(cleanTopic);
+ return url.toString();
+ }
+
+ public static String makeUrl(final String host, final String rawTopic, final String transferProtocol, final String partition) {
+ final String cleanTopic = escape(rawTopic);
+
+ final StringBuilder url = new StringBuilder();
+
+ if (transferProtocol != null && !transferProtocol.isEmpty()) {
+ url.append(transferProtocol).append("://");
+ } else {
+ url.append(PROTOCOL).append("://");
+ }
+ url.append(host);
+ url.append(MRConstants.CONTEXT);
+ url.append(MRConstants.BASE_PATH);
+ url.append(cleanTopic);
+ if (partition != null && !partition.isEmpty()) {
+ url.append("?partitionKey=").append(partition);
+ }
+ return url.toString();
+ }
+
+ public static String makeConsumerUrl(String topic, String rawConsumerGroup, String rawConsumerId) {
+ final String cleanConsumerGroup = escape(rawConsumerGroup);
+ final String cleanConsumerId = escape(rawConsumerId);
+ return MRConstants.CONTEXT + MRConstants.BASE_PATH + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId;
+ }
+
+ /**
+ * Create a list of HttpHosts from an input list of strings. Input strings have
+ * host[:port] as format. If the port section is not provided, the default port is used.
+ *
+ * @param hosts
+ * @return a list of hosts
+ */
+ public static List<HttpHost> createHostsList(Collection<String> hosts) {
+ final ArrayList<HttpHost> convertedHosts = new ArrayList<>();
+ for (String host : hosts) {
+ if (host.length() == 0) {
+ continue;
+ }
+ convertedHosts.add(hostForString(host));
+ }
+ return convertedHosts;
+ }
+
+ /**
+ * Return an HttpHost from an input string. Input string has
+ * host[:port] as format. If the port section is not provided, the default port is used.
+ *
+ * @param host
+ * @return a list of hosts
+ */
+ public static HttpHost hostForString(String host) {
+ if (host.length() < 1) {
+ throw new IllegalArgumentException("An empty host entry is invalid.");
+ }
+
+ String hostPart = host;
+ int port = STD_MR_SERVICE_PORT;
+
+ final int colon = host.indexOf(':');
+ if (colon == 0) {
+ throw new IllegalArgumentException("Host entry '" + host + "' is invalid.");
+ }
+ if (colon > 0) {
+ hostPart = host.substring(0, colon).trim();
+
+ final String portPart = host.substring(colon + 1).trim();
+ if (portPart.length() > 0) {
+ try {
+ port = Integer.parseInt(portPart);
+ } catch (NumberFormatException x) {
+ throw new IllegalArgumentException("Host entry '" + host + "' is invalid.", x);
+ }
+ }
+ // else: use default port on "foo:"
+ }
+
+ return new HttpHost(hostPart, port);
+ }
+
+ public static String makeConsumerUrl(String host, String topic, String group, String id, final String transferprotocol) {
+ final String cleanConsumerGroup = escape(group);
+ final String cleanConsumerId = escape(id);
+
+ StringBuilder url = new StringBuilder();
+
+ if (transferprotocol != null && !transferprotocol.equals("")) {
+ url.append(transferprotocol).append("://");
+ } else {
+ url.append(PROTOCOL).append("://");
+ }
+
+ url.append(host)
+ .append(CONTEXT)
+ .append(BASE_PATH)
+ .append(topic)
+ .append("/").append(cleanConsumerGroup)
+ .append("/").append(cleanConsumerId);
+
+ return url.toString();
+ }
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
index 6c67313..57ae3ee 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java
@@ -4,11 +4,13 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,24 +19,26 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client.impl;
import com.att.aft.dme2.api.DME2Client;
import com.att.aft.dme2.api.DME2Exception;
-import org.onap.dmaap.mr.client.HostSelector;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-
-import java.io.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpException;
import org.apache.http.HttpStatus;
@@ -42,20 +46,24 @@ import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
+import org.onap.dmaap.mr.client.HostSelector;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRConsumer;
+import org.onap.dmaap.mr.client.ProtocolType;
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
- private Logger log = LoggerFactory.getLogger(this.getClass().getName());
+ private static final Logger logger = LoggerFactory.getLogger(MRConsumerImpl.class);
public static final String ROUTER_FILE_PATH = null;
- public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+ public String protocolFlag = ProtocolType.DME2.getValue();
public String consumerFilePath;
private static final String JSON_RESULT = "result";
- private static final String PROPS_PROTOCOL = "Protocol";
private static final String EXECPTION_MESSAGE = "exception: ";
private static final String SUCCESS_MESSAGE = "Success";
@@ -70,6 +78,29 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
private static final String URL_PARAM_LIMIT = "limit";
private static final String URL_PARAM_TIMEOUT = "timeout";
+ private static final String USERNAME = "username";
+ private static final String SERVICE_NAME = "ServiceName";
+ private static final String PARTNER = "Partner";
+ private static final String ROUTE_OFFER = "routeOffer";
+ private static final String PROTOCOL = "Protocol";
+ private static final String METHOD_TYPE = "MethodType";
+ private static final String CONTENT_TYPE = "contenttype";
+ private static final String LATITUDE = "Latitude";
+ private static final String LONGITUDE = "Longitude";
+ private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT";
+ private static final String VERSION = "Version";
+ private static final String ENVIRONMENT = "Environment";
+ private static final String SUB_CONTEXT_PATH = "SubContextPath";
+ private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired";
+ private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT";
+ private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
+ private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
+ private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
+ private static final String DME2_PER_HANDLER_TIMEOUT_MS = "DME2_PER_HANDLER_TIMEOUT_MS";
+ private static final String DME2_REPLY_HANDLER_TIMEOUT_MS = "DME2_REPLY_HANDLER_TIMEOUT_MS";
+
private final String fTopic;
private final String fGroup;
private final String fId;
@@ -192,8 +223,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
final LinkedList<String> msgs = new LinkedList<>();
- ProtocolTypeConstants protocolFlagEnum = null;
- for(ProtocolTypeConstants type : ProtocolTypeConstants.values()) {
+ ProtocolType protocolFlagEnum = null;
+ for (ProtocolType type : ProtocolType.values()) {
if (type.getValue().equalsIgnoreCase(protocolFlag)) {
protocolFlagEnum = type;
}
@@ -211,27 +242,27 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
break;
case AAF_AUTH:
String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
+ fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
final JSONObject o = get(urlAuthPath, username, password, protocolFlag);
readJsonData(msgs, o);
break;
case AUTH_KEY:
final String urlKeyPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)),
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROTOCOL)),
timeoutMs, limit);
final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag);
readJsonData(msgs, authObject);
break;
case HTTPNOAUTH:
final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
+ fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
readJsonData(msgs, getNoAuth(urlNoAuthPath));
break;
}
} catch (JSONException e) {
// unexpected response
reportProblemWithResponse();
- log.error(EXECPTION_MESSAGE, e);
+ logger.error(EXECPTION_MESSAGE, e);
} catch (HttpException e) {
throw new IOException(e);
}
@@ -244,10 +275,11 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
final JSONArray a = o.getJSONArray(JSON_RESULT);
if (a != null) {
for (int i = 0; i < a.length(); i++) {
- if (a.get(i) instanceof String)
+ if (a.get(i) instanceof String) {
msgs.add(a.getString(i));
- else
+ } else {
msgs.add(a.getJSONObject(i).toString());
+ }
}
}
}
@@ -264,7 +296,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
final LinkedList<String> msgs = new LinkedList<>();
MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
try {
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+ if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
dmeConfigure(timeoutMs, limit);
long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
@@ -277,9 +309,9 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
createMRConsumerResponse(reply, mrConsumerResponse);
}
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
+ fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
String response = getResponse(urlPath, username, password, protocolFlag);
final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
@@ -287,9 +319,9 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
createMRConsumerResponse(response, mrConsumerResponse);
}
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
final String urlPath = createUrlPath(
- MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)),
+ MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROTOCOL)),
timeoutMs, limit);
String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
@@ -298,9 +330,9 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
createMRConsumerResponse(response, mrConsumerResponse);
}
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
- fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
+ fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
@@ -311,19 +343,19 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
} catch (JSONException e) {
mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("json exception: ", e);
+ logger.error("json exception: ", e);
} catch (HttpException e) {
mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error("http exception: ", e);
+ logger.error("http exception: ", e);
} catch (DME2Exception e) {
mrConsumerResponse.setResponseCode(e.getErrorCode());
mrConsumerResponse.setResponseMessage(e.getErrorMessage());
- log.error("DME2 exception: ", e);
+ logger.error("DME2 exception: ", e);
} catch (Exception e) {
mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
mrConsumerResponse.setResponseMessage(e.getMessage());
- log.error(EXECPTION_MESSAGE, e);
+ logger.error(EXECPTION_MESSAGE, e);
}
mrConsumerResponse.setActualMessages(msgs);
return mrConsumerResponse;
@@ -331,16 +363,16 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
@Override
protected void reportProblemWithResponse() {
- log.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
+ logger.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
super.reportProblemWithResponse();
fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES);
}
private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
if (reply.startsWith("{")) {
- JSONObject jObject = new JSONObject(reply);
- String message = jObject.getString("message");
- int status = jObject.getInt("status");
+ JSONObject jsonObject = new JSONObject(reply);
+ String message = jsonObject.getString("message");
+ int status = jsonObject.getInt("status");
mrConsumerResponse.setResponseCode(Integer.toString(status));
@@ -372,7 +404,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
return jsonObject;
} catch (JSONException excp) {
- log.error("DMAAP - Error reading response data.", excp);
+ logger.error("DMAAP - Error reading response data.", excp);
return null;
}
}
@@ -403,22 +435,22 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
this.longPollingMs = timeoutMs;
- String latitude = props.getProperty("Latitude");
- String longitude = props.getProperty("Longitude");
- String version = props.getProperty("Version");
- String serviceName = props.getProperty("ServiceName");
- String env = props.getProperty("Environment");
- String partner = props.getProperty("Partner");
- String routeOffer = props.getProperty("routeOffer");
- String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
- String protocol = props.getProperty(PROPS_PROTOCOL);
- String methodType = props.getProperty("MethodType");
- String dmeuser = props.getProperty("username");
- String dmepassword = props.getProperty("password");
- String contenttype = props.getProperty("contenttype");
- String handlers = props.getProperty("sessionstickinessrequired");
-
- /**
+ String latitude = props.getProperty(LATITUDE);
+ String longitude = props.getProperty(LONGITUDE);
+ String version = props.getProperty(VERSION);
+ String serviceName = props.getProperty(SERVICE_NAME);
+ String env = props.getProperty(ENVIRONMENT);
+ String partner = props.getProperty(PARTNER);
+ String routeOffer = props.getProperty(ROUTE_OFFER);
+ String subContextPath = props.getProperty(SUB_CONTEXT_PATH) + fTopic + "/" + fGroup + "/" + fId;
+ String protocol = props.getProperty(PROTOCOL);
+ String methodType = props.getProperty(METHOD_TYPE);
+ String dmeuser = props.getProperty(USERNAME);
+ String dmepassword = props.getProperty(USERNAME);
+ String contenttype = props.getProperty(CONTENT_TYPE);
+ String handlers = props.getProperty(SESSION_STICKINESS_REQUIRED);
+
+ /*
* Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
* provided use the routeOffer value for auto failover within a cluster
*/
@@ -458,13 +490,13 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
url = contextUrl.toString();
DMETimeOuts = new HashMap<>();
- DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS));
+ DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(AFT_DME2_ROUNDTRIP_TIMEOUT_MS));
+ DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(AFT_DME2_EP_CONN_TIMEOUT));
DMETimeOuts.put("Content-Type", contenttype);
System.setProperty("AFT_LATITUDE", latitude);
System.setProperty("AFT_LONGITUDE", longitude);
- System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+ System.setProperty("AFT_ENVIRONMENT", props.getProperty(AFT_ENVIRONMENT));
// SSL changes
System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
@@ -474,7 +506,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
long dme2PerEndPointTimeoutMs;
try {
- dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
+ dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty(DME2_PER_HANDLER_TIMEOUT_MS));
// backward compatibility
if (dme2PerEndPointTimeoutMs <= 0) {
dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
@@ -483,15 +515,15 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
// backward compatibility
dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
getLog().debug(
- "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
+ DME2_PER_HANDLER_TIMEOUT_MS + " not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
}
try {
- dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
+ dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty(DME2_REPLY_HANDLER_TIMEOUT_MS));
} catch (NumberFormatException nfe) {
try {
- long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
- long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS));
+ long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty(AFT_DME2_EP_CONN_TIMEOUT));
dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
getLog().debug(
"DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
@@ -518,9 +550,9 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
sender.setPayload("");
if (handlers != null && handlers.equalsIgnoreCase("yes")) {
sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
- props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
- sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+ props.getProperty(AFT_DME2_EXCHANGE_REQUEST_HANDLERS));
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty(AFT_DME2_EXCHANGE_REPLY_HANDLERS));
+ sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty(AFT_DME2_REQ_TRACE_ON));
} else {
sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
}
@@ -548,7 +580,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
}
adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
} catch (UnsupportedEncodingException e) {
- log.error("exception at createUrlPath () : ", e);
+ logger.error("exception at createUrlPath () : ", e);
}
}
@@ -560,10 +592,10 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
}
private String readRoute(String routeKey) {
- try(InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) {
+ try (InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) {
MRClientFactory.prop.load(input);
} catch (Exception ex) {
- log.error("Reply Router Error " + ex);
+ logger.error("Reply Router Error " + ex);
}
return MRClientFactory.prop.getProperty(routeKey);
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java
index 538f1e3..0ebebe5 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRFormat.java
@@ -4,11 +4,13 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,36 +19,39 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client.impl;
-enum MRFormat
-{
- /**
- * Messages are sent using MR's message format.
- */
- CAMBRIA
- {
- @Override
- public String toString() { return "application/cambria"; }
- },
+enum MRFormat {
+ /**
+ * Messages are sent using MR's message format.
+ */
+ CAMBRIA {
+ @Override
+ public String toString() {
+ return "application/cambria";
+ }
+ },
- /**
- * Messages are sent using MR's message format with compression.
- */
- CAMBRIA_ZIP
- {
- @Override
- public String toString() { return "application/cambria-zip"; }
- },
+ /**
+ * Messages are sent using MR's message format with compression.
+ */
+ CAMBRIA_ZIP {
+ @Override
+ public String toString() {
+ return "application/cambria-zip";
+ }
+ },
- /**
- * messages are sent as simple JSON objects.
- */
- JSON
- {
- @Override
- public String toString() { return "application/json"; }
- }
+ /**
+ * messages are sent as simple JSON objects.
+ */
+ JSON {
+ @Override
+ public String toString() {
+ return "application/json";
+ }
+ }
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java
index c1e2d12..414178d 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java
@@ -4,11 +4,13 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,256 +19,212 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client.impl;
+import com.att.nsa.apiClient.credentials.ApiCredential;
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
-
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.att.nsa.apiClient.credentials.ApiCredential;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
import org.onap.dmaap.mr.client.MRIdentityManager;
import org.onap.dmaap.mr.client.MRTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager
-{
- private static final String BASE_URI_TOPIC = "/topics";
- private static final String BASE_URI_APIKEY = "/apiKeys";
-
- private static final String PARAM_DESCRIPTION = "description";
- private static final String PARAM_EMAIL = "email";
-
- private static final Logger logger = LoggerFactory.getLogger(MRMetaClient.class);
- public MRMetaClient ( Collection<String> baseUrls ) throws MalformedURLException
- {
- super ( baseUrls );
- }
-
- @Override
- public Set<String> getTopics () throws IOException
- {
- final TreeSet<String> set = new TreeSet<> ();
- try
- {
- final JSONObject topicSet = get ( BASE_URI_TOPIC );
- final JSONArray a = topicSet.getJSONArray ( "topics" );
- for ( int i=0; i<a.length (); i++ )
- {
- set.add ( a.getString ( i ) );
- }
- }
- catch ( HttpObjectNotFoundException e )
- {
- getLog().warn ( "No /topics endpoint on service." );
- logger.error("HttpObjectNotFoundException: ", e);
- }
- catch ( JSONException e )
- {
- getLog().warn ( "Bad /topics result from service." );
- logger.error("JSONException: ", e);
- }
- catch ( HttpException e )
- {
- throw new IOException ( e );
- }
- return set;
- }
-
- @Override
- public TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException
- {
- try
- {
- final JSONObject topicData = get ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) );
- return new TopicInfo ()
- {
- @Override
- public String getOwner ()
- {
- return topicData.optString ( "owner", null );
- }
-
- @Override
- public String getDescription ()
- {
- return topicData.optString ( PARAM_DESCRIPTION, null );
- }
-
- @Override
- public Set<String> getAllowedProducers ()
- {
- final JSONObject acl = topicData.optJSONObject ( "writerAcl" );
- if ( acl != null && acl.optBoolean ( "enabled", true ) )
- {
- return jsonArrayToSet ( acl.optJSONArray ( "users" ) );
- }
- return null;
- }
-
- @Override
- public Set<String> getAllowedConsumers ()
- {
- final JSONObject acl = topicData.optJSONObject ( "readerAcl" );
- if ( acl != null && acl.optBoolean ( "enabled", true ) )
- {
- return jsonArrayToSet ( acl.optJSONArray ( "users" ) );
- }
- return null;
- }
- };
- }
- catch ( JSONException e )
- {
- throw new IOException ( e );
- }
- catch ( HttpException e )
- {
- throw new IOException ( e );
- }
- }
-
- @Override
- public void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException
- {
- final JSONObject o = new JSONObject ();
- o.put ( "topicName", topicName );
- o.put ( "topicDescription", topicDescription );
- o.put ( "partitionCount", partitionCount );
- o.put ( "replicationCount", replicationCount );
- post ( BASE_URI_TOPIC + "/create", o, false );
- }
-
- @Override
- public void deleteTopic ( String topic ) throws HttpException, IOException
- {
- delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) );
- }
-
- @Override
- public boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException
- {
- return null == getAllowedProducers ( topic );
- }
-
- @Override
- public Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException
- {
- return getTopicMetadata ( topic ).getAllowedProducers ();
- }
-
- @Override
- public void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
- {
- put ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() );
- }
-
- @Override
- public void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException
- {
- delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) );
- }
-
- @Override
- public boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException
- {
- return null == getAllowedConsumers ( topic );
- }
-
- @Override
- public Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException
- {
- return getTopicMetadata ( topic ).getAllowedConsumers ();
- }
-
- @Override
- public void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
- {
- put ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() );
- }
-
- @Override
- public void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException
- {
- delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) );
- }
-
- @Override
- public ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException
- {
- try
- {
- final JSONObject o = new JSONObject ();
- o.put ( PARAM_EMAIL, email );
- o.put ( PARAM_DESCRIPTION, description );
- final JSONObject reply = post ( BASE_URI_APIKEY + "/create", o, true );
- return new ApiCredential ( reply.getString ( "key" ), reply.getString ( "secret" ) );
- }
- catch ( JSONException e )
- {
- // the response doesn't meet our expectation
- throw new MRApiException ( "The API key response is incomplete.", e );
- }
- }
-
- @Override
- public ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
- {
- final JSONObject keyEntry = get ( BASE_URI_APIKEY + "/" + MRConstants.escape ( apiKey ) );
- if ( keyEntry == null )
- {
- return null;
- }
-
- return new ApiKey ()
- {
- @Override
- public String getEmail ()
- {
- final JSONObject aux = keyEntry.optJSONObject ( "aux" );
- if ( aux != null )
- {
- return aux.optString ( PARAM_EMAIL );
- }
- return null;
- }
-
- @Override
- public String getDescription ()
- {
- final JSONObject aux = keyEntry.optJSONObject ( "aux" );
- if ( aux != null )
- {
- return aux.optString ( PARAM_DESCRIPTION );
- }
- return null;
- }
- };
- }
-
- @Override
- public void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException
- {
- final JSONObject o = new JSONObject ();
- if ( email != null ) o.put ( PARAM_EMAIL, email );
- if ( description != null ) o.put ( PARAM_DESCRIPTION, description );
- patch ( BASE_URI_APIKEY + "/" + MRConstants.escape ( getCurrentApiKey() ), o );
- }
-
- @Override
- public void deleteCurrentApiKey () throws HttpException, IOException
- {
- delete ( BASE_URI_APIKEY + "/" + MRConstants.escape ( getCurrentApiKey() ) );
- }
+public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager {
+ private static final String BASE_URI_TOPIC = "/topics";
+ private static final String BASE_URI_APIKEY = "/apiKeys";
+
+ private static final String PARAM_DESCRIPTION = "description";
+ private static final String PARAM_EMAIL = "email";
+
+ private static final Logger logger = LoggerFactory.getLogger(MRMetaClient.class);
+
+ public MRMetaClient(Collection<String> baseUrls) throws MalformedURLException {
+ super(baseUrls);
+ }
+
+ @Override
+ public Set<String> getTopics() throws IOException {
+ final TreeSet<String> set = new TreeSet<>();
+ try {
+ final JSONObject topicSet = get(BASE_URI_TOPIC);
+ final JSONArray a = topicSet.getJSONArray("topics");
+ for (int i = 0; i < a.length(); i++) {
+ set.add(a.getString(i));
+ }
+ } catch (HttpObjectNotFoundException e) {
+ getLog().warn("No /topics endpoint on service.");
+ logger.error("HttpObjectNotFoundException: ", e);
+ } catch (JSONException e) {
+ getLog().warn("Bad /topics result from service.");
+ logger.error("JSONException: ", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+ return set;
+ }
+
+ @Override
+ public TopicInfo getTopicMetadata(String topic) throws HttpObjectNotFoundException, IOException {
+ try {
+ final JSONObject topicData = get(BASE_URI_TOPIC + "/" + MRConstants.escape(topic));
+ return new TopicInfo() {
+ @Override
+ public String getOwner() {
+ return topicData.optString("owner", null);
+ }
+
+ @Override
+ public String getDescription() {
+ return topicData.optString(PARAM_DESCRIPTION, null);
+ }
+
+ @Override
+ public Set<String> getAllowedProducers() {
+ final JSONObject acl = topicData.optJSONObject("writerAcl");
+ if (acl != null && acl.optBoolean("enabled", true)) {
+ return jsonArrayToSet(acl.optJSONArray("users"));
+ }
+ return null;
+ }
+
+ @Override
+ public Set<String> getAllowedConsumers() {
+ final JSONObject acl = topicData.optJSONObject("readerAcl");
+ if (acl != null && acl.optBoolean("enabled", true)) {
+ return jsonArrayToSet(acl.optJSONArray("users"));
+ }
+ return null;
+ }
+ };
+ } catch (JSONException e) {
+ throw new IOException(e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void createTopic(String topicName, String topicDescription, int partitionCount, int replicationCount) throws HttpException, IOException {
+ final JSONObject o = new JSONObject();
+ o.put("topicName", topicName);
+ o.put("topicDescription", topicDescription);
+ o.put("partitionCount", partitionCount);
+ o.put("replicationCount", replicationCount);
+ post(BASE_URI_TOPIC + "/create", o, false);
+ }
+
+ @Override
+ public void deleteTopic(String topic) throws HttpException, IOException {
+ delete(BASE_URI_TOPIC + "/" + MRConstants.escape(topic));
+ }
+
+ @Override
+ public boolean isOpenForProducing(String topic) throws HttpObjectNotFoundException, IOException {
+ return null == getAllowedProducers(topic);
+ }
+
+ @Override
+ public Set<String> getAllowedProducers(String topic) throws HttpObjectNotFoundException, IOException {
+ return getTopicMetadata(topic).getAllowedProducers();
+ }
+
+ @Override
+ public void allowProducer(String topic, String apiKey) throws HttpObjectNotFoundException, HttpException, IOException {
+ put(BASE_URI_TOPIC + "/" + MRConstants.escape(topic) + "/producers/" + MRConstants.escape(apiKey), new JSONObject());
+ }
+
+ @Override
+ public void revokeProducer(String topic, String apiKey) throws HttpException, IOException {
+ delete(BASE_URI_TOPIC + "/" + MRConstants.escape(topic) + "/producers/" + MRConstants.escape(apiKey));
+ }
+
+ @Override
+ public boolean isOpenForConsuming(String topic) throws HttpObjectNotFoundException, IOException {
+ return null == getAllowedConsumers(topic);
+ }
+
+ @Override
+ public Set<String> getAllowedConsumers(String topic) throws HttpObjectNotFoundException, IOException {
+ return getTopicMetadata(topic).getAllowedConsumers();
+ }
+
+ @Override
+ public void allowConsumer(String topic, String apiKey) throws HttpObjectNotFoundException, HttpException, IOException {
+ put(BASE_URI_TOPIC + "/" + MRConstants.escape(topic) + "/consumers/" + MRConstants.escape(apiKey), new JSONObject());
+ }
+
+ @Override
+ public void revokeConsumer(String topic, String apiKey) throws HttpException, IOException {
+ delete(BASE_URI_TOPIC + "/" + MRConstants.escape(topic) + "/consumers/" + MRConstants.escape(apiKey));
+ }
+
+ @Override
+ public ApiCredential createApiKey(String email, String description) throws HttpException, MRApiException, IOException {
+ try {
+ final JSONObject o = new JSONObject();
+ o.put(PARAM_EMAIL, email);
+ o.put(PARAM_DESCRIPTION, description);
+ final JSONObject reply = post(BASE_URI_APIKEY + "/create", o, true);
+ return new ApiCredential(reply.getString("key"), reply.getString("secret"));
+ } catch (JSONException e) {
+ // the response doesn't meet our expectation
+ throw new MRApiException("The API key response is incomplete.", e);
+ }
+ }
+
+ @Override
+ public ApiKey getApiKey(String apiKey) throws HttpObjectNotFoundException, HttpException, IOException {
+ final JSONObject keyEntry = get(BASE_URI_APIKEY + "/" + MRConstants.escape(apiKey));
+ if (keyEntry == null) {
+ return null;
+ }
+
+ return new ApiKey() {
+ @Override
+ public String getEmail() {
+ final JSONObject aux = keyEntry.optJSONObject("aux");
+ if (aux != null) {
+ return aux.optString(PARAM_EMAIL);
+ }
+ return null;
+ }
+
+ @Override
+ public String getDescription() {
+ final JSONObject aux = keyEntry.optJSONObject("aux");
+ if (aux != null) {
+ return aux.optString(PARAM_DESCRIPTION);
+ }
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public void updateCurrentApiKey(String email, String description) throws HttpObjectNotFoundException, HttpException, IOException {
+ final JSONObject o = new JSONObject();
+ if (email != null) {
+ o.put(PARAM_EMAIL, email);
+ }
+ if (description != null) {
+ o.put(PARAM_DESCRIPTION, description);
+ }
+ patch(BASE_URI_APIKEY + "/" + MRConstants.escape(getCurrentApiKey()), o);
+ }
+
+ @Override
+ public void deleteCurrentApiKey() throws HttpException, IOException {
+ delete(BASE_URI_APIKEY + "/" + MRConstants.escape(getCurrentApiKey()));
+ }
}
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
index bd140cd..74fec8a 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
@@ -4,11 +4,13 @@
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2021 Orange.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,10 +19,13 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
+
package org.onap.dmaap.mr.client.impl;
+import com.att.aft.dme2.api.DME2Client;
+import com.att.aft.dme2.api.DME2Exception;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -33,342 +38,353 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
-
import javax.ws.rs.core.MultivaluedMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.http.HttpException;
import org.apache.http.HttpStatus;
import org.json.JSONArray;
import org.json.JSONObject;
import org.json.JSONTokener;
-
-import com.att.aft.dme2.api.DME2Client;
-import com.att.aft.dme2.api.DME2Exception;
import org.onap.dmaap.mr.client.HostSelector;
import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.ProtocolType;
import org.onap.dmaap.mr.client.response.MRPublisherResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
- private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
-
- private static final String PROPS_PROTOCOL = "Protocol";
- private static final String PROPS_PARTITION = "partition";
- private static final String PROPS_CONTENT_TYPE = "contenttype";
-
- private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip";
- private static final String CONTENT_TYPE_CAMBRIA = "application/cambria";
- private static final String CONTENT_TYPE_JSON = "application/json";
- private static final String CONTENT_TYPE_TEXT = "text/plain";
-
- private static final String JSON_STATUS = "status";
-
- public static class Builder {
-
- public Builder againstUrls(Collection<String> baseUrls) {
- fUrls = baseUrls;
- return this;
- }
-
- public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )
- {
- fUrls = baseUrls;
- fServiceName = serviceName;
- fTransportype = transportype;
- return this;
- }
-
- public Builder onTopic(String topic) {
- fTopic = topic;
- return this;
- }
-
- public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- return this;
- }
-
- public Builder compress(boolean compress) {
- fCompress = compress;
- return this;
- }
-
- public Builder httpThreadTime(int threadOccuranceTime) {
- this.threadOccuranceTime = threadOccuranceTime;
- return this;
- }
-
- public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
- fAllowSelfSignedCerts = allowSelfSignedCerts;
- return this;
- }
-
- public Builder withResponse(boolean withResponse) {
- fWithResponse = withResponse;
- return this;
- }
-
- public MRSimplerBatchPublisher build() {
- if (!fWithResponse) {
- try {
- return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
- fAllowSelfSignedCerts, threadOccuranceTime);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- } else {
- try {
- return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
- fAllowSelfSignedCerts, fMaxBatchSize);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- }
-
- private Collection<String> fUrls;
- private Collection<String> fServiceName;
- private String fTransportype;
- private String fTopic;
- private int fMaxBatchSize = 100;
-
- private long fMaxBatchAgeMs = 1000;
- private boolean fCompress = false;
- private int threadOccuranceTime = 50;
- private boolean fAllowSelfSignedCerts = false;
- private boolean fWithResponse = false;
-
- };
-
- @Override
- public int send(String partition, String msg) {
- return send(new message(partition, msg));
- }
-
- @Override
- public int send(String msg) {
- return send(new message(null, msg));
- }
-
- @Override
- public int send(message msg) {
- final LinkedList<message> list = new LinkedList<>();
- list.add(msg);
- return send(list);
- }
-
- @Override
- public synchronized int send(Collection<message> msgs) {
- if (fClosed) {
- throw new IllegalStateException("The publisher was closed.");
- }
-
- for (message userMsg : msgs) {
- fPending.add(new TimestampedMessage(userMsg));
- }
- return getPendingMessageCount();
- }
-
- @Override
- public synchronized int getPendingMessageCount() {
- return fPending.size();
- }
-
- @Override
- public void close() {
- try {
- final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (remains.isEmpty()) {
- getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
- remains.size());
- }
- } catch (InterruptedException e) {
- getLog().warn("Possible message loss. " + e.getMessage(), e);
- Thread.currentThread().interrupt();
- } catch (IOException e) {
- getLog().warn("Possible message loss. " + e.getMessage(), e);
- }
- }
-
- @Override
- public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
- synchronized (this) {
- fClosed = true;
-
- // stop the background sender
- fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- fExec.shutdown();
- }
-
- final long now = Clock.now();
- final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
- final long timeoutAtMs = now + waitInMs;
-
- while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
- send(true);
- Thread.sleep(250);
- }
-
- synchronized (this) {
- final LinkedList<message> result = new LinkedList<>();
- fPending.drainTo(result);
- return result;
- }
- }
-
- /**
- * Possibly send a batch to the MR server. This is called by the background
- * thread and the close() method
- *
- * @param force
- */
- private synchronized void send(boolean force) {
- if ((force || shouldSendNow()) && !sendBatch()) {
- getLog().warn("Send failed, " + fPending.size() + " message to send.");
-
- // note the time for back-off
- fDontSendUntilMs = sfWaitAfterError + Clock.now();
- }
- }
-
- private synchronized boolean shouldSendNow() {
- boolean shouldSend = false;
- if (fPending.size()>0) {
- final long nowMs = Clock.now();
-
- shouldSend = (fPending.size() >= fMaxBatchSize);
- if (!shouldSend) {
- final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
- shouldSend = sendAtMs <= nowMs;
- }
-
- // however, wait after an error
- shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
- }
- return shouldSend;
- }
-
- /**
- * Method to parse published JSON Objects and Arrays
- *
- * @return JSONArray
- */
- private JSONArray parseJSON() {
- JSONArray jsonArray = new JSONArray();
- for (TimestampedMessage m : fPending) {
- JSONTokener jsonTokener = new JSONTokener(m.fMsg);
- JSONObject jsonObject = null;
- JSONArray tempjsonArray = null;
- final char firstChar = jsonTokener.next();
- jsonTokener.back();
- if ('[' == firstChar) {
- tempjsonArray = new JSONArray(jsonTokener);
- if (null != tempjsonArray) {
- for (int i = 0; i < tempjsonArray.length(); i++) {
- jsonArray.put(tempjsonArray.getJSONObject(i));
- }
- }
- } else {
- jsonObject = new JSONObject(jsonTokener);
- jsonArray.put(jsonObject);
- }
-
- }
- return jsonArray;
- }
-
- private void logTime(long startMs, String dmeResponse) {
- if (getLog().isInfoEnabled()) {
- getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse);
- }
- }
-
- private synchronized boolean sendBatch() {
- // it's possible for this call to be made with an empty list. in this
- // case, just return.
- if (fPending.isEmpty()) {
- return true;
- }
-
- final long nowMs = Clock.now();
-
- if (this.fHostSelector != null) {
- host = this.fHostSelector.selectBaseHost();
- }
-
- final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
- props.getProperty(PROPS_PARTITION));
-
- try {
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
- OutputStream os = baseStream;
- final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
- if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
- JSONArray jsonArray = parseJSON();
- os.write(jsonArray.toString().getBytes());
- os.close();
-
- } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
- for (TimestampedMessage m : fPending) {
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- os.close();
- } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
- || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
- if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
- os = new GZIPOutputStream(baseStream);
- }
- for (TimestampedMessage m : fPending) {
-
- os.write(("" + m.fPartition.length()).getBytes());
- os.write('.');
- os.write(("" + m.fMsg.length()).getBytes());
- os.write('.');
- os.write(m.fPartition.getBytes());
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- os.close();
- } else {
- for (TimestampedMessage m : fPending) {
- os.write(m.fMsg.getBytes());
-
- }
- os.close();
- }
-
- final long startMs = Clock.now();
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-
- DME2Configue();
-
- this.wait(5);
- getLog().info(String
- .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
- nowMs - fPending.peek().timestamp));
- sender.setPayload(os.toString());
- String dmeResponse = sender.sendAndWait(5000L);
-
- logTime(startMs, dmeResponse);
- fPending.clear();
- return true;
- }
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
- nowMs - fPending.peek().timestamp);
+ private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
+
+ private static final String PASSWORD = "password";
+ private static final String USERNAME = "username";
+ private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
+ private static final String SERVICE_NAME = "ServiceName";
+ private static final String PARTNER = "Partner";
+ private static final String ROUTE_OFFER = "routeOffer";
+ private static final String PROTOCOL = "Protocol";
+ private static final String METHOD_TYPE = "MethodType";
+ private static final String CONTENT_TYPE = "contenttype";
+ private static final String LATITUDE = "Latitude";
+ private static final String LONGITUDE = "Longitude";
+ private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT";
+ private static final String VERSION = "Version";
+ private static final String ENVIRONMENT = "Environment";
+ private static final String SUB_CONTEXT_PATH = "SubContextPath";
+ private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired";
+ private static final String PARTITION = "partition";
+ private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT";
+ private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
+ private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
+ private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
+
+ private static final String CONTENT_TYPE_TEXT = "text/plain";
+
+ private static final String JSON_STATUS = "status";
+
+ public static class Builder {
+
+ public Builder againstUrls(Collection<String> baseUrls) {
+ fUrls = baseUrls;
+ return this;
+ }
+
+ public Builder againstUrlsOrServiceName(Collection<String> baseUrls, Collection<String> serviceName, String transportype) {
+ fUrls = baseUrls;
+ return this;
+ }
+
+ public Builder onTopic(String topic) {
+ fTopic = topic;
+ return this;
+ }
+
+ public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ return this;
+ }
+
+ public Builder compress(boolean compress) {
+ fCompress = compress;
+ return this;
+ }
+
+ public Builder httpThreadTime(int threadOccurrenceTime) {
+ this.threadOccurrenceTime = threadOccurrenceTime;
+ return this;
+ }
+
+ public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
+ fAllowSelfSignedCerts = allowSelfSignedCerts;
+ return this;
+ }
+
+ public Builder withResponse(boolean withResponse) {
+ fWithResponse = withResponse;
+ return this;
+ }
+
+ public MRSimplerBatchPublisher build() {
+ if (!fWithResponse) {
+ try {
+ return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+ fAllowSelfSignedCerts, threadOccurrenceTime);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ } else {
+ try {
+ return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+ fAllowSelfSignedCerts, fMaxBatchSize);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ }
+
+ private Collection<String> fUrls;
+ private String fTopic;
+ private int fMaxBatchSize = 100;
+
+ private long fMaxBatchAgeMs = 1000;
+ private boolean fCompress = false;
+ private int threadOccurrenceTime = 50;
+ private boolean fAllowSelfSignedCerts = false;
+ private boolean fWithResponse = false;
+
+ }
+
+ @Override
+ public int send(String partition, String msg) {
+ return send(new Message(partition, msg));
+ }
+
+ @Override
+ public int send(String msg) {
+ return send(new Message(null, msg));
+ }
+
+ @Override
+ public int send(Message msg) {
+ final LinkedList<Message> list = new LinkedList<>();
+ list.add(msg);
+ return send(list);
+ }
+
+ @Override
+ public synchronized int send(Collection<Message> msgs) {
+ if (fClosed) {
+ throw new IllegalStateException("The publisher was closed.");
+ }
+
+ for (Message userMsg : msgs) {
+ fPending.add(new TimestampedMessage(userMsg));
+ }
+ return getPendingMessageCount();
+ }
+
+ @Override
+ public synchronized int getPendingMessageCount() {
+ return fPending.size();
+ }
+
+ @Override
+ public void close() {
+ try {
+ final List<Message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ if (remains.isEmpty()) {
+ getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
+ remains.size());
+ }
+ } catch (InterruptedException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public List<Message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
+ synchronized (this) {
+ fClosed = true;
+
+ // stop the background sender
+ fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ fExec.shutdown();
+ }
+
+ final long now = Clock.now();
+ final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
+ final long timeoutAtMs = now + waitInMs;
+
+ while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
+ send(true);
+ Thread.sleep(250);
+ }
+
+ synchronized (this) {
+ final LinkedList<Message> result = new LinkedList<>();
+ fPending.drainTo(result);
+ return result;
+ }
+ }
+
+ /**
+ * Possibly send a batch to the MR server. This is called by the background
+ * thread and the close() method
+ *
+ * @param force
+ */
+ private synchronized void send(boolean force) {
+ if ((force || shouldSendNow()) && !sendBatch()) {
+ getLog().warn("Send failed, {} message to send.", fPending.size());
+ // note the time for back-off
+ fDontSendUntilMs = SF_WAIT_AFTER_ERROR + Clock.now();
+ }
+ }
+
+ private synchronized boolean shouldSendNow() {
+ boolean shouldSend = false;
+ if (!fPending.isEmpty()) {
+ final long nowMs = Clock.now();
+
+ shouldSend = (fPending.size() >= fMaxBatchSize);
+ if (!shouldSend) {
+ final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
+ shouldSend = sendAtMs <= nowMs;
+ }
+
+ // however, wait after an error
+ shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
+ }
+ return shouldSend;
+ }
+
+ /**
+ * Method to parse published JSON Objects and Arrays.
+ *
+ * @return JSONArray
+ */
+ private JSONArray parseJSON() {
+ JSONArray jsonArray = new JSONArray();
+ for (TimestampedMessage m : fPending) {
+ JSONTokener jsonTokener = new JSONTokener(m.fMsg);
+ JSONObject jsonObject = null;
+ JSONArray tempjsonArray = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if ('[' == firstChar) {
+ tempjsonArray = new JSONArray(jsonTokener);
+ for (int i = 0; i < tempjsonArray.length(); i++) {
+ jsonArray.put(tempjsonArray.getJSONObject(i));
+ }
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ jsonArray.put(jsonObject);
+ }
+
+ }
+ return jsonArray;
+ }
+
+ private void logTime(long startMs, String dmeResponse) {
+ if (getLog().isInfoEnabled()) {
+ getLog().info("MR reply ok ({} ms):{}", (Clock.now() - startMs), dmeResponse);
+ }
+ }
+
+ private void logSendMessage(int nbMessage, String dest, long time) {
+ if (getLog().isInfoEnabled()) {
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", nbMessage, dest, time);
+ }
+ }
+
+ private synchronized boolean sendBatch() {
+ // it's possible for this call to be made with an empty list. in this
+ // case, just return.
+ if (fPending.isEmpty()) {
+ return true;
+ }
+
+ final long nowMs = Clock.now();
+
+ if (this.fHostSelector != null) {
+ host = this.fHostSelector.selectBaseHost();
+ }
+
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROTOCOL),
+ props.getProperty(PARTITION));
+
+ try {
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+ OutputStream os = baseStream;
+ final String contentType = props.getProperty(CONTENT_TYPE);
+ if (contentType.equalsIgnoreCase(MRFormat.JSON.toString())) {
+ JSONArray jsonArray = parseJSON();
+ os.write(jsonArray.toString().getBytes());
+ os.close();
+
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+ } else if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString())
+ || (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) {
+ if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : fPending) {
+
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+ } else {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+
+ }
+ os.close();
+ }
+
+ final long startMs = Clock.now();
+ if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+
+ configureDME2();
+
+ this.wait(5);
+ if (fPending.peek() != null) {
+ logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp);
+ }
+ sender.setPayload(os.toString());
+ String dmeResponse = sender.sendAndWait(5000L);
+
+ logTime(startMs, dmeResponse);
+ fPending.clear();
+ return true;
+ }
+
+ if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ if (fPending.peek() != null) {
+ logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
+ }
final JSONObject result =
postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
.setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
@@ -379,550 +395,563 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
return false;
}
- logTime(startMs, result.toString());
+ logTime(startMs, result.toString());
fPending.clear();
return true;
}
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
- nowMs - fPending.peek().timestamp);
- final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
- protocolFlag);
-
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
- if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
- return false;
- }
- logTime(startMs, result.toString());
- fPending.clear();
- return true;
- }
-
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
- nowMs - fPending.peek().timestamp);
- final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
-
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
- if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
- return false;
- }
- logTime(startMs, result.toString());
- fPending.clear();
- return true;
- }
- } catch (Exception x) {
- getLog().warn(x.getMessage(), x);
- }
- return false;
- }
-
- public synchronized MRPublisherResponse sendBatchWithResponse() {
- // it's possible for this call to be made with an empty list. in this
- // case, just return.
- if (fPending.isEmpty()) {
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- pubResponse.setResponseMessage("No Messages to send");
- return pubResponse;
- }
-
- final long nowMs = Clock.now();
-
- host = this.fHostSelector.selectBaseHost();
-
- final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
- props.getProperty(PROPS_PARTITION));
- OutputStream os = null;
- try {
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
- os = baseStream;
- final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
- if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
- JSONArray jsonArray = parseJSON();
- os.write(jsonArray.toString().getBytes());
- } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
- for (TimestampedMessage m : fPending) {
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
- || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
- if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
- os = new GZIPOutputStream(baseStream);
- }
- for (TimestampedMessage m : fPending) {
-
- os.write(("" + m.fPartition.length()).getBytes());
- os.write('.');
- os.write(("" + m.fMsg.length()).getBytes());
- os.write('.');
- os.write(m.fPartition.getBytes());
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- os.close();
- } else {
- for (TimestampedMessage m : fPending) {
- os.write(m.fMsg.getBytes());
-
- }
- }
-
- final long startMs = Clock.now();
- if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-
- try {
- DME2Configue();
-
- this.wait(5);
- getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
- nowMs - fPending.peek().timestamp);
- sender.setPayload(os.toString());
-
- String dmeResponse = sender.sendAndWait(5000L);
-
- pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
-
- if (Integer.valueOf(pubResponse.getResponseCode()) < 200
- || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
- return pubResponse;
- }
- final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
- getLog().info(logLine);
- fPending.clear();
-
- } catch (DME2Exception x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(x.getErrorCode());
- pubResponse.setResponseMessage(x.getErrorMessage());
- } catch (URISyntaxException x) {
-
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- pubResponse.setResponseMessage(x.getMessage());
- } catch (Exception x) {
-
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- pubResponse.setResponseMessage(x.getMessage());
- logger.error("exception: ", x);
-
- }
-
- return pubResponse;
- }
-
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
- nowMs - fPending.peek().timestamp);
- final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
- authDate, username, password, protocolFlag);
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
-
- pubResponse = createMRPublisherResponse(result, pubResponse);
-
- if (Integer.valueOf(pubResponse.getResponseCode()) < 200
- || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
- return pubResponse;
- }
-
- logTime(startMs, result);
- fPending.clear();
- return pubResponse;
- }
-
- if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
- nowMs - fPending.peek().timestamp);
- final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
- password, protocolFlag);
-
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
- pubResponse = createMRPublisherResponse(result, pubResponse);
-
- if (Integer.valueOf(pubResponse.getResponseCode()) < 200
- || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
- return pubResponse;
- }
-
- final String logLine = String.valueOf((Clock.now() - startMs));
- getLog().info(logLine);
- fPending.clear();
- return pubResponse;
- }
-
- if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
- nowMs - fPending.peek().timestamp);
- final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
-
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
- pubResponse = createMRPublisherResponse(result, pubResponse);
-
- if (Integer.valueOf(pubResponse.getResponseCode()) < 200
- || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-
- return pubResponse;
- }
-
- final String logLine = String.valueOf((Clock.now() - startMs));
- getLog().info(logLine);
- fPending.clear();
- return pubResponse;
- }
- } catch (IllegalArgumentException | HttpException x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- pubResponse.setResponseMessage(x.getMessage());
-
- } catch (IOException x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- pubResponse.setResponseMessage(x.getMessage());
- } catch (Exception x) {
- getLog().warn(x.getMessage(), x);
-
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- pubResponse.setResponseMessage(x.getMessage());
-
- }
-
- finally {
- if (!fPending.isEmpty()) {
- getLog().warn("Send failed, " + fPending.size() + " message to send.");
- pubResponse.setPendingMsgs(fPending.size());
- }
- if (os != null) {
- try {
- os.close();
- } catch (Exception x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
- pubResponse.setResponseMessage("Error in closing Output Stream");
- }
- }
- }
-
- return pubResponse;
- }
-
- public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
-
- if (reply.isEmpty()) {
-
- mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- mrPubResponse.setResponseMessage("Please verify the Producer properties");
- } else if (reply.startsWith("{")) {
- JSONObject jObject = new JSONObject(reply);
- if (jObject.has("message") && jObject.has(JSON_STATUS)) {
- String message = jObject.getString("message");
- if (null != message) {
- mrPubResponse.setResponseMessage(message);
- }
- mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
- } else {
- mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
- mrPubResponse.setResponseMessage(reply);
- }
- } else if (reply.startsWith("<")) {
- String responseCode = getHTTPErrorResponseCode(reply);
- if (responseCode.contains("403")) {
- responseCode = "403";
- }
- mrPubResponse.setResponseCode(responseCode);
- mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
- }
-
- return mrPubResponse;
- }
-
- private final String fTopic;
- private final int fMaxBatchSize;
- private final long fMaxBatchAgeMs;
- private final boolean fCompress;
- private int threadOccuranceTime;
- private boolean fClosed;
- private String username;
- private String password;
- private String host;
-
- // host selector
- private HostSelector fHostSelector = null;
-
- private final LinkedBlockingQueue<TimestampedMessage> fPending;
- private long fDontSendUntilMs;
- private final ScheduledThreadPoolExecutor fExec;
-
- private String latitude;
- private String longitude;
- private String version;
- private String serviceName;
- private String env;
- private String partner;
- private String routeOffer;
- private String subContextPath;
- private String protocol;
- private String methodType;
- private String url;
- private String dmeuser;
- private String dmepassword;
- private String contentType;
- private static final long sfWaitAfterError = 10000;
- private HashMap<String, String> DMETimeOuts;
- private DME2Client sender;
- public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
- private String authKey;
- private String authDate;
- private String handlers;
- private Properties props;
- public static String routerFilePath;
- protected static final Map<String, String> headers = new HashMap<String, String>();
- public static MultivaluedMap<String, Object> headersMap;
-
- private MRPublisherResponse pubResponse;
-
- public MRPublisherResponse getPubResponse() {
- return pubResponse;
- }
-
- public void setPubResponse(MRPublisherResponse pubResponse) {
- this.pubResponse = pubResponse;
- }
-
- public static String getRouterFilePath() {
- return routerFilePath;
- }
-
- public static void setRouterFilePath(String routerFilePath) {
- MRSimplerBatchPublisher.routerFilePath = routerFilePath;
- }
-
- public Properties getProps() {
- return props;
- }
-
- public void setProps(Properties props) {
- this.props = props;
- setClientConfig(DmaapClientUtil.getClientConfig(props));
- }
-
- public String getProtocolFlag() {
- return protocolFlag;
- }
-
- public void setProtocolFlag(String protocolFlag) {
- this.protocolFlag = protocolFlag;
- }
-
- private void DME2Configue() throws Exception {
- try {
-
- latitude = props.getProperty("Latitude");
- longitude = props.getProperty("Longitude");
- version = props.getProperty("Version");
- serviceName = props.getProperty("ServiceName");
- env = props.getProperty("Environment");
- partner = props.getProperty("Partner");
- routeOffer = props.getProperty("routeOffer");
- subContextPath = props.getProperty("SubContextPath") + fTopic;
-
- protocol = props.getProperty(PROPS_PROTOCOL);
- methodType = props.getProperty("MethodType");
- dmeuser = props.getProperty("username");
- dmepassword = props.getProperty("password");
- contentType = props.getProperty(PROPS_CONTENT_TYPE);
- handlers = props.getProperty("sessionstickinessrequired");
- routerFilePath = props.getProperty("DME2preferredRouterFilePath");
-
- /**
- * Changes to DME2Client url to use Partner for auto failover
- * between data centers When Partner value is not provided use the
- * routeOffer value for auto failover within a cluster
- */
-
- String partitionKey = props.getProperty(PROPS_PARTITION);
-
- if (partner != null && !partner.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
- + partner;
- if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
- url = url + "&partitionKey=" + partitionKey;
- }
- } else if (routeOffer != null && !routeOffer.isEmpty()) {
- url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
- + routeOffer;
- if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
- url = url + "&partitionKey=" + partitionKey;
- }
- }
-
- DMETimeOuts = new HashMap<String, String>();
- DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
- DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
- DMETimeOuts.put("Content-Type", contentType);
- System.setProperty("AFT_LATITUDE", latitude);
- System.setProperty("AFT_LONGITUDE", longitude);
- System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
- // System.setProperty("DME2.DEBUG", "true");
-
- // SSL changes
- // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
-
- System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
- System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
- System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
-
- // SSL changes
-
- sender = new DME2Client(new URI(url), 5000L);
-
- sender.setAllowAllHttpReturnCodes(true);
- sender.setMethod(methodType);
- sender.setSubContext(subContextPath);
- sender.setCredentials(dmeuser, dmepassword);
- sender.setHeaders(DMETimeOuts);
- if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
- sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
- props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
- props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
- sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
- } else {
- sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
- }
- } catch (DME2Exception x) {
- getLog().warn(x.getMessage(), x);
- throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
- } catch (URISyntaxException x) {
-
- getLog().warn(x.getMessage(), x);
- throw new URISyntaxException(url, x.getMessage());
- } catch (Exception x) {
-
- getLog().warn(x.getMessage(), x);
- throw new IllegalArgumentException(x.getMessage());
- }
- }
-
- private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
- boolean compress) throws MalformedURLException {
- super(hosts);
-
- if (topic == null || topic.length() < 1) {
- throw new IllegalArgumentException("A topic must be provided.");
- }
-
- fHostSelector = new HostSelector(hosts, null);
- fClosed = false;
- fTopic = topic;
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- fCompress = compress;
-
- fPending = new LinkedBlockingQueue<>();
- fDontSendUntilMs = 0;
- fExec = new ScheduledThreadPoolExecutor(1);
- pubResponse = new MRPublisherResponse();
-
- }
-
- private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
- boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
- super(hosts);
-
- if (topic == null || topic.length() < 1) {
- throw new IllegalArgumentException("A topic must be provided.");
- }
-
- fHostSelector = new HostSelector(hosts, null);
- fClosed = false;
- fTopic = topic;
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- fCompress = compress;
- threadOccuranceTime = httpThreadOccurnace;
- fPending = new LinkedBlockingQueue<>();
- fDontSendUntilMs = 0;
- fExec = new ScheduledThreadPoolExecutor(1);
- fExec.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- send(false);
- }
- }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
- pubResponse = new MRPublisherResponse();
- }
-
- private static class TimestampedMessage extends message {
- public TimestampedMessage(message m) {
- super(m);
- timestamp = Clock.now();
- }
-
- public final long timestamp;
- }
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public String getContentType() {
- return contentType;
- }
-
- public void setContentType(String contentType) {
- this.contentType = contentType;
- }
-
- public String getAuthKey() {
- return authKey;
- }
-
- public void setAuthKey(String authKey) {
- this.authKey = authKey;
- }
-
- public String getAuthDate() {
- return authDate;
- }
-
- public void setAuthDate(String authDate) {
- this.authDate = authDate;
- }
+ if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ if (fPending.peek() != null) {
+ logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
+ }
+ final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
+ protocolFlag);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
+ return false;
+ }
+ logTime(startMs, result.toString());
+ fPending.clear();
+ return true;
+ }
+
+ if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ if (fPending.peek() != null) {
+ logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
+ }
+ final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
+ return false;
+ }
+ logTime(startMs, result.toString());
+ fPending.clear();
+ return true;
+ }
+ } catch (InterruptedException e) {
+ getLog().warn("Interrupted!", e);
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
+ } catch (Exception x) {
+ getLog().warn(x.getMessage(), x);
+ }
+ return false;
+ }
+
+ public synchronized MRPublisherResponse sendBatchWithResponse() {
+ // it's possible for this call to be made with an empty list. in this
+ // case, just return.
+ if (fPending.isEmpty()) {
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage("No Messages to send");
+ return pubResponse;
+ }
+
+ final long nowMs = Clock.now();
+
+ host = this.fHostSelector.selectBaseHost();
+
+ final String httpUrl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROTOCOL),
+ props.getProperty(PARTITION));
+ OutputStream os = null;
+ try (ByteArrayOutputStream baseStream = new ByteArrayOutputStream()) {
+ os = baseStream;
+ final String propsContentType = props.getProperty(CONTENT_TYPE);
+ if (propsContentType.equalsIgnoreCase(MRFormat.JSON.toString())) {
+ JSONArray jsonArray = parseJSON();
+ os.write(jsonArray.toString().getBytes());
+ } else if (propsContentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ } else if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString())
+ || (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) {
+ if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : fPending) {
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+ } else {
+ for (TimestampedMessage m : fPending) {
+ os.write(m.fMsg.getBytes());
+
+ }
+ }
+
+ final long startMs = Clock.now();
+ if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+
+ try {
+ configureDME2();
+
+ this.wait(5);
+
+ if (fPending.peek() != null) {
+ logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp);
+ }
+ sender.setPayload(os.toString());
+
+ String dmeResponse = sender.sendAndWait(5000L);
+
+ pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
+
+ if (Integer.parseInt(pubResponse.getResponseCode()) < 200
+ || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+ final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
+ getLog().info(logLine);
+ fPending.clear();
+
+ } catch (DME2Exception x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(x.getErrorCode());
+ pubResponse.setResponseMessage(x.getErrorMessage());
+ } catch (URISyntaxException x) {
+
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage(x.getMessage());
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception x) {
+
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage(x.getMessage());
+ logger.error("exception: ", x);
+
+ }
+
+ return pubResponse;
+ }
+
+ if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ if (fPending.peek() != null) {
+ logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
+ }
+ final String result = postAuthwithResponse(httpUrl, baseStream.toByteArray(), contentType, authKey,
+ authDate, username, password, protocolFlag);
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.parseInt(pubResponse.getResponseCode()) < 200
+ || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+
+ logTime(startMs, result);
+ fPending.clear();
+ return pubResponse;
+ }
+
+ if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ if (fPending.peek() != null) {
+ logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
+ }
+ final String result = postWithResponse(httpUrl, baseStream.toByteArray(), contentType, username,
+ password, protocolFlag);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.parseInt(pubResponse.getResponseCode()) < 200
+ || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+
+ final String logLine = String.valueOf((Clock.now() - startMs));
+ getLog().info(logLine);
+ fPending.clear();
+ return pubResponse;
+ }
+
+ if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ if (fPending.peek() != null) {
+ logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
+ }
+ final String result = postNoAuthWithResponse(httpUrl, baseStream.toByteArray(), contentType);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.parseInt(pubResponse.getResponseCode()) < 200
+ || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+
+ final String logLine = String.valueOf((Clock.now() - startMs));
+ getLog().info(logLine);
+ fPending.clear();
+ return pubResponse;
+ }
+ } catch (IllegalArgumentException | HttpException x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ } catch (IOException x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage(x.getMessage());
+ } catch (InterruptedException e) {
+ getLog().warn("Interrupted!", e);
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
+ } catch (Exception x) {
+ getLog().warn(x.getMessage(), x);
+
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ } finally {
+ if (!fPending.isEmpty()) {
+ getLog().warn("Send failed, " + fPending.size() + " message to send.");
+ pubResponse.setPendingMsgs(fPending.size());
+ }
+ if (os != null) {
+ try {
+ os.close();
+ } catch (Exception x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage("Error in closing Output Stream");
+ }
+ }
+ }
+
+ return pubResponse;
+ }
+
+ public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+
+ if (reply.isEmpty()) {
+
+ mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ mrPubResponse.setResponseMessage("Please verify the Producer properties");
+ } else if (reply.startsWith("{")) {
+ JSONObject jObject = new JSONObject(reply);
+ if (jObject.has("message") && jObject.has(JSON_STATUS)) {
+ String message = jObject.getString("message");
+ if (null != message) {
+ mrPubResponse.setResponseMessage(message);
+ }
+ mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
+ } else {
+ mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+ mrPubResponse.setResponseMessage(reply);
+ }
+ } else if (reply.startsWith("<")) {
+ String responseCode = getHTTPErrorResponseCode(reply);
+ if (responseCode.contains("403")) {
+ responseCode = "403";
+ }
+ mrPubResponse.setResponseCode(responseCode);
+ mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+ }
+
+ return mrPubResponse;
+ }
+
+ private final String fTopic;
+ private final int fMaxBatchSize;
+ private final long fMaxBatchAgeMs;
+ private final boolean fCompress;
+ private int threadOccurrenceTime;
+ private boolean fClosed;
+ private String username;
+ private String password;
+ private String host;
+
+ // host selector
+ private HostSelector fHostSelector = null;
+
+ private final LinkedBlockingQueue<TimestampedMessage> fPending;
+ private long fDontSendUntilMs;
+ private final ScheduledThreadPoolExecutor fExec;
+
+ private String latitude;
+ private String longitude;
+ private String version;
+ private String serviceName;
+ private String env;
+ private String partner;
+ private String routeOffer;
+ private String subContextPath;
+ private String protocol;
+ private String methodType;
+ private String url;
+ private String dmeuser;
+ private String dmepassword;
+ private String contentType;
+ private static final long SF_WAIT_AFTER_ERROR = 10000;
+ private HashMap<String, String> DMETimeOuts;
+ private DME2Client sender;
+ public String protocolFlag = ProtocolType.DME2.getValue();
+ private String authKey;
+ private String authDate;
+ private String handlers;
+ private Properties props;
+ public static String routerFilePath;
+ protected static final Map<String, String> headers = new HashMap<String, String>();
+ public static MultivaluedMap<String, Object> headersMap;
+
+ private MRPublisherResponse pubResponse;
+
+ public MRPublisherResponse getPubResponse() {
+ return pubResponse;
+ }
+
+ public void setPubResponse(MRPublisherResponse pubResponse) {
+ this.pubResponse = pubResponse;
+ }
+
+ public static String getRouterFilePath() {
+ return routerFilePath;
+ }
+
+ public static void setRouterFilePath(String routerFilePath) {
+ MRSimplerBatchPublisher.routerFilePath = routerFilePath;
+ }
+
+ public Properties getProps() {
+ return props;
+ }
+
+ public void setProps(Properties props) {
+ this.props = props;
+ setClientConfig(DmaapClientUtil.getClientConfig(props));
+ }
+
+ public String getProtocolFlag() {
+ return protocolFlag;
+ }
+
+ public void setProtocolFlag(String protocolFlag) {
+ this.protocolFlag = protocolFlag;
+ }
+
+ private void configureDME2() throws Exception {
+ try {
+
+ latitude = props.getProperty(LATITUDE);
+ longitude = props.getProperty(LONGITUDE);
+ version = props.getProperty(VERSION);
+ serviceName = props.getProperty(SERVICE_NAME);
+ env = props.getProperty(ENVIRONMENT);
+ partner = props.getProperty(PARTNER);
+ routeOffer = props.getProperty(ROUTE_OFFER);
+ subContextPath = props.getProperty(SUB_CONTEXT_PATH) + fTopic;
+
+ protocol = props.getProperty(PROTOCOL);
+ methodType = props.getProperty(METHOD_TYPE);
+ dmeuser = props.getProperty(USERNAME);
+ dmepassword = props.getProperty(PASSWORD);
+ contentType = props.getProperty(CONTENT_TYPE);
+ handlers = props.getProperty(SESSION_STICKINESS_REQUIRED);
+
+ MRSimplerBatchPublisher.routerFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
+
+ /*
+ * Changes to DME2Client url to use Partner for auto failover
+ * between data centers When Partner value is not provided use the
+ * routeOffer value for auto failover within a cluster
+ */
+
+ String partitionKey = props.getProperty(PARTITION);
+
+ if (partner != null && !partner.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
+ + partner;
+ if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+ url = url + "&partitionKey=" + partitionKey;
+ }
+ } else if (routeOffer != null && !routeOffer.isEmpty()) {
+ url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+ + routeOffer;
+ if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+ url = url + "&partitionKey=" + partitionKey;
+ }
+ }
+
+ DMETimeOuts = new HashMap<>();
+ DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS));
+ DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(AFT_DME2_ROUNDTRIP_TIMEOUT_MS));
+ DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(AFT_DME2_EP_CONN_TIMEOUT));
+ DMETimeOuts.put("Content-Type", contentType);
+ System.setProperty("AFT_LATITUDE", latitude);
+ System.setProperty("AFT_LONGITUDE", longitude);
+ System.setProperty("AFT_ENVIRONMENT", props.getProperty(AFT_ENVIRONMENT));
+ // System.setProperty("DME2.DEBUG", "true");
+
+ // SSL changes
+ // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+
+ System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
+ System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+ System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+
+ // SSL changes
+
+ sender = new DME2Client(new URI(url), 5000L);
+
+ sender.setAllowAllHttpReturnCodes(true);
+ sender.setMethod(methodType);
+ sender.setSubContext(subContextPath);
+ sender.setCredentials(dmeuser, dmepassword);
+ sender.setHeaders(DMETimeOuts);
+ if ("yes".equalsIgnoreCase(handlers)) {
+ sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+ props.getProperty(AFT_DME2_EXCHANGE_REQUEST_HANDLERS));
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
+ props.getProperty(AFT_DME2_EXCHANGE_REPLY_HANDLERS));
+ sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty(AFT_DME2_REQ_TRACE_ON));
+ } else {
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+ }
+ } catch (DME2Exception x) {
+ getLog().warn(x.getMessage(), x);
+ throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
+ } catch (URISyntaxException x) {
+
+ getLog().warn(x.getMessage(), x);
+ throw new URISyntaxException(url, x.getMessage());
+ } catch (Exception x) {
+
+ getLog().warn(x.getMessage(), x);
+ throw new IllegalArgumentException(x.getMessage());
+ }
+ }
+
+ private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+ boolean compress) throws MalformedURLException {
+ super(hosts);
+
+ if (topic == null || topic.length() < 1) {
+ throw new IllegalArgumentException("A topic must be provided.");
+ }
+
+ fHostSelector = new HostSelector(hosts, null);
+ fClosed = false;
+ fTopic = topic;
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ fCompress = compress;
+
+ fPending = new LinkedBlockingQueue<>();
+ fDontSendUntilMs = 0;
+ fExec = new ScheduledThreadPoolExecutor(1);
+ pubResponse = new MRPublisherResponse();
+
+ }
+
+ private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+ boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurrence) throws MalformedURLException {
+ super(hosts);
+
+ if (topic == null || topic.length() < 1) {
+ throw new IllegalArgumentException("A topic must be provided.");
+ }
+
+ fHostSelector = new HostSelector(hosts, null);
+ fClosed = false;
+ fTopic = topic;
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ fCompress = compress;
+ threadOccurrenceTime = httpThreadOccurrence;
+ fPending = new LinkedBlockingQueue<>();
+ fDontSendUntilMs = 0;
+ fExec = new ScheduledThreadPoolExecutor(1);
+ fExec.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ send(false);
+ }
+ }, 100, threadOccurrenceTime, TimeUnit.MILLISECONDS);
+ pubResponse = new MRPublisherResponse();
+ }
+
+ private static class TimestampedMessage extends Message {
+ public TimestampedMessage(Message message) {
+ super(message);
+ timestamp = Clock.now();
+ }
+
+ public final long timestamp;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public String getAuthKey() {
+ return authKey;
+ }
+
+ public void setAuthKey(String authKey) {
+ this.authKey = authKey;
+ }
+
+ public String getAuthDate() {
+ return authDate;
+ }
+
+ public void setAuthDate(String authDate) {
+ this.authDate = authDate;
+ }
}