diff options
Diffstat (limited to 'src/main/java/com/att/dmf/mr/metrics/publisher')
5 files changed, 18 insertions, 41 deletions
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java index 0993aa6..4b219b1 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java @@ -21,11 +21,11 @@ *******************************************************************************/ package com.att.dmf.mr.metrics.publisher; -//import org.slf4j.Logger; + // import com.att.eelf.configuration.EELFLogger; -//import com.att.eelf.configuration.EELFManager; + /** * diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java index 1510c32..46dfa99 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java @@ -95,7 +95,7 @@ public class CambriaPublisherUtility */ public static List<HttpHost> createHostsList(Collection<String> hosts) { - final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> (); + final ArrayList<HttpHost> convertedHosts = new ArrayList<>(); for ( String host : hosts ) { if ( host.length () == 0 ) continue; diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java index d02438f..9158c96 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java @@ -386,12 +386,7 @@ public class DMaaPCambriaClientFactory { * Your API secret * @return an identity manager */ - /* - * public static CambriaIdentityManager createIdentityManager ( - * Collection<String> hostSet, String apiKey, String apiSecret ) { final - * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet ); - * cim.setApiCredentials ( apiKey, apiSecret ); return cim; } - */ + /** * Create a topic manager for working with topics. @@ -405,12 +400,7 @@ public class DMaaPCambriaClientFactory { * Your API secret * @return a topic manager */ - /* - * public static CambriaTopicManager createTopicManager ( Collection<String> - * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi - * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey, - * apiSecret ); return tmi; } - */ + /** * Inject a consumer. Used to support unit tests. diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java index 08b2fd1..ebdf3ed 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java @@ -31,7 +31,7 @@ import org.json.JSONArray; import org.json.JSONException; import com.att.dmf.mr.constants.CambriaConstants; -//import org.slf4j.Logger; + //import org.slf4j.LoggerFactory; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -52,12 +52,12 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr public CambriaBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException { - /*super ( hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, - CacheUse.NONE, 1, 1, TimeUnit.MILLISECONDS );*/ + + super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000); - //fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + fLog = EELFManager.getInstance().getLogger(this.getClass().getName()); //( this.getClass().getName () ); } @@ -85,7 +85,7 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr { fLog = log; - //replaceLogger ( log ); + } public EELFLogger getLog () diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java index d8d8799..dee9e57 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java @@ -186,7 +186,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient public void close() { try { final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - if (remains.size() > 0) { + if (remains.isEmpty()) { getLog().warn("Closing publisher with " + remains.size() + " messages unsent. " + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close."); } @@ -251,7 +251,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient */ private synchronized boolean shouldSendNow() { boolean shouldSend = false; - if (fPending.size() > 0) { + if (fPending.isEmpty()) { final long nowMs = Clock.now(); shouldSend = (fPending.size() >= fMaxBatchSize); @@ -273,7 +273,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient private synchronized boolean sendBatch() { // it's possible for this call to be made with an empty list. in this // case, just return. - if (fPending.size() < 1) { + if (fPending.isEmpty()) { return true; } @@ -305,8 +305,8 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient // code from REST Client Starts - // final String serverCalculatedSignature = sha1HmacSigner.sign - // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV"); + + Client client = ClientBuilder.newClient(); String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic"); @@ -323,32 +323,19 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria"); Response response = target.request().post(data); - // header("X-CambriaAuth", - // "2OH46YIWa329QpEF:"+serverCalculatedSignature). - // header("X-CambriaDate", "2015-09-21T11:38:19-0700"). - // post(Entity.json(baseStream.toByteArray())); - + getLog().info("Response received :: " + response.getStatus()); getLog().info("Response received :: " + response.toString()); // code from REST Client Ends - /* - * final JSONObject result = post ( url, contentType, - * baseStream.toByteArray(), true ); final String logLine = - * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" + - * result.toString (); getLog().info ( logLine ); - */ + fPending.clear(); return true; } catch (IllegalArgumentException x) { getLog().warn(x.getMessage(), x); } - /* - * catch ( HttpObjectNotFoundException x ) { getLog().warn ( - * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn ( - * x.getMessage(), x ); } - */ + catch (IOException x) { getLog().warn(x.getMessage(), x); } |