diff options
author | sliard <samuel.liard@gmail.com> | 2021-04-07 17:28:50 +0200 |
---|---|---|
committer | sliard <samuel.liard@gmail.com> | 2021-04-12 16:02:59 +0200 |
commit | 78ebc9a64fac6231e3e594200b9335a4c6372ed1 (patch) | |
tree | 4e48966a35f6cceaac4c00b705e640830e34c634 /src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java | |
parent | 8a00d4324ea7621df1a1febb8936df2807cbc443 (diff) |
First sonar issues review
Issue-ID: DMAAP-1585
Change-Id: I5dc4d3d4cab75f5fabcc8d4f351eac4d3ea50d17
Signed-off-by: sliard <samuel.liard@gmail.com>
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java | 86 |
1 files changed, 48 insertions, 38 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java index 9b969a6..bd140cd 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java @@ -59,9 +59,18 @@ import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher { private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class); + private static final String PROPS_PROTOCOL = "Protocol"; + private static final String PROPS_PARTITION = "partition"; + private static final String PROPS_CONTENT_TYPE = "contenttype"; + + private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip"; + private static final String CONTENT_TYPE_CAMBRIA = "application/cambria"; + private static final String CONTENT_TYPE_JSON = "application/json"; + private static final String CONTENT_TYPE_TEXT = "text/plain"; + + private static final String JSON_STATUS = "status"; + public static class Builder { - public Builder() { - } public Builder againstUrls(Collection<String> baseUrls) { fUrls = baseUrls; @@ -278,6 +287,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP return jsonArray; } + private void logTime(long startMs, String dmeResponse) { + if (getLog().isInfoEnabled()) { + getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse); + } + } + private synchronized boolean sendBatch() { // it's possible for this call to be made with an empty list. in this // case, just return. @@ -291,28 +306,28 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP host = this.fHostSelector.selectBaseHost(); } - final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), - props.getProperty("partition")); + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL), + props.getProperty(PROPS_PARTITION)); try { final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); OutputStream os = baseStream; - final String contentType = props.getProperty("contenttype"); - if (contentType.equalsIgnoreCase("application/json")) { + final String contentType = props.getProperty(PROPS_CONTENT_TYPE); + if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) { JSONArray jsonArray = parseJSON(); os.write(jsonArray.toString().getBytes()); os.close(); - } else if (contentType.equalsIgnoreCase("text/plain")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { for (TimestampedMessage m : fPending) { os.write(m.fMsg.getBytes()); os.write('\n'); } os.close(); - } else if (contentType.equalsIgnoreCase("application/cambria") - || (contentType.equalsIgnoreCase("application/cambria-zip"))) { - if (contentType.equalsIgnoreCase("application/cambria-zip")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA) + || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) { + if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) { os = new GZIPOutputStream(baseStream); } for (TimestampedMessage m : fPending) { @@ -339,15 +354,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP DME2Configue(); - Thread.sleep(5); + this.wait(5); getLog().info(String .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath, nowMs - fPending.peek().timestamp)); sender.setPayload(os.toString()); String dmeResponse = sender.sendAndWait(5000L); - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse; - getLog().info(logLine); + logTime(startMs, dmeResponse); fPending.clear(); return true; } @@ -362,11 +376,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } @@ -380,11 +393,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } @@ -397,11 +409,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } @@ -424,25 +435,25 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP host = this.fHostSelector.selectBaseHost(); - final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), - props.getProperty("partition")); + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL), + props.getProperty(PROPS_PARTITION)); OutputStream os = null; try { final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); os = baseStream; - final String contentType = props.getProperty("contenttype"); - if (contentType.equalsIgnoreCase("application/json")) { + final String contentType = props.getProperty(PROPS_CONTENT_TYPE); + if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) { JSONArray jsonArray = parseJSON(); os.write(jsonArray.toString().getBytes()); - } else if (contentType.equalsIgnoreCase("text/plain")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { for (TimestampedMessage m : fPending) { os.write(m.fMsg.getBytes()); os.write('\n'); } - } else if (contentType.equalsIgnoreCase("application/cambria") - || (contentType.equalsIgnoreCase("application/cambria-zip"))) { - if (contentType.equalsIgnoreCase("application/cambria-zip")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA) + || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) { + if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) { os = new GZIPOutputStream(baseStream); } for (TimestampedMessage m : fPending) { @@ -469,7 +480,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP try { DME2Configue(); - Thread.sleep(5); + this.wait(5); getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath, nowMs - fPending.peek().timestamp); sender.setPayload(os.toString()); @@ -524,8 +535,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP return pubResponse; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result); fPending.clear(); return pubResponse; } @@ -618,12 +628,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP mrPubResponse.setResponseMessage("Please verify the Producer properties"); } else if (reply.startsWith("{")) { JSONObject jObject = new JSONObject(reply); - if (jObject.has("message") && jObject.has("status")) { + if (jObject.has("message") && jObject.has(JSON_STATUS)) { String message = jObject.getString("message"); if (null != message) { mrPubResponse.setResponseMessage(message); } - mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status"))); + mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS))); } else { mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); mrPubResponse.setResponseMessage(reply); @@ -730,11 +740,11 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP routeOffer = props.getProperty("routeOffer"); subContextPath = props.getProperty("SubContextPath") + fTopic; - protocol = props.getProperty("Protocol"); + protocol = props.getProperty(PROPS_PROTOCOL); methodType = props.getProperty("MethodType"); dmeuser = props.getProperty("username"); dmepassword = props.getProperty("password"); - contentType = props.getProperty("contenttype"); + contentType = props.getProperty(PROPS_CONTENT_TYPE); handlers = props.getProperty("sessionstickinessrequired"); routerFilePath = props.getProperty("DME2preferredRouterFilePath"); @@ -744,7 +754,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP * routeOffer value for auto failover within a cluster */ - String partitionKey = props.getProperty("partition"); + String partitionKey = props.getProperty(PROPS_PARTITION); if (partner != null && !partner.isEmpty()) { url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" |