summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
diff options
context:
space:
mode:
authorsunil unnava <sunil.unnava@att.com>2018-10-23 12:18:59 -0400
committersunil unnava <sunil.unnava@att.com>2018-10-23 12:22:02 -0400
commit3504265229c589ecc166e3ad4c33bb198b11e4ce (patch)
tree022235018aa3ad863eaf24862543bbd509f35a21 /src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
parent8a3dfd3fe521f18ce07c2d24202a51b28d424fa2 (diff)
update the package name1.1.11
Issue-ID: DMAAP-858 Change-Id: I49ae6eb9c51a261b64b911e607fcbbca46c5423c Signed-off-by: sunil unnava <sunil.unnava@att.com>
Diffstat (limited to 'src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java')
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java422
1 files changed, 0 insertions, 422 deletions
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
deleted file mode 100644
index e9b1cdb..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-import java.nio.channels.NotYetConnectedException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPOutputStream;
-
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility;
-
-/**
- *
- * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
- * in batch
- *
- * @author anowarul.islam
- *
- */
-public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
- implements com.att.dmf.mr.metrics.publisher.CambriaBatchingPublisher {
- /**
- *
- * static inner class initializes with urls, topic,batchSize
- *
- * @author anowarul.islam
- *
- */
- public static class Builder {
- public Builder() {
- }
-
- /**
- * constructor initialize with url
- *
- * @param baseUrls
- * @return
- *
- */
- public Builder againstUrls(Collection<String> baseUrls) {
- fUrls = baseUrls;
- return this;
- }
-
- /**
- * constructor initializes with topics
- *
- * @param topic
- * @return
- *
- */
- public Builder onTopic(String topic) {
- fTopic = topic;
- return this;
- }
-
- /**
- * constructor initilazes with batch size and batch time
- *
- * @param maxBatchSize
- * @param maxBatchAgeMs
- * @return
- *
- */
- public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- return this;
- }
-
- /**
- * constructor initializes with compress
- *
- * @param compress
- * @return
- */
- public Builder compress(boolean compress) {
- fCompress = compress;
- return this;
- }
-
- /**
- * method returns DMaaPCambriaSimplerBatchPublisher object
- *
- * @return
- */
- public DMaaPCambriaSimplerBatchPublisher build() {
-
- try {
- return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
- } catch (MalformedURLException e) {
-
- NotYetConnectedException exception=new NotYetConnectedException();
- exception.setStackTrace(e.getStackTrace());
-
- throw exception ;
-
- }
- }
-
- private Collection<String> fUrls;
- private String fTopic;
- private int fMaxBatchSize = 100;
- private long fMaxBatchAgeMs = 1000;
- private boolean fCompress = false;
- };
-
- /**
- *
- * @param partition
- * @param msg
- */
- @Override
- public int send(String partition, String msg) {
- return send(new message(partition, msg));
- }
-
- /**
- * @param msg
- */
- @Override
- public int send(message msg) {
- final LinkedList<message> list = new LinkedList<message>();
- list.add(msg);
- return send(list);
- }
-
- /**
- * @param msgs
- */
- @Override
- public synchronized int send(Collection<message> msgs) {
- if (fClosed) {
- throw new IllegalStateException("The publisher was closed.");
- }
-
- for (message userMsg : msgs) {
- fPending.add(new TimestampedMessage(userMsg));
- }
- return getPendingMessageCount();
- }
-
- /**
- * getPending message count
- */
- @Override
- public synchronized int getPendingMessageCount() {
- return fPending.size();
- }
-
- /**
- *
- * @exception InterruptedException
- * @exception IOException
- */
- @Override
- public void close() {
- try {
- final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (remains.isEmpty()) {
- getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
- + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
- }
- } catch (InterruptedException e) {
- getLog().warn("Possible message loss. " + e.getMessage(), e);
- } catch (IOException e) {
- getLog().warn("Possible message loss. " + e.getMessage(), e);
- }
- }
-
- /**
- * @param time
- * @param unit
- */
- @Override
- public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
- synchronized (this) {
- fClosed = true;
-
- // stop the background sender
- fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- fExec.shutdown();
- }
-
- final long now = Clock.now();
- final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
- final long timeoutAtMs = now + waitInMs;
-
- while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
- send(true);
- Thread.sleep(250);
- }
- // synchronizing the current object
- synchronized (this) {
- final LinkedList<message> result = new LinkedList<message>();
- fPending.drainTo(result);
- return result;
- }
- }
-
- /**
- * Possibly send a batch to the cambria server. This is called by the
- * background thread and the close() method
- *
- * @param force
- */
- private synchronized void send(boolean force) {
- if (force || shouldSendNow()) {
- if (!sendBatch()) {
- getLog().warn("Send failed, " + fPending.size() + " message to send.");
-
- // note the time for back-off
- fDontSendUntilMs = sfWaitAfterError + Clock.now();
- }
- }
- }
-
- /**
- *
- * @return
- */
- private synchronized boolean shouldSendNow() {
- boolean shouldSend = false;
- if (fPending.isEmpty()) {
- final long nowMs = Clock.now();
-
- shouldSend = (fPending.size() >= fMaxBatchSize);
- if (!shouldSend) {
- final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
- shouldSend = sendAtMs <= nowMs;
- }
-
- // however, wait after an error
- shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
- }
- return shouldSend;
- }
-
- /**
- *
- * @return
- */
- private synchronized boolean sendBatch() {
- // it's possible for this call to be made with an empty list. in this
- // case, just return.
- if (fPending.isEmpty()) {
- return true;
- }
-
- final long nowMs = Clock.now();
- final String url = CambriaPublisherUtility.makeUrl(fTopic);
-
- getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
-
- try {
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
- OutputStream os = baseStream;
- if (fCompress) {
- os = new GZIPOutputStream(baseStream);
- }
- for (TimestampedMessage m : fPending) {
- os.write(("" + m.fPartition.length()).getBytes());
- os.write('.');
- os.write(("" + m.fMsg.length()).getBytes());
- os.write('.');
- os.write(m.fPartition.getBytes());
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- os.close();
-
- final long startMs = Clock.now();
-
- // code from REST Client Starts
-
-
-
-
- Client client = ClientBuilder.newClient();
- String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
- if (null==metricTopicname) {
-
- metricTopicname="msgrtr.apinode.metrics.dmaap";
- }
- WebTarget target = client
- .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
- target = target.path("/events/" + fTopic);
- getLog().info("url : " + target.getUri().toString());
- // API Key
-
- Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
-
- Response response = target.request().post(data);
-
- getLog().info("Response received :: " + response.getStatus());
- getLog().info("Response received :: " + response.toString());
-
- // code from REST Client Ends
-
-
- fPending.clear();
- return true;
- } catch (IllegalArgumentException x) {
- getLog().warn(x.getMessage(), x);
- }
-
- catch (IOException x) {
- getLog().warn(x.getMessage(), x);
- }
- return false;
- }
-
- private final String fTopic;
- private final int fMaxBatchSize;
- private final long fMaxBatchAgeMs;
- private final boolean fCompress;
- private boolean fClosed;
-
- private final LinkedBlockingQueue<TimestampedMessage> fPending;
- private long fDontSendUntilMs;
- private final ScheduledThreadPoolExecutor fExec;
-
- private static final long sfWaitAfterError = 1000;
-
- /**
- *
- * @param hosts
- * @param topic
- * @param maxBatchSize
- * @param maxBatchAgeMs
- * @param compress
- */
- private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
- long maxBatchAgeMs, boolean compress) throws MalformedURLException {
-
- super(hosts);
-
- if (topic == null || topic.length() < 1) {
- throw new IllegalArgumentException("A topic must be provided.");
- }
-
- fClosed = false;
- fTopic = topic;
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- fCompress = compress;
-
- fPending = new LinkedBlockingQueue<TimestampedMessage>();
- fDontSendUntilMs = 0;
-
- fExec = new ScheduledThreadPoolExecutor(1);
- fExec.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- send(false);
- }
- }, 100, 50, TimeUnit.MILLISECONDS);
- }
-
- /**
- *
- *
- * @author anowarul.islam
- *
- */
- private static class TimestampedMessage extends message {
- /**
- * to store timestamp value
- */
- public final long timestamp;
-
- /**
- * constructor initialize with message
- *
- * @param m
- *
- */
- public TimestampedMessage(message m) {
- super(m);
- timestamp = Clock.now();
- }
- }
-
-}