summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/metrics
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/metrics')
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java52
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java89
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java52
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java101
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java146
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java425
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java100
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java74
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java169
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java429
10 files changed, 1637 insertions, 0 deletions
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java
new file mode 100644
index 0000000..45644b7
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Cambria batching publisher is a publisher with additional functionality
+ * for managing delayed sends.
+ *
+ * @author peter
+ *
+ */
+public interface CambriaBatchingPublisher extends CambriaPublisher
+{
+ /**
+ * Get the number of messages that have not yet been sent.
+ * @return the number of pending messages
+ */
+ int getPendingMessageCount ();
+
+ /**
+ * Close this publisher, sending any remaining messages.
+ * @param timeout an amount of time to wait for unsent messages to be sent
+ * @param timeoutUnits the time unit for the timeout arg
+ * @return a list of any unsent messages after the timeout
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ List<message> close ( long timeout, TimeUnit timeoutUnits ) throws IOException, InterruptedException;
+}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
new file mode 100644
index 0000000..0993aa6
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
@@ -0,0 +1,89 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher;
+
+//import org.slf4j.Logger;
+
+//
+import com.att.eelf.configuration.EELFLogger;
+//import com.att.eelf.configuration.EELFManager;
+
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public interface CambriaClient {
+ /**
+ * An exception at the Cambria layer. This is used when the HTTP transport
+ * layer returns a success code but the transaction is not completed as
+ * expected.
+ */
+ public class CambriaApiException extends Exception {
+ /**
+ *
+ * @param msg
+ */
+ public CambriaApiException(String msg) {
+ super(msg);
+ }
+
+ /**
+ *
+ * @param msg
+ * @param t
+ */
+ public CambriaApiException(String msg, Throwable t) {
+ super(msg, t);
+ }
+
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ * Optionally set the Logger to use
+ *
+ * @param log
+ */
+ void logTo(EELFLogger log);
+
+ /**
+ * Set the API credentials for this client connection. Subsequent calls will
+ * include authentication headers.who i
+ *
+ * @param apiKey
+ * @param apiSecret
+ */
+ void setApiCredentials(String apiKey, String apiSecret);
+
+ /**
+ * Remove API credentials, if any, on this connection. Subsequent calls will
+ * not include authentication headers.
+ */
+ void clearApiCredentials();
+
+ /**
+ * Close this connection. Some client interfaces have additional close
+ * capability.
+ */
+ void close();
+}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java
new file mode 100644
index 0000000..4a6ca81
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher;
+
+import java.io.IOException;
+
+/**
+ * This interface will provide fetch mechanism for consumer
+ * @author nilanjana.maity
+ *
+ */
+public interface CambriaConsumer extends CambriaClient
+{
+ /**
+ * Fetch a set of messages. The consumer's timeout and message limit are used if set in the constructor call.
+
+ * @return a set of messages
+ * @throws IOException
+ */
+ Iterable<String> fetch () throws IOException;
+
+ /**
+ * Fetch a set of messages with an explicit timeout and limit for this call. These values
+ * override any set in the constructor call.
+ *
+ * @param timeoutMs The amount of time in milliseconds that the server should keep the connection
+ * open while waiting for message traffic. Use -1 for default timeout (controlled on the server-side).
+ * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
+ * @return a set messages
+ * @throws IOException if there's a problem connecting to the server
+ */
+ Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException;
+}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java
new file mode 100644
index 0000000..4020a6d
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java
@@ -0,0 +1,101 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A Cambria publishing interface.
+ *
+ * @author peter
+ *
+ */
+public interface CambriaPublisher extends CambriaClient {
+ /**
+ * A simple message container
+ */
+ public static class message {
+ /**
+ *
+ * @param partition
+ * @param msg
+ */
+ public message(String partition, String msg) {
+ fPartition = partition == null ? "" : partition;
+ fMsg = msg;
+ if (fMsg == null) {
+ throw new IllegalArgumentException("Can't send a null message.");
+ }
+ }
+
+ /**
+ *
+ * @param msg
+ */
+ public message(message msg) {
+ this(msg.fPartition, msg.fMsg);
+ }
+
+ /**
+ * declaring partition string
+ */
+ public final String fPartition;
+ /**
+ * declaring fMsg String
+ */
+ public final String fMsg;
+ }
+
+ /**
+ * Send the given message using the given partition.
+ *
+ * @param partition
+ * @param msg
+ * @return the number of pending messages
+ * @throws IOException
+ */
+ int send(String partition, String msg) throws IOException;
+
+ /**
+ * Send the given message using its partition.
+ *
+ * @param msg
+ * @return the number of pending messages
+ * @throws IOException
+ */
+ int send(message msg) throws IOException;
+
+ /**
+ * Send the given messages using their partitions.
+ *
+ * @param msgs
+ * @return the number of pending messages
+ * @throws IOException
+ */
+ int send(Collection<message> msgs) throws IOException;
+
+ /**
+ * Close this publisher. It's an error to call send() after close()
+ */
+ void close();
+}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
new file mode 100644
index 0000000..1510c32
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
@@ -0,0 +1,146 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.http.HttpHost;
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public class CambriaPublisherUtility
+{
+ public static final String kBasePath = "/events/";
+ public static final int kStdCambriaServicePort = 3904;
+/**
+ *
+ * Translates a string into <code>application/x-www-form-urlencoded</code>
+ * format using a specific encoding scheme.
+ * @param s
+ * @return
+ *
+ */
+ public static String escape ( String s )
+ {
+ try
+ {
+ return URLEncoder.encode ( s, "UTF-8");
+ }
+ catch ( UnsupportedEncodingException e )
+ {
+ throw new RuntimeException ( e );
+ }
+ }
+/**
+ *
+ * building url
+ * @param rawTopic
+ * @return
+ */
+ public static String makeUrl ( String rawTopic )
+ {
+ final String cleanTopic = escape ( rawTopic );
+
+ final StringBuffer url = new StringBuffer().
+ append ( CambriaPublisherUtility.kBasePath ).
+ append ( cleanTopic );
+ return url.toString ();
+ }
+/**
+ *
+ * building consumerUrl
+ * @param topic
+ * @param rawConsumerGroup
+ * @param rawConsumerId
+ * @return
+ */
+ public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId )
+ {
+ final String cleanConsumerGroup = escape ( rawConsumerGroup );
+ final String cleanConsumerId = escape ( rawConsumerId );
+ return CambriaPublisherUtility.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId;
+ }
+
+ /**
+ * Create a list of HttpHosts from an input list of strings. Input strings have
+ * host[:port] as format. If the port section is not provided, the default port is used.
+ *
+ * @param hosts
+ * @return a list of hosts
+ */
+ public static List<HttpHost> createHostsList(Collection<String> hosts)
+ {
+ final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> ();
+ for ( String host : hosts )
+ {
+ if ( host.length () == 0 ) continue;
+ convertedHosts.add ( hostForString ( host ) );
+ }
+ return convertedHosts;
+ }
+
+ /**
+ * Return an HttpHost from an input string. Input string has
+ * host[:port] as format. If the port section is not provided, the default port is used.
+ *
+ * @param hosts
+ * @return a list of hosts
+ * if host.length<1 throws IllegalArgumentException
+ *
+ */
+ public static HttpHost hostForString ( String host )
+ {
+ if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." );
+
+ String hostPart = host;
+ int port = kStdCambriaServicePort;
+
+ final int colon = host.indexOf ( ':' );
+ if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." );
+ if ( colon > 0 )
+ {
+ hostPart = host.substring ( 0, colon ).trim();
+
+ final String portPart = host.substring ( colon + 1 ).trim();
+ if ( portPart.length () > 0 )
+ {
+ try
+ {
+ port = Integer.parseInt ( portPart );
+ }
+ catch ( NumberFormatException x )
+ {
+ throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x );
+ }
+ }
+ // else: use default port on "foo:"
+ }
+
+ return new HttpHost ( hostPart, port );
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
new file mode 100644
index 0000000..d02438f
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
@@ -0,0 +1,425 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher;
+
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaConsumerImpl;
+import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher;
+
+/**
+ * A factory for Cambria clients.<br/>
+ * <br/>
+ * Use caution selecting a consumer creator factory. If the call doesn't accept
+ * a consumer group name, then it creates a consumer that is not restartable.
+ * That is, if you stop your process and start it again, your client will NOT
+ * receive any missed messages on the topic. If you need to ensure receipt of
+ * missed messages, then you must use a consumer that's created with a group
+ * name and ID. (If you create multiple consumer processes using the same group,
+ * load is split across them. Be sure to use a different ID for each instance.)<br/>
+ * <br/>
+ * Publishers
+ *
+ * @author peter
+ */
+public class DMaaPCambriaClientFactory {
+ /**
+ * Create a consumer instance with the default timeout and no limit on
+ * messages returned. This consumer operates as an independent consumer
+ * (i.e., not in a group) and is NOT re-startable across sessions.
+ *
+ * @param hostList
+ * A comma separated list of hosts to use to connect to Cambria.
+ * You can include port numbers (3904 is the default). For
+ * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
+ *
+ * @param topic
+ * The topic to consume
+ *
+ * @return a consumer
+ */
+ public static CambriaConsumer createConsumer(String hostList, String topic) {
+ return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
+ topic);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout and no limit on
+ * messages returned. This consumer operates as an independent consumer
+ * (i.e., not in a group) and is NOT re-startable across sessions.
+ *
+ * @param hostSet
+ * The host used in the URL to Cambria. Entries can be
+ * "host:port".
+ * @param topic
+ * The topic to consume
+ *
+ * @return a consumer
+ */
+ public static CambriaConsumer createConsumer(Collection<String> hostSet,
+ String topic) {
+ return createConsumer(hostSet, topic, null);
+ }
+
+ /**
+ * Create a consumer instance with server-side filtering, the default
+ * timeout, and no limit on messages returned. This consumer operates as an
+ * independent consumer (i.e., not in a group) and is NOT re-startable
+ * across sessions.
+ *
+ * @param hostSet
+ * The host used in the URL to Cambria. Entries can be
+ * "host:port".
+ * @param topic
+ * The topic to consume
+ * @param filter
+ * a filter to use on the server side
+ *
+ * @return a consumer
+ */
+ public static CambriaConsumer createConsumer(Collection<String> hostSet,
+ String topic, String filter) {
+ return createConsumer(hostSet, topic, UUID.randomUUID().toString(),
+ "0", -1, -1, filter, null, null);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart.
+ *
+ * @param hostSet
+ * The host used in the URL to Cambria. Entries can be
+ * "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ *
+ * @return a consumer
+ */
+ public static CambriaConsumer createConsumer(Collection<String> hostSet,
+ final String topic, final String consumerGroup,
+ final String consumerId) {
+ return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart.
+ *
+ * @param hostSet
+ * The host used in the URL to Cambria. Entries can be
+ * "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ *
+ * @return a consumer
+ */
+ public static CambriaConsumer createConsumer(Collection<String> hostSet,
+ final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit) {
+ return createConsumer(hostSet, topic, consumerGroup, consumerId,
+ timeoutMs, limit, null, null, null);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart. This consumer also uses server-side filtering.
+ *
+ * @param hostList
+ * A comma separated list of hosts to use to connect to Cambria.
+ * You can include port numbers (3904 is the default). For
+ * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter
+ * A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
+ * @param apiKey
+ * key associated with a user
+ * @param apiSecret
+ * of a user
+ *
+ * @return a consumer
+ */
+ public static CambriaConsumer createConsumer(String hostList,
+ final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter,
+ String apiKey, String apiSecret) {
+ return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
+ topic, consumerGroup, consumerId, timeoutMs, limit, filter,
+ apiKey, apiSecret);
+ }
+
+ /**
+ * Create a consumer instance with the default timeout, and no limit on
+ * messages returned. This consumer can operate in a logical group and is
+ * re-startable across sessions when you use the same group and ID on
+ * restart. This consumer also uses server-side filtering.
+ *
+ * @param hostSet
+ * The host used in the URL to Cambria. Entries can be
+ * "host:port".
+ * @param topic
+ * The topic to consume
+ * @param consumerGroup
+ * The name of the consumer group this consumer is part of
+ * @param consumerId
+ * The unique id of this consume in its group
+ * @param timeoutMs
+ * The amount of time in milliseconds that the server should keep
+ * the connection open while waiting for message traffic. Use -1
+ * for default timeout.
+ * @param limit
+ * A limit on the number of messages returned in a single call.
+ * Use -1 for no limit.
+ * @param filter
+ * A Highland Park filter expression using only built-in filter
+ * components. Use null for "no filter".
+ * @param apiKey
+ * key associated with a user
+ * @param apiSecret
+ * of a user
+ * @return a consumer
+ */
+ public static CambriaConsumer createConsumer(Collection<String> hostSet,
+ final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter,
+ String apiKey, String apiSecret) {
+ if (sfMock != null)
+ return sfMock;
+ try {
+ return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
+ consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /*************************************************************************/
+ /*************************************************************************/
+ /*************************************************************************/
+
+ /**
+ * Create a publisher that sends each message (or group of messages)
+ * immediately. Most applications should favor higher latency for much
+ * higher message throughput and the "simple publisher" is not a good
+ * choice.
+ *
+ * @param hostlist
+ * The host used in the URL to Cambria. Can be "host:port", can
+ * be multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @return a publisher
+ */
+ public static CambriaBatchingPublisher createSimplePublisher(
+ String hostlist, String topic) {
+ return createBatchingPublisher(hostlist, topic, 1, 1);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown. Message payloads are
+ * not compressed.
+ *
+ * @param hostlist
+ * The host used in the URL to Cambria. Can be "host:port", can
+ * be multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ *
+ * @return a publisher
+ */
+ public static CambriaBatchingPublisher createBatchingPublisher(
+ String hostlist, String topic, int maxBatchSize, long maxAgeMs) {
+ return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs,
+ false);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param hostlist
+ * The host used in the URL to Cambria. Can be "host:port", can
+ * be multiple comma-separated entries.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ *
+ * @return a publisher
+ */
+ public static CambriaBatchingPublisher createBatchingPublisher(
+ String hostlist, String topic, int maxBatchSize, long maxAgeMs,
+ boolean compress) {
+ return createBatchingPublisher(
+ DMaaPCambriaConsumerImpl.stringToList(hostlist), topic,
+ maxBatchSize, maxAgeMs, compress);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to Cambria. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ *
+ * @return a publisher
+ */
+ public static CambriaBatchingPublisher createBatchingPublisher(
+ String[] hostSet, String topic, int maxBatchSize, long maxAgeMs,
+ boolean compress) {
+ final TreeSet<String> hosts = new TreeSet<String>();
+ for (String hp : hostSet) {
+ hosts.add(hp);
+ }
+ return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs,
+ compress);
+ }
+
+ /**
+ * Create a publisher that batches messages. Be sure to close the publisher
+ * to send the last batch and ensure a clean shutdown.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to Cambria. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param topic
+ * The topic on which to publish messages.
+ * @param maxBatchSize
+ * The largest set of messages to batch
+ * @param maxAgeMs
+ * The maximum age of a message waiting in a batch
+ * @param compress
+ * use gzip compression
+ *
+ * @return a publisher
+ */
+ public static CambriaBatchingPublisher createBatchingPublisher(
+ Collection<String> hostSet, String topic, int maxBatchSize,
+ long maxAgeMs, boolean compress) {
+ return new DMaaPCambriaSimplerBatchPublisher.Builder()
+ .againstUrls(hostSet).onTopic(topic)
+ .batchTo(maxBatchSize, maxAgeMs).compress(compress).build();
+ }
+
+ /**
+ * Create an identity manager client to work with API keys.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to Cambria. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey
+ * Your API key
+ * @param apiSecret
+ * Your API secret
+ * @return an identity manager
+ */
+ /*
+ * public static CambriaIdentityManager createIdentityManager (
+ * Collection<String> hostSet, String apiKey, String apiSecret ) { final
+ * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet );
+ * cim.setApiCredentials ( apiKey, apiSecret ); return cim; }
+ */
+
+ /**
+ * Create a topic manager for working with topics.
+ *
+ * @param hostSet
+ * A set of hosts to be used in the URL to Cambria. Can be
+ * "host:port". Use multiple entries to enable failover.
+ * @param apiKey
+ * Your API key
+ * @param apiSecret
+ * Your API secret
+ * @return a topic manager
+ */
+ /*
+ * public static CambriaTopicManager createTopicManager ( Collection<String>
+ * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi
+ * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey,
+ * apiSecret ); return tmi; }
+ */
+
+ /**
+ * Inject a consumer. Used to support unit tests.
+ *
+ * @param cc
+ */
+ public static void $testInject(CambriaConsumer cc) {
+ sfMock = cc;
+ }
+
+ private static CambriaConsumer sfMock = null;
+}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
new file mode 100644
index 0000000..08b2fd1
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher.impl;
+
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import com.att.dmf.mr.constants.CambriaConstants;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.apiClient.http.CacheUse;
+import com.att.nsa.apiClient.http.HttpClient;
+
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metrics.publisher.CambriaClient
+{
+ protected CambriaBaseClient ( Collection<String> hosts ) throws MalformedURLException
+ {
+ this ( hosts, null );
+ }
+
+ public CambriaBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException
+ {
+ /*super ( hosts, CambriaConstants.kStdCambriaServicePort, clientSignature,
+ CacheUse.NONE, 1, 1, TimeUnit.MILLISECONDS );*/
+
+ super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000);
+
+ //fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+ fLog = EELFManager.getInstance().getLogger(this.getClass().getName());
+ //( this.getClass().getName () );
+ }
+
+ @Override
+ public void close ()
+ {
+ }
+
+ public Set<String> jsonArrayToSet ( JSONArray a ) throws JSONException
+ {
+ if ( a == null ) return null;
+
+ final TreeSet<String> set = new TreeSet<String> ();
+ for ( int i=0; i<a.length (); i++ )
+ {
+ set.add ( a.getString ( i ));
+ }
+ return set;
+ }
+ /**
+ * @param log
+ */
+ public void logTo ( EELFLogger log )
+ {
+ fLog = log;
+
+ //replaceLogger ( log );
+ }
+
+ public EELFLogger getLog ()
+ {
+ return fLog;
+ }
+
+ private EELFLogger fLog;
+
+
+
+}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java
new file mode 100644
index 0000000..7463700
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java
@@ -0,0 +1,74 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher.impl;
+
+/**
+ *
+ * This class maintains the system clocks
+ * @author nilanjana.maity
+ *
+ */
+public class Clock
+{
+ public synchronized static Clock getIt ()
+ {
+ if ( sfClock == null )
+ {
+ sfClock = new Clock ();
+ }
+ return sfClock;
+ }
+
+ /**
+ *
+ * Get the system's current time in milliseconds.
+ * @return the current time
+ *
+ */
+ public static long now ()
+ {
+ return getIt().nowImpl ();
+ }
+
+ /**
+ * Get current time in milliseconds
+ * @return current time in ms
+ */
+ public long nowImpl ()
+ {
+ return System.currentTimeMillis ();
+ }
+
+ /**
+ * Initialize constructor
+ */
+ public Clock ()
+ {
+ }
+
+ private static Clock sfClock = null;
+
+ public synchronized static void register ( Clock testClock )
+ {
+ sfClock = testClock;
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java
new file mode 100644
index 0000000..ee56213
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java
@@ -0,0 +1,169 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher.impl;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility;
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
+
+import jline.internal.Log;
+
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public class DMaaPCambriaConsumerImpl extends CambriaBaseClient
+ implements com.att.dmf.mr.metrics.publisher.CambriaConsumer {
+ private final String fTopic;
+ private final String fGroup;
+ private final String fId;
+ private final int fTimeoutMs;
+ private final int fLimit;
+ private final String fFilter;
+
+ /**
+ *
+ * @param hostPart
+ * @param topic
+ * @param consumerGroup
+ * @param consumerId
+ * @param timeoutMs
+ * @param limit
+ * @param filter
+ * @param apiKey
+ * @param apiSecret
+ */
+ public DMaaPCambriaConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) throws MalformedURLException {
+ super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
+
+ fTopic = topic;
+ fGroup = consumerGroup;
+ fId = consumerId;
+ fTimeoutMs = timeoutMs;
+ fLimit = limit;
+ fFilter = filter;
+
+ setApiCredentials(apiKey, apiSecret);
+ }
+
+ /**
+ * method converts String to list
+ *
+ * @param str
+ * @return
+ */
+ public static List<String> stringToList(String str) {
+ final LinkedList<String> set = new LinkedList<String>();
+ if (str != null) {
+ final String[] parts = str.trim().split(",");
+ for (String part : parts) {
+ final String trimmed = part.trim();
+ if (trimmed.length() > 0) {
+ set.add(trimmed);
+ }
+ }
+ }
+ return set;
+ }
+
+ @Override
+ public Iterable<String> fetch() throws IOException {
+ // fetch with the timeout and limit set in constructor
+ return fetch(fTimeoutMs, fLimit);
+ }
+
+ @Override
+ public Iterable<String> fetch(int timeoutMs, int limit) throws IOException {
+ final LinkedList<String> msgs = new LinkedList<String>();
+
+ final String urlPath = createUrlPath(timeoutMs, limit);
+
+ getLog().info("UEB GET " + urlPath);
+ try {
+ final JSONObject o = get(urlPath);
+
+ if (o != null) {
+ final JSONArray a = o.getJSONArray("result");
+ if (a != null) {
+ for (int i = 0; i < a.length(); i++) {
+ msgs.add(a.getString(i));
+ }
+ }
+ }
+ } catch (HttpObjectNotFoundException e) {
+ // this can happen if the topic is not yet created. ignore.
+ Log.error("Failed due to topic is not yet created" + e);
+ } catch (JSONException e) {
+ // unexpected response
+ reportProblemWithResponse();
+ Log.error("Failed due to jsonException", e);
+ } catch (HttpException e) {
+ throw new IOException(e);
+ }
+
+ return msgs;
+ }
+
+ public String createUrlPath(int timeoutMs, int limit) {
+ final StringBuilder url = new StringBuilder(CambriaPublisherUtility.makeConsumerUrl(fTopic, fGroup, fId));
+ final StringBuilder adds = new StringBuilder();
+ if (timeoutMs > -1) {
+ adds.append("timeout=").append(timeoutMs);
+ }
+
+ if (limit > -1) {
+ if (adds.length() > 0) {
+ adds.append("&");
+ }
+ adds.append("limit=").append(limit);
+ }
+ if (fFilter != null && fFilter.length() > 0) {
+ try {
+ if (adds.length() > 0) {
+ adds.append("&");
+ }
+ adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ Log.error("Failed due to UnsupportedEncodingException" + e);
+ }
+ }
+ if (adds.length() > 0) {
+ url.append("?").append(adds.toString());
+ }
+ return url.toString();
+ }
+
+}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
new file mode 100644
index 0000000..d8d8799
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
@@ -0,0 +1,429 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.metrics.publisher.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility;
+
+/**
+ *
+ * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
+ * in batch
+ *
+ * @author anowarul.islam
+ *
+ */
+public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
+ implements com.att.dmf.mr.metrics.publisher.CambriaBatchingPublisher {
+ /**
+ *
+ * static inner class initializes with urls, topic,batchSize
+ *
+ * @author anowarul.islam
+ *
+ */
+ public static class Builder {
+ public Builder() {
+ }
+
+ /**
+ * constructor initialize with url
+ *
+ * @param baseUrls
+ * @return
+ *
+ */
+ public Builder againstUrls(Collection<String> baseUrls) {
+ fUrls = baseUrls;
+ return this;
+ }
+
+ /**
+ * constructor initializes with topics
+ *
+ * @param topic
+ * @return
+ *
+ */
+ public Builder onTopic(String topic) {
+ fTopic = topic;
+ return this;
+ }
+
+ /**
+ * constructor initilazes with batch size and batch time
+ *
+ * @param maxBatchSize
+ * @param maxBatchAgeMs
+ * @return
+ *
+ */
+ public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ return this;
+ }
+
+ /**
+ * constructor initializes with compress
+ *
+ * @param compress
+ * @return
+ */
+ public Builder compress(boolean compress) {
+ fCompress = compress;
+ return this;
+ }
+
+ /**
+ * method returns DMaaPCambriaSimplerBatchPublisher object
+ *
+ * @return
+ */
+ public DMaaPCambriaSimplerBatchPublisher build() {
+
+ try {
+ return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Collection<String> fUrls;
+ private String fTopic;
+ private int fMaxBatchSize = 100;
+ private long fMaxBatchAgeMs = 1000;
+ private boolean fCompress = false;
+ };
+
+ /**
+ *
+ * @param partition
+ * @param msg
+ */
+ @Override
+ public int send(String partition, String msg) {
+ return send(new message(partition, msg));
+ }
+
+ /**
+ * @param msg
+ */
+ @Override
+ public int send(message msg) {
+ final LinkedList<message> list = new LinkedList<message>();
+ list.add(msg);
+ return send(list);
+ }
+
+ /**
+ * @param msgs
+ */
+ @Override
+ public synchronized int send(Collection<message> msgs) {
+ if (fClosed) {
+ throw new IllegalStateException("The publisher was closed.");
+ }
+
+ for (message userMsg : msgs) {
+ fPending.add(new TimestampedMessage(userMsg));
+ }
+ return getPendingMessageCount();
+ }
+
+ /**
+ * getPending message count
+ */
+ @Override
+ public synchronized int getPendingMessageCount() {
+ return fPending.size();
+ }
+
+ /**
+ *
+ * @exception InterruptedException
+ * @exception IOException
+ */
+ @Override
+ public void close() {
+ try {
+ final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ if (remains.size() > 0) {
+ getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+ + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
+ }
+ } catch (InterruptedException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
+ } catch (IOException e) {
+ getLog().warn("Possible message loss. " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * @param time
+ * @param unit
+ */
+ @Override
+ public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
+ synchronized (this) {
+ fClosed = true;
+
+ // stop the background sender
+ fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ fExec.shutdown();
+ }
+
+ final long now = Clock.now();
+ final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
+ final long timeoutAtMs = now + waitInMs;
+
+ while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
+ send(true);
+ Thread.sleep(250);
+ }
+ // synchronizing the current object
+ synchronized (this) {
+ final LinkedList<message> result = new LinkedList<message>();
+ fPending.drainTo(result);
+ return result;
+ }
+ }
+
+ /**
+ * Possibly send a batch to the cambria server. This is called by the
+ * background thread and the close() method
+ *
+ * @param force
+ */
+ private synchronized void send(boolean force) {
+ if (force || shouldSendNow()) {
+ if (!sendBatch()) {
+ getLog().warn("Send failed, " + fPending.size() + " message to send.");
+
+ // note the time for back-off
+ fDontSendUntilMs = sfWaitAfterError + Clock.now();
+ }
+ }
+ }
+
+ /**
+ *
+ * @return
+ */
+ private synchronized boolean shouldSendNow() {
+ boolean shouldSend = false;
+ if (fPending.size() > 0) {
+ final long nowMs = Clock.now();
+
+ shouldSend = (fPending.size() >= fMaxBatchSize);
+ if (!shouldSend) {
+ final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
+ shouldSend = sendAtMs <= nowMs;
+ }
+
+ // however, wait after an error
+ shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
+ }
+ return shouldSend;
+ }
+
+ /**
+ *
+ * @return
+ */
+ private synchronized boolean sendBatch() {
+ // it's possible for this call to be made with an empty list. in this
+ // case, just return.
+ if (fPending.size() < 1) {
+ return true;
+ }
+
+ final long nowMs = Clock.now();
+ final String url = CambriaPublisherUtility.makeUrl(fTopic);
+
+ getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+
+ try {
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+ OutputStream os = baseStream;
+ if (fCompress) {
+ os = new GZIPOutputStream(baseStream);
+ }
+ for (TimestampedMessage m : fPending) {
+ os.write(("" + m.fPartition.length()).getBytes());
+ os.write('.');
+ os.write(("" + m.fMsg.length()).getBytes());
+ os.write('.');
+ os.write(m.fPartition.getBytes());
+ os.write(m.fMsg.getBytes());
+ os.write('\n');
+ }
+ os.close();
+
+ final long startMs = Clock.now();
+
+ // code from REST Client Starts
+
+ // final String serverCalculatedSignature = sha1HmacSigner.sign
+ // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV");
+
+ Client client = ClientBuilder.newClient();
+ String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
+ if (null==metricTopicname) {
+
+ metricTopicname="msgrtr.apinode.metrics.dmaap";
+ }
+ WebTarget target = client
+ .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
+ target = target.path("/events/" + fTopic);
+ getLog().info("url : " + target.getUri().toString());
+ // API Key
+
+ Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
+
+ Response response = target.request().post(data);
+ // header("X-CambriaAuth",
+ // "2OH46YIWa329QpEF:"+serverCalculatedSignature).
+ // header("X-CambriaDate", "2015-09-21T11:38:19-0700").
+ // post(Entity.json(baseStream.toByteArray()));
+
+ getLog().info("Response received :: " + response.getStatus());
+ getLog().info("Response received :: " + response.toString());
+
+ // code from REST Client Ends
+
+ /*
+ * final JSONObject result = post ( url, contentType,
+ * baseStream.toByteArray(), true ); final String logLine =
+ * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" +
+ * result.toString (); getLog().info ( logLine );
+ */
+ fPending.clear();
+ return true;
+ } catch (IllegalArgumentException x) {
+ getLog().warn(x.getMessage(), x);
+ }
+ /*
+ * catch ( HttpObjectNotFoundException x ) { getLog().warn (
+ * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn (
+ * x.getMessage(), x ); }
+ */
+ catch (IOException x) {
+ getLog().warn(x.getMessage(), x);
+ }
+ return false;
+ }
+
+ private final String fTopic;
+ private final int fMaxBatchSize;
+ private final long fMaxBatchAgeMs;
+ private final boolean fCompress;
+ private boolean fClosed;
+
+ private final LinkedBlockingQueue<TimestampedMessage> fPending;
+ private long fDontSendUntilMs;
+ private final ScheduledThreadPoolExecutor fExec;
+
+ private static final long sfWaitAfterError = 1000;
+
+ /**
+ *
+ * @param hosts
+ * @param topic
+ * @param maxBatchSize
+ * @param maxBatchAgeMs
+ * @param compress
+ */
+ private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
+ long maxBatchAgeMs, boolean compress) throws MalformedURLException {
+
+ super(hosts);
+
+ if (topic == null || topic.length() < 1) {
+ throw new IllegalArgumentException("A topic must be provided.");
+ }
+
+ fClosed = false;
+ fTopic = topic;
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ fCompress = compress;
+
+ fPending = new LinkedBlockingQueue<TimestampedMessage>();
+ fDontSendUntilMs = 0;
+
+ fExec = new ScheduledThreadPoolExecutor(1);
+ fExec.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ send(false);
+ }
+ }, 100, 50, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ *
+ *
+ * @author anowarul.islam
+ *
+ */
+ private static class TimestampedMessage extends message {
+ /**
+ * to store timestamp value
+ */
+ public final long timestamp;
+
+ /**
+ * constructor initialize with message
+ *
+ * @param m
+ *
+ */
+ public TimestampedMessage(message m) {
+ super(m);
+ timestamp = Clock.now();
+ }
+ }
+
+}