aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java')
-rw-r--r--src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java86
1 files changed, 48 insertions, 38 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
index 9b969a6..bd140cd 100644
--- a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
+++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java
@@ -59,9 +59,18 @@ import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
+ private static final String PROPS_PROTOCOL = "Protocol";
+ private static final String PROPS_PARTITION = "partition";
+ private static final String PROPS_CONTENT_TYPE = "contenttype";
+
+ private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip";
+ private static final String CONTENT_TYPE_CAMBRIA = "application/cambria";
+ private static final String CONTENT_TYPE_JSON = "application/json";
+ private static final String CONTENT_TYPE_TEXT = "text/plain";
+
+ private static final String JSON_STATUS = "status";
+
public static class Builder {
- public Builder() {
- }
public Builder againstUrls(Collection<String> baseUrls) {
fUrls = baseUrls;
@@ -278,6 +287,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
return jsonArray;
}
+ private void logTime(long startMs, String dmeResponse) {
+ if (getLog().isInfoEnabled()) {
+ getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse);
+ }
+ }
+
private synchronized boolean sendBatch() {
// it's possible for this call to be made with an empty list. in this
// case, just return.
@@ -291,28 +306,28 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
host = this.fHostSelector.selectBaseHost();
}
- final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
- props.getProperty("partition"));
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
+ props.getProperty(PROPS_PARTITION));
try {
final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
OutputStream os = baseStream;
- final String contentType = props.getProperty("contenttype");
- if (contentType.equalsIgnoreCase("application/json")) {
+ final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
JSONArray jsonArray = parseJSON();
os.write(jsonArray.toString().getBytes());
os.close();
- } else if (contentType.equalsIgnoreCase("text/plain")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
for (TimestampedMessage m : fPending) {
os.write(m.fMsg.getBytes());
os.write('\n');
}
os.close();
- } else if (contentType.equalsIgnoreCase("application/cambria")
- || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
- if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
+ || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
os = new GZIPOutputStream(baseStream);
}
for (TimestampedMessage m : fPending) {
@@ -339,15 +354,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
DME2Configue();
- Thread.sleep(5);
+ this.wait(5);
getLog().info(String
.format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
nowMs - fPending.peek().timestamp));
sender.setPayload(os.toString());
String dmeResponse = sender.sendAndWait(5000L);
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse;
- getLog().info(logLine);
+ logTime(startMs, dmeResponse);
fPending.clear();
return true;
}
@@ -362,11 +376,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
// Here we are checking for error response. If HTTP status
// code is not within the http success response code
// then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
return false;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result.toString());
fPending.clear();
return true;
}
@@ -380,11 +393,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
// Here we are checking for error response. If HTTP status
// code is not within the http success response code
// then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
return false;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result.toString());
fPending.clear();
return true;
}
@@ -397,11 +409,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
// Here we are checking for error response. If HTTP status
// code is not within the http success response code
// then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
return false;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result.toString());
fPending.clear();
return true;
}
@@ -424,25 +435,25 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
host = this.fHostSelector.selectBaseHost();
- final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
- props.getProperty("partition"));
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
+ props.getProperty(PROPS_PARTITION));
OutputStream os = null;
try {
final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
os = baseStream;
- final String contentType = props.getProperty("contenttype");
- if (contentType.equalsIgnoreCase("application/json")) {
+ final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
JSONArray jsonArray = parseJSON();
os.write(jsonArray.toString().getBytes());
- } else if (contentType.equalsIgnoreCase("text/plain")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
for (TimestampedMessage m : fPending) {
os.write(m.fMsg.getBytes());
os.write('\n');
}
- } else if (contentType.equalsIgnoreCase("application/cambria")
- || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
- if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
+ || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
os = new GZIPOutputStream(baseStream);
}
for (TimestampedMessage m : fPending) {
@@ -469,7 +480,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
try {
DME2Configue();
- Thread.sleep(5);
+ this.wait(5);
getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
nowMs - fPending.peek().timestamp);
sender.setPayload(os.toString());
@@ -524,8 +535,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
return pubResponse;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result);
fPending.clear();
return pubResponse;
}
@@ -618,12 +628,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
mrPubResponse.setResponseMessage("Please verify the Producer properties");
} else if (reply.startsWith("{")) {
JSONObject jObject = new JSONObject(reply);
- if (jObject.has("message") && jObject.has("status")) {
+ if (jObject.has("message") && jObject.has(JSON_STATUS)) {
String message = jObject.getString("message");
if (null != message) {
mrPubResponse.setResponseMessage(message);
}
- mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+ mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
} else {
mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
mrPubResponse.setResponseMessage(reply);
@@ -730,11 +740,11 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
routeOffer = props.getProperty("routeOffer");
subContextPath = props.getProperty("SubContextPath") + fTopic;
- protocol = props.getProperty("Protocol");
+ protocol = props.getProperty(PROPS_PROTOCOL);
methodType = props.getProperty("MethodType");
dmeuser = props.getProperty("username");
dmepassword = props.getProperty("password");
- contentType = props.getProperty("contenttype");
+ contentType = props.getProperty(PROPS_CONTENT_TYPE);
handlers = props.getProperty("sessionstickinessrequired");
routerFilePath = props.getProperty("DME2preferredRouterFilePath");
@@ -744,7 +754,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
* routeOffer value for auto failover within a cluster
*/
- String partitionKey = props.getProperty("partition");
+ String partitionKey = props.getProperty(PROPS_PARTITION);
if (partner != null && !partner.isEmpty()) {
url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="