summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/nsa/cambria/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
diff options
context:
space:
mode:
authorsunil unnava <su622b@att.com>2018-08-14 09:34:46 -0400
committersunil unnava <su622b@att.com>2018-08-14 09:39:23 -0400
commitb32effcaf5684d5e2f338a4537b71a2375c534e5 (patch)
treee1b80407f414509ffcc766b987ec6a95f7254b4e /src/main/java/com/att/nsa/cambria/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
parent0823cb186012c8e6b7de3d979dfabb9f838da7c2 (diff)
update the testcases after the kafka 11 changes
Issue-ID: DMAAP-526 Change-Id: I477a8ee05fb3cdd76af726b6ca0d1a69aa9eef93 Signed-off-by: sunil unnava <su622b@att.com>
Diffstat (limited to 'src/main/java/com/att/nsa/cambria/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java')
-rw-r--r--src/main/java/com/att/nsa/cambria/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java430
1 files changed, 0 insertions, 430 deletions
diff --git a/src/main/java/com/att/nsa/cambria/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/nsa/cambria/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
deleted file mode 100644
index d7e6816..0000000
--- a/src/main/java/com/att/nsa/cambria/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.metrics.publisher.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPOutputStream;
-
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.metrics.publisher.CambriaPublisherUtility;
-
-/**
- *
- * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
- * in batch
- *
- * @author author
- *
- */
-public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
- implements com.att.nsa.cambria.metrics.publisher.CambriaBatchingPublisher {
- /**
- *
- * static inner class initializes with urls, topic,batchSize
- *
- * @author author
- *
- */
- public static class Builder {
- public Builder() {
- }
-
- /**
- * constructor initialize with url
- *
- * @param baseUrls
- * @return
- *
- */
- public Builder againstUrls(Collection<String> baseUrls) {
- fUrls = baseUrls;
- return this;
- }
-
- /**
- * constructor initializes with topics
- *
- * @param topic
- * @return
- *
- */
- public Builder onTopic(String topic) {
- fTopic = topic;
- return this;
- }
-
- /**
- * constructor initilazes with batch size and batch time
- *
- * @param maxBatchSize
- * @param maxBatchAgeMs
- * @return
- *
- */
- public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- return this;
- }
-
- /**
- * constructor initializes with compress
- *
- * @param compress
- * @return
- */
- public Builder compress(boolean compress) {
- fCompress = compress;
- return this;
- }
-
- /**
- * method returns DMaaPCambriaSimplerBatchPublisher object
- *
- * @return
- */
- public DMaaPCambriaSimplerBatchPublisher build() {
- try {
- return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
- } catch (MalformedURLException e) {
- throw new RuntimeException(e);
- }
- }
-
- private Collection<String> fUrls;
- private String fTopic;
- private int fMaxBatchSize = 100;
- private long fMaxBatchAgeMs = 1000;
- private boolean fCompress = false;
- };
-
- /**
- *
- * @param partition
- * @param msg
- */
- @Override
- public int send(String partition, String msg) {
- return send(new message(partition, msg));
- }
-
- /**
- * @param msg
- */
- @Override
- public int send(message msg) {
- final LinkedList<message> list = new LinkedList<message>();
- list.add(msg);
- return send(list);
- }
-
- /**
- * @param msgs
- */
- @Override
- public synchronized int send(Collection<message> msgs) {
- if (fClosed) {
- throw new IllegalStateException("The publisher was closed.");
- }
-
- for (message userMsg : msgs) {
- fPending.add(new TimestampedMessage(userMsg));
- }
- return getPendingMessageCount();
- }
-
- /**
- * getPending message count
- */
- @Override
- public synchronized int getPendingMessageCount() {
- return fPending.size();
- }
-
- /**
- *
- * @exception InterruptedException
- * @exception IOException
- */
- @Override
- public void close() {
- try {
- final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (remains.size() > 0) {
- getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
- + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
- }
- } catch (InterruptedException e) {
- getLog().info(" Interruption Exception is caught here : " + e.getMessage());
- Thread.currentThread().interrupt();
- } catch (IOException e) {
- getLog().warn("Possible message loss. " + e.getMessage(), e);
- }
- }
-
- /**
- * @param time
- * @param unit
- */
- @Override
- public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
- synchronized (this) {
- fClosed = true;
-
- // stop the background sender
- fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- fExec.shutdown();
- }
-
- final long now = Clock.now();
- final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
- final long timeoutAtMs = now + waitInMs;
-
- while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
- send(true);
- Thread.sleep(250);
- }
- // synchronizing the current object
- synchronized (this) {
- final LinkedList<message> result = new LinkedList<message>();
- fPending.drainTo(result);
- return result;
- }
- }
-
- /**
- * Possibly send a batch to the cambria server. This is called by the
- * background thread and the close() method
- *
- * @param force
- */
- private synchronized void send(boolean force) {
- if (force || shouldSendNow()) {
- if (!sendBatch()) {
- getLog().warn("Send failed, " + fPending.size() + " message to send.");
-
- // note the time for back-off
- fDontSendUntilMs = sfWaitAfterError + Clock.now();
- }
- }
- }
-
- /**
- *
- * @return
- */
- private synchronized boolean shouldSendNow() {
- boolean shouldSend = false;
- if (fPending.size() > 0) {
- final long nowMs = Clock.now();
-
- shouldSend = (fPending.size() >= fMaxBatchSize);
- if (!shouldSend) {
- final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
- shouldSend = sendAtMs <= nowMs;
- }
-
- // however, wait after an error
- shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
- }
- return shouldSend;
- }
-
- /**
- *
- * @return
- */
- private synchronized boolean sendBatch() {
- // it's possible for this call to be made with an empty list. in this
- // case, just return.
- if (fPending.size() < 1) {
- return true;
- }
-
- final long nowMs = Clock.now();
- final String url = CambriaPublisherUtility.makeUrl(fTopic);
-
- getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
-
- try {
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
- OutputStream os = baseStream;
- if (fCompress) {
- os = new GZIPOutputStream(baseStream);
- }
- for (TimestampedMessage m : fPending) {
- os.write(("" + m.fPartition.length()).getBytes());
- os.write('.');
- os.write(("" + m.fMsg.length()).getBytes());
- os.write('.');
- os.write(m.fPartition.getBytes());
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- os.close();
-
- final long startMs = Clock.now();
-
- // code from REST Client Starts
-
- // final String serverCalculatedSignature = sha1HmacSigner.sign
- // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV");
-
- Client client = ClientBuilder.newClient();
- String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
- if (null==metricTopicname) {
-
- metricTopicname="msgrtr.apinode.metrics.dmaap";
- }
- WebTarget target = client
- .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
- target = target.path("/events/" + fTopic);
- getLog().info("url : " + target.getUri().toString());
- // API Key
-
- Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
-
- Response response = target.request().post(data);
- // header("X-CambriaAuth",
- // "2OH46YIWa329QpEF:"+serverCalculatedSignature).
- // header("X-CambriaDate", "2015-09-21T11:38:19-0700").
- // post(Entity.json(baseStream.toByteArray()));
-
- getLog().info("Response received :: " + response.getStatus());
- getLog().info("Response received :: " + response.toString());
-
- // code from REST Client Ends
-
- /*
- * final JSONObject result = post ( url, contentType,
- * baseStream.toByteArray(), true ); final String logLine =
- * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" +
- * result.toString (); getLog().info ( logLine );
- */
- fPending.clear();
- return true;
- } catch (IllegalArgumentException x) {
- getLog().warn(x.getMessage(), x);
- }
- /*
- * catch ( HttpObjectNotFoundException x ) { getLog().warn (
- * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn (
- * x.getMessage(), x ); }
- */
- catch (IOException x) {
- getLog().warn(x.getMessage(), x);
- }
- return false;
- }
-
- private final String fTopic;
- private final int fMaxBatchSize;
- private final long fMaxBatchAgeMs;
- private final boolean fCompress;
- private boolean fClosed;
-
- private final LinkedBlockingQueue<TimestampedMessage> fPending;
- private long fDontSendUntilMs;
- private final ScheduledThreadPoolExecutor fExec;
-
- private static final long sfWaitAfterError = 1000;
-
- /**
- *
- * @param hosts
- * @param topic
- * @param maxBatchSize
- * @param maxBatchAgeMs
- * @param compress
- * @throws MalformedURLException
- */
- private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
- long maxBatchAgeMs, boolean compress) throws MalformedURLException {
-
- super(hosts);
-
- if (topic == null || topic.length() < 1) {
- throw new IllegalArgumentException("A topic must be provided.");
- }
-
- fClosed = false;
- fTopic = topic;
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- fCompress = compress;
-
- fPending = new LinkedBlockingQueue<TimestampedMessage>();
- fDontSendUntilMs = 0;
-
- fExec = new ScheduledThreadPoolExecutor(1);
- fExec.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- send(false);
- }
- }, 100, 50, TimeUnit.MILLISECONDS);
- }
-
- /**
- *
- *
- * @author author
- *
- */
- private static class TimestampedMessage extends message {
- /**
- * to store timestamp value
- */
- public final long timestamp;
-
- /**
- * constructor initialize with message
- *
- * @param m
- *
- */
- public TimestampedMessage(message m) {
- super(m);
- timestamp = Clock.now();
- }
- }
-
-}