summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java')
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java429
1 files changed, 429 insertions, 0 deletions
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
new file mode 100644
index 0000000..d8d8799
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
@@ -0,0 +1,429 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility;
+
+/**
+ *
+ * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
+ * in batch
+ *
+ * @author anowarul.islam
+ *
+ */
+public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
+ implements com.att.dmf.mr.metrics.publisher.CambriaBatchingPublisher {
+ /**
+ *
+ * static inner class initializes with urls, topic,batchSize
+ *
+ * @author anowarul.islam
+ *
+ */
+ public static class Builder {
+ public Builder() {
+ }
+
+ /**
+ * constructor initialize with url
+ *
+ * @param baseUrls
+ * @return
+ *
+ */
+ public Builder againstUrls(Collection<String> baseUrls) {
+ fUrls = baseUrls;
+ return this;
+ }
+
+ /**
+ * constructor initializes with topics
+ *
+ * @param topic
+ * @return
+ *
+ */
+ public Builder onTopic(String topic) {
+ fTopic = topic;
+ return this;
+ }
+
+ /**
+ * constructor initilazes with batch size and batch time
+ *
+ * @param maxBatchSize
+ * @param maxBatchAgeMs
+ * @return
+ *
+ */
+ public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ return this;
+ }
+
+ /**
+ * constructor initializes with compress
+ *
+ * @param compress
+ * @return
+ */
+ public Builder compress(boolean compress) {
+ fCompress = compress;
+ return this;
+ }
+
+ /**
+ * method returns DMaaPCambriaSimplerBatchPublisher object
+ *
+ * @return
+ */
+ public DMaaPCambriaSimplerBatchPublisher build() {
+
+ try {
+ return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Collection<String> fUrls;
+ private String fTopic;
+ private int fMaxBatchSize = 100;
+ private long fMaxBatchAgeMs = 1000;
+ private boolean fCompress = false;
+ };
+
+ /**
+ *
+ * @param partition
+ * @param msg
+ */
+ @Override
+ public int send(String partition, String msg) {
+ return send(new message(partition, msg));
+ }
+
+ /**
+ * @param msg
+ */
+ @Override
+ public int send(message msg) {
+ final LinkedList<message> list = new LinkedList<message>();
+ list.add(msg);
+ return send(list);
+ }
+
+ /**
+ * @param msgs
+ */
+ @Override
+ public synchronized int send(Collection<message> msgs) {
+ if (fClosed) {
+ throw new IllegalStateException("The publisher was closed.");
+ }
+
+ for (message userMsg : msgs) {
+ fPending.add(new TimestampedMessage(userMsg));
+ }
+ return getPendingMessageCount();
+ }
+
+ /**
+ * getPending message count
+ */
+ @Override
+ public synchronized int getPendingMessageCount() {
+ return fPending.size();
+ }
+
+ /**
+ *
+ * @exception InterruptedException
+ * @exception IOException
+ */
+ @Override
+ public void close() {
+ try {
+ final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ if (remains.size() > 0) {
+ getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+ + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
+ }
+ } catch (InterruptedException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
+ } catch (IOException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * @param time
+ * @param unit
+ */
+ @Override
+ public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
+ synchronized (this) {
+ fClosed = true;
+
+ // stop the background sender
+ fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ fExec.shutdown();
+ }
+
+ final long now = Clock.now();
+ final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
+ final long timeoutAtMs = now + waitInMs;
+
+ while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
+ send(true);
+ Thread.sleep(250);
+ }
+ // synchronizing the current object
+ synchronized (this) {
+ final LinkedList<message> result = new LinkedList<message>();
+ fPending.drainTo(result);
+ return result;
+ }
+ }
+
+ /**
+ * Possibly send a batch to the cambria server. This is called by the
+ * background thread and the close() method
+ *
+ * @param force
+ */
+ private synchronized void send(boolean force) {
+ if (force || shouldSendNow()) {
+ if (!sendBatch()) {
+ getLog().warn("Send failed, " + fPending.size() + " message to send.");
+
+ // note the time for back-off
+ fDontSendUntilMs = sfWaitAfterError + Clock.now();
+ }
+ }
+ }
+
+ /**
+ *
+ * @return
+ */
+ private synchronized boolean shouldSendNow() {
+ boolean shouldSend = false;
+ if (fPending.size() > 0) {
+ final long nowMs = Clock.now();
+
+ shouldSend = (fPending.size() >= fMaxBatchSize);
+ if (!shouldSend) {
+ final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
+ shouldSend = sendAtMs <= nowMs;
+ }
+
+ // however, wait after an error
+ shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
+ }
+ return shouldSend;
+ }
+
+ /**
+ *
+ * @return
+ */
+ private synchronized boolean sendBatch() {
+ // it's possible for this call to be made with an empty list. in this
+ // case, just return.
+ if (fPending.size() < 1) {
+ return true;
+ }
+
+ final long nowMs = Clock.now();
+ final String url = CambriaPublisherUtility.makeUrl(fTopic);
+
+ getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+
+ try {
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+ OutputStream os = baseStream;
+ if (fCompress) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : fPending) {
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+
+ final long startMs = Clock.now();
+
+ // code from REST Client Starts
+
+ // final String serverCalculatedSignature = sha1HmacSigner.sign
+ // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV");
+
+ Client client = ClientBuilder.newClient();
+ String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
+ if (null==metricTopicname) {
+
+ metricTopicname="msgrtr.apinode.metrics.dmaap";
+ }
+ WebTarget target = client
+ .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
+ target = target.path("/events/" + fTopic);
+ getLog().info("url : " + target.getUri().toString());
+ // API Key
+
+ Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
+
+ Response response = target.request().post(data);
+ // header("X-CambriaAuth",
+ // "2OH46YIWa329QpEF:"+serverCalculatedSignature).
+ // header("X-CambriaDate", "2015-09-21T11:38:19-0700").
+ // post(Entity.json(baseStream.toByteArray()));
+
+ getLog().info("Response received :: " + response.getStatus());
+ getLog().info("Response received :: " + response.toString());
+
+ // code from REST Client Ends
+
+ /*
+ * final JSONObject result = post ( url, contentType,
+ * baseStream.toByteArray(), true ); final String logLine =
+ * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" +
+ * result.toString (); getLog().info ( logLine );
+ */
+ fPending.clear();
+ return true;
+ } catch (IllegalArgumentException x) {
+ getLog().warn(x.getMessage(), x);
+ }
+ /*
+ * catch ( HttpObjectNotFoundException x ) { getLog().warn (
+ * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn (
+ * x.getMessage(), x ); }
+ */
+ catch (IOException x) {
+ getLog().warn(x.getMessage(), x);
+ }
+ return false;
+ }
+
+ private final String fTopic;
+ private final int fMaxBatchSize;
+ private final long fMaxBatchAgeMs;
+ private final boolean fCompress;
+ private boolean fClosed;
+
+ private final LinkedBlockingQueue<TimestampedMessage> fPending;
+ private long fDontSendUntilMs;
+ private final ScheduledThreadPoolExecutor fExec;
+
+ private static final long sfWaitAfterError = 1000;
+
+ /**
+ *
+ * @param hosts
+ * @param topic
+ * @param maxBatchSize
+ * @param maxBatchAgeMs
+ * @param compress
+ */
+ private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
+ long maxBatchAgeMs, boolean compress) throws MalformedURLException {
+
+ super(hosts);
+
+ if (topic == null || topic.length() < 1) {
+ throw new IllegalArgumentException("A topic must be provided.");
+ }
+
+ fClosed = false;
+ fTopic = topic;
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ fCompress = compress;
+
+ fPending = new LinkedBlockingQueue<TimestampedMessage>();
+ fDontSendUntilMs = 0;
+
+ fExec = new ScheduledThreadPoolExecutor(1);
+ fExec.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ send(false);
+ }
+ }, 100, 50, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ *
+ *
+ * @author anowarul.islam
+ *
+ */
+ private static class TimestampedMessage extends message {
+ /**
+ * to store timestamp value
+ */
+ public final long timestamp;
+
+ /**
+ * constructor initialize with message
+ *
+ * @param m
+ *
+ */
+ public TimestampedMessage(message m) {
+ super(m);
+ timestamp = Clock.now();
+ }
+ }
+
+}