diff options
Diffstat (limited to 'src/main')
12 files changed, 291 insertions, 356 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java b/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java index 8936bea..91e10e0 100644 --- a/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java +++ b/src/main/java/org/onap/dmaap/mr/client/MRClientBuilders.java @@ -42,6 +42,7 @@ import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; */ public class MRClientBuilders { + private final static String ILLEGAL_ARGUMENT_MESSAGE = "You must provide at least one host and a topic name."; /** * Instantiates MRClientBuilders. @@ -56,10 +57,6 @@ public class MRClientBuilders */ public static class ConsumerBuilder { - /** - * Construct a consumer builder. - */ - public ConsumerBuilder () {} /** * Set the host list @@ -151,7 +148,7 @@ public class MRClientBuilders { if ( fHosts == null || fHosts.isEmpty() || fTopic == null ) { - throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); + throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE ); } if ( fGroup == null ) @@ -194,7 +191,6 @@ public class MRClientBuilders */ public static class PublisherBuilder { - public PublisherBuilder () {} /** * Set the MR/UEB host(s) to use @@ -298,7 +294,7 @@ public class MRClientBuilders { if ( fHosts == null || fHosts.isEmpty() || fTopic == null ) { - throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); + throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE ); } if ( sfPublisherMock != null ) return sfPublisherMock; @@ -331,10 +327,6 @@ public class MRClientBuilders */ public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager> { - /** - * Construct an identity manager builder. - */ - public IdentityManagerBuilder () {} @Override protected MRIdentityManager constructClient ( Collection<String> hosts ) { try { @@ -350,10 +342,6 @@ public class MRClientBuilders */ public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager> { - /** - * Construct an topic manager builder. - */ - public TopicManagerBuilder () {} @Override protected MRTopicManager constructClient ( Collection<String> hosts ) { try { @@ -434,7 +422,7 @@ public class MRClientBuilders { if ( fHosts.isEmpty() ) { - throw new IllegalArgumentException ( "You must provide at least one host and a topic name." ); + throw new IllegalArgumentException ( ILLEGAL_ARGUMENT_MESSAGE ); } final T mgr = constructClient ( fHosts ); diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java index e5ea48e..1780703 100644 --- a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java +++ b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java @@ -23,11 +23,7 @@ *******************************************************************************/ package org.onap.dmaap.mr.client; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; +import java.io.*; import java.net.MalformedURLException; import java.util.Collection; import java.util.Map; @@ -62,10 +58,13 @@ public class MRClientFactory { private static final String AUTH_DATE = "authDate"; private static final String PASSWORD = "password"; private static final String USERNAME = "username"; + private static final String FILTER = "filter"; + private static final String HOST = "host"; private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath"; private static final String TOPIC = "topic"; private static final String TRANSPORT_TYPE = "TransportType"; - public static MultivaluedMap<String, Object> HTTPHeadersMap; + + private static MultivaluedMap<String, Object> httpHeadersMap; public static Map<String, String> DME2HeadersMap; public static String routeFilePath; @@ -80,7 +79,23 @@ public class MRClientFactory { private MRClientFactory() { //prevents instantiation. } - + + /** + * Add getter to avoid direct access to static header map. + * @return + */ + public static MultivaluedMap<String, Object> getHTTPHeadersMap() { + return httpHeadersMap; + } + + /** + * Add setter to avoid direct access to static header map. + * @param headers + */ + public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) { + httpHeadersMap = headers; + } + /** * Create a consumer instance with the default timeout and no limit on * messages returned. This consumer operates as an independent consumer @@ -400,8 +415,6 @@ public class MRClientFactory { * use gzip compression * @param protocolFlag * http auth or ueb auth or dme2 method - * @param producerFilePath - * all properties for publisher * @return MRBatchingPublisher obj */ public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username, @@ -421,7 +434,7 @@ public class MRClientFactory { * Create a publisher that batches messages. Be sure to close the publisher * to send the last batch and ensure a clean shutdown * - * @param Properties + * @param props * props set all properties for publishing message * @return MRBatchingPublisher obj * @throws FileNotFoundException @@ -438,7 +451,7 @@ public class MRClientFactory { * Create a publisher that batches messages. Be sure to close the publisher * to send the last batch and ensure a clean shutdown * - * @param Properties + * @param props * props set all properties for publishing message * @return MRBatchingPublisher obj * @throws FileNotFoundException @@ -465,9 +478,10 @@ public class MRClientFactory { */ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath) throws FileNotFoundException, IOException { - FileReader reader = new FileReader(new File(producerFilePath)); Properties props = new Properties(); - props.load(reader); + try(InputStream input = new FileInputStream(producerFilePath)) { + props.load(input); + } return createBatchingPublisher(props); } @@ -485,9 +499,10 @@ public class MRClientFactory { */ public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse) throws FileNotFoundException, IOException { - FileReader reader = new FileReader(new File(producerFilePath)); Properties props = new Properties(); - props.load(reader); + try(InputStream input = new FileInputStream(producerFilePath)) { + props.load(input); + } return createBatchingPublisher(props, withResponse); } @@ -497,7 +512,7 @@ public class MRClientFactory { MRSimplerBatchPublisher pub; if (withResponse) { pub = new MRSimplerBatchPublisher.Builder() - .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE)) + .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE)) .onTopic(props.getProperty(TOPIC)) .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())) @@ -506,14 +521,14 @@ public class MRClientFactory { .withResponse(withResponse).build(); } else { pub = new MRSimplerBatchPublisher.Builder() - .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE)) + .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE)) .onTopic(props.getProperty(TOPIC)) .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())) .compress(Boolean.parseBoolean(props.getProperty("compress"))) .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build(); } - pub.setHost(props.getProperty("host")); + pub.setHost(props.getProperty(HOST)); if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { pub.setAuthKey(props.getProperty(AUTH_KEY)); @@ -638,10 +653,10 @@ public class MRClientFactory { } public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException { - FileReader reader = new FileReader(new File(consumerFilePath)); Properties props = new Properties(); - props.load(reader); - + try(InputStream input = new FileInputStream(consumerFilePath)) { + props.load(input); + } return createConsumer(props); } @@ -665,10 +680,10 @@ public class MRClientFactory { MRConsumerImpl sub = null; if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) { sub = new MRConsumerImpl.MRConsumerImplBuilder() - .setHostPart(MRConsumerImpl.stringToList(props.getProperty("host"))) + .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST))) .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group) .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit) - .setFilter(props.getProperty("filter")) + .setFilter(props.getProperty(FILTER)) .setApiKey_username(props.getProperty(AUTH_KEY)) .setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl(); sub.setAuthKey(props.getProperty(AUTH_KEY)); @@ -677,10 +692,10 @@ public class MRClientFactory { sub.setPassword(props.getProperty(PASSWORD)); } else { sub = new MRConsumerImpl.MRConsumerImplBuilder() - .setHostPart(MRConsumerImpl.stringToList(props.getProperty("host"))) + .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST))) .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group) .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit) - .setFilter(props.getProperty("filter")) + .setFilter(props.getProperty(FILTER)) .setApiKey_username(props.getProperty(USERNAME)) .setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl(); sub.setUsername(props.getProperty(USERNAME)); @@ -688,9 +703,9 @@ public class MRClientFactory { } sub.setProps(props); - sub.setHost(props.getProperty("host")); + sub.setHost(props.getProperty(HOST)); sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE)); - sub.setfFilter(props.getProperty("filter")); + sub.setfFilter(props.getProperty(FILTER)); if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH)); routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH); diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java index b29c100..07cf6a7 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRBaseClient.java @@ -48,10 +48,18 @@ import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; public class MRBaseClient extends HttpClient implements MRClient { + private final static String HEADER_TRANSACTION_ID = "transactionid"; + + private final static String JSON_RESULT = "result"; + private final static String JSON_STATUS = "status"; + + private final static String AUTH_FAILED = "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."; + private final static String LOG_TRANSACTION_ID = "TransactionId : "; + private ClientConfig clientConfig = null; protected MRBaseClient(Collection<String> hosts) throws MalformedURLException { - super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort); + super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT); fLog = LoggerFactory.getLogger(this.getClass().getName()); } @@ -63,7 +71,7 @@ public class MRBaseClient extends HttpClient implements MRClient { } protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException { - super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L, + super(ConnectionType.HTTP, hosts, MRConstants.STD_MR_SERVICE_PORT, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000); fLog = LoggerFactory.getLogger(this.getClass().getName()); @@ -79,6 +87,7 @@ public class MRBaseClient extends HttpClient implements MRClient { @Override public void close() { + // nothing to close } protected Set<String> jsonArrayToSet(JSONArray a) { @@ -115,8 +124,7 @@ public class MRBaseClient extends HttpClient implements MRClient { return getResponseDataInJson(response); } else { - throw new HttpException( - "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + throw new HttpException(AUTH_FAILED); } } @@ -149,8 +157,7 @@ public class MRBaseClient extends HttpClient implements MRClient { responseData = (String) response.readEntity(String.class); return responseData; } else { - throw new HttpException( - "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + throw new HttpException(AUTH_FAILED); } } @@ -180,8 +187,7 @@ public class MRBaseClient extends HttpClient implements MRClient { postAuthDO.getAuthDate(), postAuthDO.getData(), postAuthDO.getContentType()); return getResponseDataInJson(response); } else { - throw new HttpException( - "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + throw new HttpException(AUTH_FAILED); } } @@ -198,8 +204,7 @@ public class MRBaseClient extends HttpClient implements MRClient { return responseData; } else { - throw new HttpException( - "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty."); + throw new HttpException(AUTH_FAILED); } } @@ -222,8 +227,7 @@ public class MRBaseClient extends HttpClient implements MRClient { } return getResponseDataInJson(response); } else { - throw new HttpException( - "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + throw new HttpException(AUTH_FAILED); } } @@ -241,18 +245,17 @@ public class MRBaseClient extends HttpClient implements MRClient { String encoding = Base64.encodeAsString(username + ":" + password); response = DmaapClientUtil.getResponsewtBasicAuth(target, encoding); } - MRClientFactory.HTTPHeadersMap = response.getHeaders(); + MRClientFactory.setHTTPHeadersMap(response.getHeaders()); - String transactionid = response.getHeaderString("transactionid"); + String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); if (transactionid != null && !transactionid.equalsIgnoreCase("")) { - fLog.info("TransactionId : " + transactionid); + fLog.info(LOG_TRANSACTION_ID + transactionid); } responseData = (String) response.readEntity(String.class); return responseData; } else { - throw new HttpException( - "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + throw new HttpException(AUTH_FAILED); } } @@ -266,8 +269,7 @@ public class MRBaseClient extends HttpClient implements MRClient { return getResponseDataInJson(response); } else { - throw new HttpException( - "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + throw new HttpException(AUTH_FAILED); } } @@ -290,18 +292,17 @@ public class MRBaseClient extends HttpClient implements MRClient { target = DmaapClientUtil.getTarget(clientConfig,path, username, password); response = DmaapClientUtil.getResponsewtCambriaAuth(target, authKey, authDate); - MRClientFactory.HTTPHeadersMap = response.getHeaders(); + MRClientFactory.setHTTPHeadersMap(response.getHeaders()); - String transactionid = response.getHeaderString("transactionid"); + String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); if (transactionid != null && !transactionid.equalsIgnoreCase("")) { - fLog.info("TransactionId : " + transactionid); + fLog.info(LOG_TRANSACTION_ID + transactionid); } responseData = (String) response.readEntity(String.class); return responseData; } else { - throw new HttpException( - "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty."); + throw new HttpException(AUTH_FAILED); } } @@ -313,11 +314,11 @@ public class MRBaseClient extends HttpClient implements MRClient { target = DmaapClientUtil.getTarget(clientConfig,path, username, password); response = DmaapClientUtil.getResponsewtNoAuth(target); - MRClientFactory.HTTPHeadersMap = response.getHeaders(); + MRClientFactory.setHTTPHeadersMap(response.getHeaders()); - String transactionid = response.getHeaderString("transactionid"); + String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); if (transactionid != null && !transactionid.equalsIgnoreCase("")) { - fLog.info("TransactionId : " + transactionid); + fLog.info(LOG_TRANSACTION_ID + transactionid); } responseData = (String) response.readEntity(String.class); @@ -327,13 +328,13 @@ public class MRBaseClient extends HttpClient implements MRClient { private JSONObject getResponseDataInJson(Response response) throws JSONException { try { - MRClientFactory.HTTPHeadersMap = response.getHeaders(); + MRClientFactory.setHTTPHeadersMap(response.getHeaders()); // MultivaluedMap<String, Object> headersMap = // for(String key : headersMap.keySet()) { - String transactionid = response.getHeaderString("transactionid"); + String transactionid = response.getHeaderString(HEADER_TRANSACTION_ID); if (transactionid != null && !transactionid.equalsIgnoreCase("")) { - fLog.info("TransactionId : " + transactionid); + fLog.info(LOG_TRANSACTION_ID + transactionid); } if (response.getStatus() == 403) { @@ -341,8 +342,8 @@ public class MRBaseClient extends HttpClient implements MRClient { jsonObject = new JSONObject(); JSONArray jsonArray = new JSONArray(); jsonArray.put(response.getEntity()); - jsonObject.put("result", jsonArray); - jsonObject.put("status", response.getStatus()); + jsonObject.put(JSON_RESULT, jsonArray); + jsonObject.put(JSON_STATUS, response.getStatus()); return jsonObject; } String responseData = (String) response.readEntity(String.class); @@ -354,11 +355,11 @@ public class MRBaseClient extends HttpClient implements MRClient { if ('[' == firstChar) { JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); - jsonObject.put("status", response.getStatus()); + jsonObject.put(JSON_RESULT, jsonArray); + jsonObject.put(JSON_STATUS, response.getStatus()); } else { jsonObject = new JSONObject(jsonTokener); - jsonObject.put("status", response.getStatus()); + jsonObject.put(JSON_STATUS, response.getStatus()); } return jsonObject; diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java index bcd4403..5c7259c 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRBatchPublisher.java @@ -54,7 +54,7 @@ import org.onap.dmaap.mr.client.response.MRPublisherResponse; @Deprecated public class MRBatchPublisher implements MRBatchingPublisher { - public static final long kMinMaxAgeMs = 1; + public static final long MIN_MAX_AGE_MS = 1; /** * Create a batch publisher. @@ -66,10 +66,10 @@ public class MRBatchPublisher implements MRBatchingPublisher */ public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress ) { - if ( maxAgeMs < kMinMaxAgeMs ) + if ( maxAgeMs < MIN_MAX_AGE_MS) { - fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs ); - maxAgeMs = kMinMaxAgeMs; + fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + MIN_MAX_AGE_MS); + maxAgeMs = MIN_MAX_AGE_MS; } try { @@ -279,7 +279,7 @@ public class MRBatchPublisher implements MRBatchingPublisher /** * Called to queue a message. - * @param m + * @param msgs * @throws IOException */ public void queue ( Collection<message> msgs ) throws IOException @@ -364,7 +364,7 @@ public class MRBatchPublisher implements MRBatchingPublisher fLog.warn ( "Send failed, rebuilding send queue." ); // note the time for back-off - fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis (); + fDontSendUntilMs = SF_WAIT_AFTER_ERROR + System.currentTimeMillis (); // the send failed. reconstruct the pending queue fWriteLock.lock (); @@ -406,7 +406,7 @@ public class MRBatchPublisher implements MRBatchingPublisher private final WriteLock fWriteLock; private final ReadLock fReadLock; private long fDontSendUntilMs; - private static final long sfWaitAfterError = 1000; + private static final long SF_WAIT_AFTER_ERROR = 1000; } // this is static so that it's clearly not using any mutable member data outside of a lock diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java index ed23918..6a13910 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRClientVersionInfo.java @@ -38,21 +38,15 @@ public class MRClientVersionInfo private static final Properties props = new Properties(); private static final String version; - static - { + static { String use = null; - try - { - final InputStream is = MRClientVersionInfo.class.getResourceAsStream ( "/MRClientVersion.properties" ); - if ( is != null ) - { - props.load ( is ); + try (InputStream is = MRClientVersionInfo.class.getResourceAsStream("/MRClientVersion.properties" )) { + if (is != null) { + props.load(is); use = props.getProperty ( "MRClientVersion", null ); } - } - catch ( IOException e ) - { - logger.error("exception: ", e); + } catch ( IOException e ) { + logger.error("exception: ", e); } version = use; } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java index ec17610..dbf6b4d 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConstants.java @@ -32,9 +32,9 @@ import org.apache.http.HttpHost; class MRConstants { private static final String PROTOCOL = "http"; - public static final String context = "/"; - public static final String kBasePath = "events/"; - public static final int kStdMRServicePort = 8080; + public static final String CONTEXT = "/"; + public static final String BASE_PATH = "events/"; + public static final int STD_MR_SERVICE_PORT = 8080; public static String escape ( String s ) { @@ -53,8 +53,8 @@ class MRConstants final String cleanTopic = escape ( rawTopic ); final StringBuffer url = new StringBuffer(). - append ( MRConstants.context ). - append ( MRConstants.kBasePath ). + append ( MRConstants.CONTEXT). + append ( MRConstants.BASE_PATH). append ( cleanTopic ); return url.toString (); } @@ -68,8 +68,8 @@ class MRConstants url.append( PROTOCOL + "://" ); } url.append(host); - url.append ( MRConstants.context ); - url.append ( MRConstants.kBasePath ); + url.append ( MRConstants.CONTEXT); + url.append ( MRConstants.BASE_PATH); url.append ( cleanTopic ); return url.toString (); } @@ -86,8 +86,8 @@ class MRConstants url.append( PROTOCOL + "://" ); } url.append(host); - url.append ( MRConstants.context ); - url.append ( MRConstants.kBasePath ); + url.append ( MRConstants.CONTEXT); + url.append ( MRConstants.BASE_PATH); url.append ( cleanTopic ); if(parttion!=null && !parttion.equalsIgnoreCase("")) url.append("?partitionKey=").append(parttion); @@ -97,7 +97,7 @@ class MRConstants { final String cleanConsumerGroup = escape ( rawConsumerGroup ); final String cleanConsumerId = escape ( rawConsumerId ); - return MRConstants.context + MRConstants.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId; + return MRConstants.CONTEXT + MRConstants.BASE_PATH + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId; } /** @@ -122,7 +122,7 @@ class MRConstants * Return an HttpHost from an input string. Input string has * host[:port] as format. If the port section is not provided, the default port is used. * - * @param hosts + * @param host * @return a list of hosts */ public static HttpHost hostForString ( String host ) @@ -130,7 +130,7 @@ class MRConstants if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." ); String hostPart = host; - int port = kStdMRServicePort; + int port = STD_MR_SERVICE_PORT; final int colon = host.indexOf ( ':' ); if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." ); @@ -169,8 +169,8 @@ class MRConstants } url.append(host); - url.append(context); - url.append(kBasePath); + url.append(CONTEXT); + url.append(BASE_PATH); url.append(fTopic + "/" + cleanConsumerGroup + "/" + cleanConsumerId); return url.toString(); diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java index 0b06f77..6c67313 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRConsumerImpl.java @@ -28,19 +28,13 @@ import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.io.UnsupportedEncodingException; + +import java.io.*; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; +import java.util.*; import java.util.concurrent.TimeUnit; import org.apache.http.HttpException; import org.apache.http.HttpStatus; @@ -55,15 +49,27 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { private Logger log = LoggerFactory.getLogger(this.getClass().getName()); - public static final String routerFilePath = null; + public static final String ROUTER_FILE_PATH = null; public String protocolFlag = ProtocolTypeConstants.DME2.getValue(); public String consumerFilePath; + private static final String JSON_RESULT = "result"; + private static final String PROPS_PROTOCOL = "Protocol"; + + private static final String EXECPTION_MESSAGE = "exception: "; private static final String SUCCESS_MESSAGE = "Success"; private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L; private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L; + private static final String URL_PARAM_ROUTE_OFFER = "routeoffer"; + private static final String URL_PARAM_PARTNER = "partner"; + private static final String URL_PARAM_ENV_CONTEXT = "envContext"; + private static final String URL_PARAM_VERSION = "version"; + private static final String URL_PARAM_FILTER = "filter"; + private static final String URL_PARAM_LIMIT = "limit"; + private static final String URL_PARAM_TIMEOUT = "timeout"; + private final String fTopic; private final String fGroup; private final String fId; @@ -186,124 +192,67 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { public Iterable<String> fetch(int timeoutMs, int limit) throws Exception { final LinkedList<String> msgs = new LinkedList<>(); - try { - if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { - dmeConfigure(timeoutMs, limit); - try { - String reply = sender.sendAndWait(timeoutMs + 10000L); - final JSONObject o = getResponseDataInJson(reply); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } - } - - if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); - - try { - final JSONObject o = get(urlPath, username, password, protocolFlag); - - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } - } - - if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")), - timeoutMs, limit); - - try { - final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } + ProtocolTypeConstants protocolFlagEnum = null; + for(ProtocolTypeConstants type : ProtocolTypeConstants.values()) { + if (type.getValue().equalsIgnoreCase(protocolFlag)) { + protocolFlagEnum = type; } + } + if (protocolFlagEnum == null) { + return msgs; + } - if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { - final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); - - try { - final JSONObject o = getNoAuth(urlPath); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - log.error("exception: ", e); - } catch (HttpException e) { - throw new IOException(e); - } + try { + switch (protocolFlagEnum) { + case DME2: + dmeConfigure(timeoutMs, limit); + String reply = sender.sendAndWait(timeoutMs + 10000L); + readJsonData(msgs, getResponseDataInJson(reply)); + break; + case AAF_AUTH: + String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); + final JSONObject o = get(urlAuthPath, username, password, protocolFlag); + readJsonData(msgs, o); + break; + case AUTH_KEY: + final String urlKeyPath = createUrlPath( + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)), + timeoutMs, limit); + final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag); + readJsonData(msgs, authObject); + break; + case HTTPNOAUTH: + final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, + fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); + readJsonData(msgs, getNoAuth(urlNoAuthPath)); + break; } } catch (JSONException e) { // unexpected response reportProblemWithResponse(); - log.error("exception: ", e); + log.error(EXECPTION_MESSAGE, e); } catch (HttpException e) { throw new IOException(e); - } catch (Exception e) { - throw e; } return msgs; } + private void readJsonData(LinkedList<String> msgs, JSONObject o) { + if (o != null) { + final JSONArray a = o.getJSONArray(JSON_RESULT); + if (a != null) { + for (int i = 0; i < a.length(); i++) { + if (a.get(i) instanceof String) + msgs.add(a.getString(i)); + else + msgs.add(a.getJSONObject(i).toString()); + } + } + } + } + @Override public MRConsumerResponse fetchWithReturnConsumerResponse() { // fetch with the timeout and limit set in constructor @@ -324,82 +273,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { final JSONObject o = getResponseDataInJsonWithResponseReturned(reply); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } + readJsonData(msgs, o); createMRConsumerResponse(reply, mrConsumerResponse); } if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); String response = getResponse(urlPath, username, password, protocolFlag); final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } + readJsonData(msgs, o); createMRConsumerResponse(response, mrConsumerResponse); } if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { final String urlPath = createUrlPath( - MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")), + MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag); final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } + readJsonData(msgs, o); createMRConsumerResponse(response, mrConsumerResponse); } if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic, - fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit); + fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit); String response = getNoAuthResponse(urlPath, username, password, protocolFlag); final JSONObject o = getResponseDataInJsonWithResponseReturned(response); - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - - if (a != null) { - for (int i = 0; i < a.length(); i++) { - if (a.get(i) instanceof String) - msgs.add(a.getString(i)); - else - msgs.add(a.getJSONObject(i).toString()); - } - } - } + readJsonData(msgs, o); createMRConsumerResponse(response, mrConsumerResponse); } @@ -418,7 +323,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } catch (Exception e) { mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); mrConsumerResponse.setResponseMessage(e.getMessage()); - log.error("exception: ", e); + log.error(EXECPTION_MESSAGE, e); } mrConsumerResponse.setActualMessages(msgs); return mrConsumerResponse; @@ -460,7 +365,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { if ('[' == firstChar) { JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); + jsonObject.put(JSON_RESULT, jsonArray); } else { jsonObject = new JSONObject(jsonTokener); } @@ -484,7 +389,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { if ('[' == firstChar) { JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject = new JSONObject(); - jsonObject.put("result", jsonArray); + jsonObject.put(JSON_RESULT, jsonArray); } else if ('{' == firstChar) { return null; } else if ('<' == firstChar) { @@ -506,7 +411,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { String partner = props.getProperty("Partner"); String routeOffer = props.getProperty("routeOffer"); String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId; - String protocol = props.getProperty("Protocol"); + String protocol = props.getProperty(PROPS_PROTOCOL); String methodType = props.getProperty("MethodType"); String dmeuser = props.getProperty("username"); String dmepassword = props.getProperty("password"); @@ -519,25 +424,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { */ String preferredRouteKey = readRoute("preferredRouteKey"); - + StringBuilder contextUrl = new StringBuilder(); if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner - + "&routeoffer=" + preferredRouteKey; + contextUrl.append(protocol).append("://").append(serviceName).append("?") + .append(URL_PARAM_VERSION).append("=").append(version).append("&") + .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&") + .append(URL_PARAM_PARTNER).append("=").append(partner).append("&") + .append(URL_PARAM_ROUTE_OFFER).append("=").append(preferredRouteKey); } else if (partner != null && !partner.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner; + contextUrl.append(protocol).append("://").append(serviceName).append("?") + .append(URL_PARAM_VERSION).append("=").append(version).append("&") + .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&") + .append(URL_PARAM_PARTNER).append("=").append(partner); } else if (routeOffer != null && !routeOffer.isEmpty()) { - url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer=" - + routeOffer; + contextUrl.append(protocol).append("://").append(serviceName).append("?") + .append(URL_PARAM_VERSION).append("=").append(version).append("&") + .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&") + .append(URL_PARAM_ROUTE_OFFER).append("=").append(routeOffer); } - if (timeoutMs != -1) - url = url + "&timeout=" + timeoutMs; - if (limit != -1) - url = url + "&limit=" + limit; + if (timeoutMs != -1) { + contextUrl.append("&").append(URL_PARAM_TIMEOUT).append("=").append(timeoutMs); + } + if (limit != -1) { + contextUrl.append("&").append(URL_PARAM_LIMIT).append("=").append(limit); + } // Add filter to DME2 Url - if (fFilter != null && fFilter.length() > 0) - url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8"); + if (fFilter != null && fFilter.length() > 0) { + contextUrl.append("&").append(URL_PARAM_FILTER).append("=").append(URLEncoder.encode(fFilter, "UTF-8")); + } + + url = contextUrl.toString(); DMETimeOuts = new HashMap<>(); DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); @@ -642,8 +560,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } private String readRoute(String routeKey) { - try { - MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath))); + try(InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) { + MRClientFactory.prop.load(input); } catch (Exception ex) { log.error("Reply Router Error " + ex); } @@ -665,7 +583,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } public static String getRouterFilePath() { - return routerFilePath; + return ROUTER_FILE_PATH; } public static void setRouterFilePath(String routerFilePath) { diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java index 04b26d5..c1e2d12 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRMetaClient.java @@ -42,6 +42,12 @@ import org.onap.dmaap.mr.client.MRTopicManager; public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager { + private static final String BASE_URI_TOPIC = "/topics"; + private static final String BASE_URI_APIKEY = "/apiKeys"; + + private static final String PARAM_DESCRIPTION = "description"; + private static final String PARAM_EMAIL = "email"; + private static final Logger logger = LoggerFactory.getLogger(MRMetaClient.class); public MRMetaClient ( Collection<String> baseUrls ) throws MalformedURLException { @@ -54,7 +60,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden final TreeSet<String> set = new TreeSet<> (); try { - final JSONObject topicSet = get ( "/topics" ); + final JSONObject topicSet = get ( BASE_URI_TOPIC ); final JSONArray a = topicSet.getJSONArray ( "topics" ); for ( int i=0; i<a.length (); i++ ) { @@ -83,7 +89,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden { try { - final JSONObject topicData = get ( "/topics/" + MRConstants.escape ( topic ) ); + final JSONObject topicData = get ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) ); return new TopicInfo () { @Override @@ -95,7 +101,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden @Override public String getDescription () { - return topicData.optString ( "description", null ); + return topicData.optString ( PARAM_DESCRIPTION, null ); } @Override @@ -139,13 +145,13 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden o.put ( "topicDescription", topicDescription ); o.put ( "partitionCount", partitionCount ); o.put ( "replicationCount", replicationCount ); - post ( "/topics/create", o, false ); + post ( BASE_URI_TOPIC + "/create", o, false ); } @Override public void deleteTopic ( String topic ) throws HttpException, IOException { - delete ( "/topics/" + MRConstants.escape ( topic ) ); + delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) ); } @Override @@ -163,13 +169,13 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden @Override public void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException { - put ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() ); + put ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() ); } @Override public void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException { - delete ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) ); + delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) ); } @Override @@ -187,13 +193,13 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden @Override public void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException { - put ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() ); + put ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() ); } @Override public void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException { - delete ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) ); + delete ( BASE_URI_TOPIC + "/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) ); } @Override @@ -202,9 +208,9 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden try { final JSONObject o = new JSONObject (); - o.put ( "email", email ); - o.put ( "description", description ); - final JSONObject reply = post ( "/apiKeys/create", o, true ); + o.put ( PARAM_EMAIL, email ); + o.put ( PARAM_DESCRIPTION, description ); + final JSONObject reply = post ( BASE_URI_APIKEY + "/create", o, true ); return new ApiCredential ( reply.getString ( "key" ), reply.getString ( "secret" ) ); } catch ( JSONException e ) @@ -217,7 +223,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden @Override public ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException { - final JSONObject keyEntry = get ( "/apiKeys/" + MRConstants.escape ( apiKey ) ); + final JSONObject keyEntry = get ( BASE_URI_APIKEY + "/" + MRConstants.escape ( apiKey ) ); if ( keyEntry == null ) { return null; @@ -231,7 +237,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden final JSONObject aux = keyEntry.optJSONObject ( "aux" ); if ( aux != null ) { - return aux.optString ( "email" ); + return aux.optString ( PARAM_EMAIL ); } return null; } @@ -242,7 +248,7 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden final JSONObject aux = keyEntry.optJSONObject ( "aux" ); if ( aux != null ) { - return aux.optString ( "description" ); + return aux.optString ( PARAM_DESCRIPTION ); } return null; } @@ -253,14 +259,14 @@ public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIden public void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException { final JSONObject o = new JSONObject (); - if ( email != null ) o.put ( "email", email ); - if ( description != null ) o.put ( "description", description ); - patch ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ), o ); + if ( email != null ) o.put ( PARAM_EMAIL, email ); + if ( description != null ) o.put ( PARAM_DESCRIPTION, description ); + patch ( BASE_URI_APIKEY + "/" + MRConstants.escape ( getCurrentApiKey() ), o ); } @Override public void deleteCurrentApiKey () throws HttpException, IOException { - delete ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ) ); + delete ( BASE_URI_APIKEY + "/" + MRConstants.escape ( getCurrentApiKey() ) ); } } diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java index 9b969a6..bd140cd 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java @@ -59,9 +59,18 @@ import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher { private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class); + private static final String PROPS_PROTOCOL = "Protocol"; + private static final String PROPS_PARTITION = "partition"; + private static final String PROPS_CONTENT_TYPE = "contenttype"; + + private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip"; + private static final String CONTENT_TYPE_CAMBRIA = "application/cambria"; + private static final String CONTENT_TYPE_JSON = "application/json"; + private static final String CONTENT_TYPE_TEXT = "text/plain"; + + private static final String JSON_STATUS = "status"; + public static class Builder { - public Builder() { - } public Builder againstUrls(Collection<String> baseUrls) { fUrls = baseUrls; @@ -278,6 +287,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP return jsonArray; } + private void logTime(long startMs, String dmeResponse) { + if (getLog().isInfoEnabled()) { + getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse); + } + } + private synchronized boolean sendBatch() { // it's possible for this call to be made with an empty list. in this // case, just return. @@ -291,28 +306,28 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP host = this.fHostSelector.selectBaseHost(); } - final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), - props.getProperty("partition")); + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL), + props.getProperty(PROPS_PARTITION)); try { final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); OutputStream os = baseStream; - final String contentType = props.getProperty("contenttype"); - if (contentType.equalsIgnoreCase("application/json")) { + final String contentType = props.getProperty(PROPS_CONTENT_TYPE); + if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) { JSONArray jsonArray = parseJSON(); os.write(jsonArray.toString().getBytes()); os.close(); - } else if (contentType.equalsIgnoreCase("text/plain")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { for (TimestampedMessage m : fPending) { os.write(m.fMsg.getBytes()); os.write('\n'); } os.close(); - } else if (contentType.equalsIgnoreCase("application/cambria") - || (contentType.equalsIgnoreCase("application/cambria-zip"))) { - if (contentType.equalsIgnoreCase("application/cambria-zip")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA) + || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) { + if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) { os = new GZIPOutputStream(baseStream); } for (TimestampedMessage m : fPending) { @@ -339,15 +354,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP DME2Configue(); - Thread.sleep(5); + this.wait(5); getLog().info(String .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath, nowMs - fPending.peek().timestamp)); sender.setPayload(os.toString()); String dmeResponse = sender.sendAndWait(5000L); - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse; - getLog().info(logLine); + logTime(startMs, dmeResponse); fPending.clear(); return true; } @@ -362,11 +376,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } @@ -380,11 +393,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } @@ -397,11 +409,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } @@ -424,25 +435,25 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP host = this.fHostSelector.selectBaseHost(); - final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), - props.getProperty("partition")); + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL), + props.getProperty(PROPS_PARTITION)); OutputStream os = null; try { final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); os = baseStream; - final String contentType = props.getProperty("contenttype"); - if (contentType.equalsIgnoreCase("application/json")) { + final String contentType = props.getProperty(PROPS_CONTENT_TYPE); + if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) { JSONArray jsonArray = parseJSON(); os.write(jsonArray.toString().getBytes()); - } else if (contentType.equalsIgnoreCase("text/plain")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { for (TimestampedMessage m : fPending) { os.write(m.fMsg.getBytes()); os.write('\n'); } - } else if (contentType.equalsIgnoreCase("application/cambria") - || (contentType.equalsIgnoreCase("application/cambria-zip"))) { - if (contentType.equalsIgnoreCase("application/cambria-zip")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA) + || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) { + if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) { os = new GZIPOutputStream(baseStream); } for (TimestampedMessage m : fPending) { @@ -469,7 +480,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP try { DME2Configue(); - Thread.sleep(5); + this.wait(5); getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath, nowMs - fPending.peek().timestamp); sender.setPayload(os.toString()); @@ -524,8 +535,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP return pubResponse; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result); fPending.clear(); return pubResponse; } @@ -618,12 +628,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP mrPubResponse.setResponseMessage("Please verify the Producer properties"); } else if (reply.startsWith("{")) { JSONObject jObject = new JSONObject(reply); - if (jObject.has("message") && jObject.has("status")) { + if (jObject.has("message") && jObject.has(JSON_STATUS)) { String message = jObject.getString("message"); if (null != message) { mrPubResponse.setResponseMessage(message); } - mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status"))); + mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS))); } else { mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); mrPubResponse.setResponseMessage(reply); @@ -730,11 +740,11 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP routeOffer = props.getProperty("routeOffer"); subContextPath = props.getProperty("SubContextPath") + fTopic; - protocol = props.getProperty("Protocol"); + protocol = props.getProperty(PROPS_PROTOCOL); methodType = props.getProperty("MethodType"); dmeuser = props.getProperty("username"); dmepassword = props.getProperty("password"); - contentType = props.getProperty("contenttype"); + contentType = props.getProperty(PROPS_CONTENT_TYPE); handlers = props.getProperty("sessionstickinessrequired"); routerFilePath = props.getProperty("DME2preferredRouterFilePath"); @@ -744,7 +754,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP * routeOffer value for auto failover within a cluster */ - String partitionKey = props.getProperty("partition"); + String partitionKey = props.getProperty(PROPS_PARTITION); if (partner != null && !partner.isEmpty()) { url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" diff --git a/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExampleConsumer.java b/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExampleConsumer.java index 5bb7dad..76abb7e 100644 --- a/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExampleConsumer.java +++ b/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExampleConsumer.java @@ -57,7 +57,7 @@ public class SimpleExampleConsumer { logger.debug("Message Received: " + msg); } // Header for DME2 Call. - MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap; + MultivaluedMap<String, Object> headersMap = MRClientFactory.getHTTPHeadersMap(); for (MultivaluedMap.Entry<String, List<Object>> entry : headersMap.entrySet()) { key = entry.getKey(); logger.debug("Header Key " + key); diff --git a/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExamplePublisher.java b/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExamplePublisher.java index de0d461..b5d83bb 100644 --- a/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExamplePublisher.java +++ b/src/main/java/org/onap/dmaap/mr/dme/client/SimpleExamplePublisher.java @@ -93,7 +93,7 @@ public class SimpleExamplePublisher { }*/ if (transport.equalsIgnoreCase("HTTP")) { - MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap; + MultivaluedMap<String, Object> headersMap = MRClientFactory.getHTTPHeadersMap(); for (String key : headersMap.keySet()) { System.out.println("Header Key " + key); System.out.println("Header Value " + headersMap.get(key)); diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java index 2e514b0..f7341ec 100644 --- a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java +++ b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java @@ -45,6 +45,9 @@ public class SimpleExampleConsumerWithReturnResponse { long count = 0; long nextReport = 5000; + // remove while true and limite execution time in seconds + int timeMax = 86400; // one day + long endDate = System.currentTimeMillis() + timeMax*1000; final long startMs = System.currentTimeMillis (); @@ -60,7 +63,7 @@ public class SimpleExampleConsumerWithReturnResponse { routeReader= new FileReader(new File (routeFilePath)); props= new Properties(); final MRConsumer cc = MRClientFactory.createConsumer ( "src/main/resources/dme2/consumer.properties" ); - while ( true ) + while ( System.currentTimeMillis() < endDate ) { MRConsumerResponse mrConsumerResponse = cc.fetchWithReturnConsumerResponse(); System.out.println("mrConsumerResponse code :"+mrConsumerResponse.getResponseCode()); |