summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/metrics/publisher
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/metrics/publisher')
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java4
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java2
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java14
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java10
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java29
5 files changed, 18 insertions, 41 deletions
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
index 0993aa6..4b219b1 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
@@ -21,11 +21,11 @@
*******************************************************************************/
package com.att.dmf.mr.metrics.publisher;
-//import org.slf4j.Logger;
+
//
import com.att.eelf.configuration.EELFLogger;
-//import com.att.eelf.configuration.EELFManager;
+
/**
*
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
index 1510c32..46dfa99 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
@@ -95,7 +95,7 @@ public class CambriaPublisherUtility
*/
public static List<HttpHost> createHostsList(Collection<String> hosts)
{
- final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> ();
+ final ArrayList<HttpHost> convertedHosts = new ArrayList<>();
for ( String host : hosts )
{
if ( host.length () == 0 ) continue;
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
index d02438f..9158c96 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
@@ -386,12 +386,7 @@ public class DMaaPCambriaClientFactory {
* Your API secret
* @return an identity manager
*/
- /*
- * public static CambriaIdentityManager createIdentityManager (
- * Collection<String> hostSet, String apiKey, String apiSecret ) { final
- * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet );
- * cim.setApiCredentials ( apiKey, apiSecret ); return cim; }
- */
+
/**
* Create a topic manager for working with topics.
@@ -405,12 +400,7 @@ public class DMaaPCambriaClientFactory {
* Your API secret
* @return a topic manager
*/
- /*
- * public static CambriaTopicManager createTopicManager ( Collection<String>
- * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi
- * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey,
- * apiSecret ); return tmi; }
- */
+
/**
* Inject a consumer. Used to support unit tests.
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
index 08b2fd1..ebdf3ed 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
@@ -31,7 +31,7 @@ import org.json.JSONArray;
import org.json.JSONException;
import com.att.dmf.mr.constants.CambriaConstants;
-//import org.slf4j.Logger;
+
//import org.slf4j.LoggerFactory;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
@@ -52,12 +52,12 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr
public CambriaBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException
{
- /*super ( hosts, CambriaConstants.kStdCambriaServicePort, clientSignature,
- CacheUse.NONE, 1, 1, TimeUnit.MILLISECONDS );*/
+
+
super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000);
- //fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+
fLog = EELFManager.getInstance().getLogger(this.getClass().getName());
//( this.getClass().getName () );
}
@@ -85,7 +85,7 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr
{
fLog = log;
- //replaceLogger ( log );
+
}
public EELFLogger getLog ()
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
index d8d8799..dee9e57 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
@@ -186,7 +186,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
public void close() {
try {
final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (remains.size() > 0) {
+ if (remains.isEmpty()) {
getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+ "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
}
@@ -251,7 +251,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
*/
private synchronized boolean shouldSendNow() {
boolean shouldSend = false;
- if (fPending.size() > 0) {
+ if (fPending.isEmpty()) {
final long nowMs = Clock.now();
shouldSend = (fPending.size() >= fMaxBatchSize);
@@ -273,7 +273,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
private synchronized boolean sendBatch() {
// it's possible for this call to be made with an empty list. in this
// case, just return.
- if (fPending.size() < 1) {
+ if (fPending.isEmpty()) {
return true;
}
@@ -305,8 +305,8 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
// code from REST Client Starts
- // final String serverCalculatedSignature = sha1HmacSigner.sign
- // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV");
+
+
Client client = ClientBuilder.newClient();
String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
@@ -323,32 +323,19 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
Response response = target.request().post(data);
- // header("X-CambriaAuth",
- // "2OH46YIWa329QpEF:"+serverCalculatedSignature).
- // header("X-CambriaDate", "2015-09-21T11:38:19-0700").
- // post(Entity.json(baseStream.toByteArray()));
-
+
getLog().info("Response received :: " + response.getStatus());
getLog().info("Response received :: " + response.toString());
// code from REST Client Ends
- /*
- * final JSONObject result = post ( url, contentType,
- * baseStream.toByteArray(), true ); final String logLine =
- * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" +
- * result.toString (); getLog().info ( logLine );
- */
+
fPending.clear();
return true;
} catch (IllegalArgumentException x) {
getLog().warn(x.getMessage(), x);
}
- /*
- * catch ( HttpObjectNotFoundException x ) { getLog().warn (
- * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn (
- * x.getMessage(), x ); }
- */
+
catch (IOException x) {
getLog().warn(x.getMessage(), x);
}