summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/metrics
diff options
context:
space:
mode:
authorsunil unnava <sunil.unnava@att.com>2018-10-23 12:18:59 -0400
committersunil unnava <sunil.unnava@att.com>2018-10-23 12:22:02 -0400
commit3504265229c589ecc166e3ad4c33bb198b11e4ce (patch)
tree022235018aa3ad863eaf24862543bbd509f35a21 /src/main/java/com/att/dmf/mr/metrics
parent8a3dfd3fe521f18ce07c2d24202a51b28d424fa2 (diff)
update the package name1.1.11
Issue-ID: DMAAP-858 Change-Id: I49ae6eb9c51a261b64b911e607fcbbca46c5423c Signed-off-by: sunil unnava <sunil.unnava@att.com>
Diffstat (limited to 'src/main/java/com/att/dmf/mr/metrics')
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java52
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java89
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java52
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java101
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java146
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java420
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java100
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java74
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java169
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java422
10 files changed, 0 insertions, 1625 deletions
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java
deleted file mode 100644
index 45644b7..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A Cambria batching publisher is a publisher with additional functionality
- * for managing delayed sends.
- *
- * @author peter
- *
- */
-public interface CambriaBatchingPublisher extends CambriaPublisher
-{
- /**
- * Get the number of messages that have not yet been sent.
- * @return the number of pending messages
- */
- int getPendingMessageCount ();
-
- /**
- * Close this publisher, sending any remaining messages.
- * @param timeout an amount of time to wait for unsent messages to be sent
- * @param timeoutUnits the time unit for the timeout arg
- * @return a list of any unsent messages after the timeout
- * @throws IOException
- * @throws InterruptedException
- */
- List<message> close ( long timeout, TimeUnit timeoutUnits ) throws IOException, InterruptedException;
-}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
deleted file mode 100644
index 4b219b1..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher;
-
-
-
-//
-import com.att.eelf.configuration.EELFLogger;
-
-
-/**
- *
- * @author anowarul.islam
- *
- */
-public interface CambriaClient {
- /**
- * An exception at the Cambria layer. This is used when the HTTP transport
- * layer returns a success code but the transaction is not completed as
- * expected.
- */
- public class CambriaApiException extends Exception {
- /**
- *
- * @param msg
- */
- public CambriaApiException(String msg) {
- super(msg);
- }
-
- /**
- *
- * @param msg
- * @param t
- */
- public CambriaApiException(String msg, Throwable t) {
- super(msg, t);
- }
-
- private static final long serialVersionUID = 1L;
- }
-
- /**
- * Optionally set the Logger to use
- *
- * @param log
- */
- void logTo(EELFLogger log);
-
- /**
- * Set the API credentials for this client connection. Subsequent calls will
- * include authentication headers.who i
- *
- * @param apiKey
- * @param apiSecret
- */
- void setApiCredentials(String apiKey, String apiSecret);
-
- /**
- * Remove API credentials, if any, on this connection. Subsequent calls will
- * not include authentication headers.
- */
- void clearApiCredentials();
-
- /**
- * Close this connection. Some client interfaces have additional close
- * capability.
- */
- void close();
-}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java
deleted file mode 100644
index 4a6ca81..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher;
-
-import java.io.IOException;
-
-/**
- * This interface will provide fetch mechanism for consumer
- * @author nilanjana.maity
- *
- */
-public interface CambriaConsumer extends CambriaClient
-{
- /**
- * Fetch a set of messages. The consumer's timeout and message limit are used if set in the constructor call.
-
- * @return a set of messages
- * @throws IOException
- */
- Iterable<String> fetch () throws IOException;
-
- /**
- * Fetch a set of messages with an explicit timeout and limit for this call. These values
- * override any set in the constructor call.
- *
- * @param timeoutMs The amount of time in milliseconds that the server should keep the connection
- * open while waiting for message traffic. Use -1 for default timeout (controlled on the server-side).
- * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
- * @return a set messages
- * @throws IOException if there's a problem connecting to the server
- */
- Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException;
-}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java
deleted file mode 100644
index 4020a6d..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * A Cambria publishing interface.
- *
- * @author peter
- *
- */
-public interface CambriaPublisher extends CambriaClient {
- /**
- * A simple message container
- */
- public static class message {
- /**
- *
- * @param partition
- * @param msg
- */
- public message(String partition, String msg) {
- fPartition = partition == null ? "" : partition;
- fMsg = msg;
- if (fMsg == null) {
- throw new IllegalArgumentException("Can't send a null message.");
- }
- }
-
- /**
- *
- * @param msg
- */
- public message(message msg) {
- this(msg.fPartition, msg.fMsg);
- }
-
- /**
- * declaring partition string
- */
- public final String fPartition;
- /**
- * declaring fMsg String
- */
- public final String fMsg;
- }
-
- /**
- * Send the given message using the given partition.
- *
- * @param partition
- * @param msg
- * @return the number of pending messages
- * @throws IOException
- */
- int send(String partition, String msg) throws IOException;
-
- /**
- * Send the given message using its partition.
- *
- * @param msg
- * @return the number of pending messages
- * @throws IOException
- */
- int send(message msg) throws IOException;
-
- /**
- * Send the given messages using their partitions.
- *
- * @param msgs
- * @return the number of pending messages
- * @throws IOException
- */
- int send(Collection<message> msgs) throws IOException;
-
- /**
- * Close this publisher. It's an error to call send() after close()
- */
- void close();
-}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
deleted file mode 100644
index 46dfa99..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.http.HttpHost;
-/**
- *
- * @author anowarul.islam
- *
- */
-public class CambriaPublisherUtility
-{
- public static final String kBasePath = "/events/";
- public static final int kStdCambriaServicePort = 3904;
-/**
- *
- * Translates a string into <code>application/x-www-form-urlencoded</code>
- * format using a specific encoding scheme.
- * @param s
- * @return
- *
- */
- public static String escape ( String s )
- {
- try
- {
- return URLEncoder.encode ( s, "UTF-8");
- }
- catch ( UnsupportedEncodingException e )
- {
- throw new RuntimeException ( e );
- }
- }
-/**
- *
- * building url
- * @param rawTopic
- * @return
- */
- public static String makeUrl ( String rawTopic )
- {
- final String cleanTopic = escape ( rawTopic );
-
- final StringBuffer url = new StringBuffer().
- append ( CambriaPublisherUtility.kBasePath ).
- append ( cleanTopic );
- return url.toString ();
- }
-/**
- *
- * building consumerUrl
- * @param topic
- * @param rawConsumerGroup
- * @param rawConsumerId
- * @return
- */
- public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId )
- {
- final String cleanConsumerGroup = escape ( rawConsumerGroup );
- final String cleanConsumerId = escape ( rawConsumerId );
- return CambriaPublisherUtility.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId;
- }
-
- /**
- * Create a list of HttpHosts from an input list of strings. Input strings have
- * host[:port] as format. If the port section is not provided, the default port is used.
- *
- * @param hosts
- * @return a list of hosts
- */
- public static List<HttpHost> createHostsList(Collection<String> hosts)
- {
- final ArrayList<HttpHost> convertedHosts = new ArrayList<>();
- for ( String host : hosts )
- {
- if ( host.length () == 0 ) continue;
- convertedHosts.add ( hostForString ( host ) );
- }
- return convertedHosts;
- }
-
- /**
- * Return an HttpHost from an input string. Input string has
- * host[:port] as format. If the port section is not provided, the default port is used.
- *
- * @param hosts
- * @return a list of hosts
- * if host.length<1 throws IllegalArgumentException
- *
- */
- public static HttpHost hostForString ( String host )
- {
- if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." );
-
- String hostPart = host;
- int port = kStdCambriaServicePort;
-
- final int colon = host.indexOf ( ':' );
- if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." );
- if ( colon > 0 )
- {
- hostPart = host.substring ( 0, colon ).trim();
-
- final String portPart = host.substring ( colon + 1 ).trim();
- if ( portPart.length () > 0 )
- {
- try
- {
- port = Integer.parseInt ( portPart );
- }
- catch ( NumberFormatException x )
- {
- throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x );
- }
- }
- // else: use default port on "foo:"
- }
-
- return new HttpHost ( hostPart, port );
- }
-}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
deleted file mode 100644
index d7818de..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher;
-
-import java.net.MalformedURLException;
-import java.nio.channels.NotYetConnectedException;
-import java.util.Collection;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaConsumerImpl;
-import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher;
-
-/**
- * A factory for Cambria clients.<br/>
- * <br/>
- * Use caution selecting a consumer creator factory. If the call doesn't accept
- * a consumer group name, then it creates a consumer that is not restartable.
- * That is, if you stop your process and start it again, your client will NOT
- * receive any missed messages on the topic. If you need to ensure receipt of
- * missed messages, then you must use a consumer that's created with a group
- * name and ID. (If you create multiple consumer processes using the same group,
- * load is split across them. Be sure to use a different ID for each instance.)<br/>
- * <br/>
- * Publishers
- *
- * @author peter
- */
-public class DMaaPCambriaClientFactory {
- /**
- * Create a consumer instance with the default timeout and no limit on
- * messages returned. This consumer operates as an independent consumer
- * (i.e., not in a group) and is NOT re-startable across sessions.
- *
- * @param hostList
- * A comma separated list of hosts to use to connect to Cambria.
- * You can include port numbers (3904 is the default). For
- * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
- *
- * @param topic
- * The topic to consume
- *
- * @return a consumer
- */
- public static CambriaConsumer createConsumer(String hostList, String topic) {
- return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
- topic);
- }
-
- /**
- * Create a consumer instance with the default timeout and no limit on
- * messages returned. This consumer operates as an independent consumer
- * (i.e., not in a group) and is NOT re-startable across sessions.
- *
- * @param hostSet
- * The host used in the URL to Cambria. Entries can be
- * "host:port".
- * @param topic
- * The topic to consume
- *
- * @return a consumer
- */
- public static CambriaConsumer createConsumer(Collection<String> hostSet,
- String topic) {
- return createConsumer(hostSet, topic, null);
- }
-
- /**
- * Create a consumer instance with server-side filtering, the default
- * timeout, and no limit on messages returned. This consumer operates as an
- * independent consumer (i.e., not in a group) and is NOT re-startable
- * across sessions.
- *
- * @param hostSet
- * The host used in the URL to Cambria. Entries can be
- * "host:port".
- * @param topic
- * The topic to consume
- * @param filter
- * a filter to use on the server side
- *
- * @return a consumer
- */
- public static CambriaConsumer createConsumer(Collection<String> hostSet,
- String topic, String filter) {
- return createConsumer(hostSet, topic, UUID.randomUUID().toString(),
- "0", -1, -1, filter, null, null);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart.
- *
- * @param hostSet
- * The host used in the URL to Cambria. Entries can be
- * "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- *
- * @return a consumer
- */
- public static CambriaConsumer createConsumer(Collection<String> hostSet,
- final String topic, final String consumerGroup,
- final String consumerId) {
- return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart.
- *
- * @param hostSet
- * The host used in the URL to Cambria. Entries can be
- * "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- *
- * @return a consumer
- */
- public static CambriaConsumer createConsumer(Collection<String> hostSet,
- final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit) {
- return createConsumer(hostSet, topic, consumerGroup, consumerId,
- timeoutMs, limit, null, null, null);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart. This consumer also uses server-side filtering.
- *
- * @param hostList
- * A comma separated list of hosts to use to connect to Cambria.
- * You can include port numbers (3904 is the default). For
- * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- * @param filter
- * A Highland Park filter expression using only built-in filter
- * components. Use null for "no filter".
- * @param apiKey
- * key associated with a user
- * @param apiSecret
- * of a user
- *
- * @return a consumer
- */
- public static CambriaConsumer createConsumer(String hostList,
- final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter,
- String apiKey, String apiSecret) {
- return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
- topic, consumerGroup, consumerId, timeoutMs, limit, filter,
- apiKey, apiSecret);
- }
-
- /**
- * Create a consumer instance with the default timeout, and no limit on
- * messages returned. This consumer can operate in a logical group and is
- * re-startable across sessions when you use the same group and ID on
- * restart. This consumer also uses server-side filtering.
- *
- * @param hostSet
- * The host used in the URL to Cambria. Entries can be
- * "host:port".
- * @param topic
- * The topic to consume
- * @param consumerGroup
- * The name of the consumer group this consumer is part of
- * @param consumerId
- * The unique id of this consume in its group
- * @param timeoutMs
- * The amount of time in milliseconds that the server should keep
- * the connection open while waiting for message traffic. Use -1
- * for default timeout.
- * @param limit
- * A limit on the number of messages returned in a single call.
- * Use -1 for no limit.
- * @param filter
- * A Highland Park filter expression using only built-in filter
- * components. Use null for "no filter".
- * @param apiKey
- * key associated with a user
- * @param apiSecret
- * of a user
- * @return a consumer
- */
- public static CambriaConsumer createConsumer(Collection<String> hostSet,
- final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter,
- String apiKey, String apiSecret) {
- if (sfMock != null)
- return sfMock;
- try {
- return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
- consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
- } catch (MalformedURLException e) {
-
- NotYetConnectedException exception=new NotYetConnectedException();
- exception.setStackTrace(e.getStackTrace());
-
- throw exception ;
- }
- }
-
- /*************************************************************************/
- /*************************************************************************/
- /*************************************************************************/
-
- /**
- * Create a publisher that sends each message (or group of messages)
- * immediately. Most applications should favor higher latency for much
- * higher message throughput and the "simple publisher" is not a good
- * choice.
- *
- * @param hostlist
- * The host used in the URL to Cambria. Can be "host:port", can
- * be multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @return a publisher
- */
- public static CambriaBatchingPublisher createSimplePublisher(
- String hostlist, String topic) {
- return createBatchingPublisher(hostlist, topic, 1, 1);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown. Message payloads are
- * not compressed.
- *
- * @param hostlist
- * The host used in the URL to Cambria. Can be "host:port", can
- * be multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- *
- * @return a publisher
- */
- public static CambriaBatchingPublisher createBatchingPublisher(
- String hostlist, String topic, int maxBatchSize, long maxAgeMs) {
- return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs,
- false);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param hostlist
- * The host used in the URL to Cambria. Can be "host:port", can
- * be multiple comma-separated entries.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
- * @return a publisher
- */
- public static CambriaBatchingPublisher createBatchingPublisher(
- String hostlist, String topic, int maxBatchSize, long maxAgeMs,
- boolean compress) {
- return createBatchingPublisher(
- DMaaPCambriaConsumerImpl.stringToList(hostlist), topic,
- maxBatchSize, maxAgeMs, compress);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to Cambria. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
- * @return a publisher
- */
- public static CambriaBatchingPublisher createBatchingPublisher(
- String[] hostSet, String topic, int maxBatchSize, long maxAgeMs,
- boolean compress) {
- final TreeSet<String> hosts = new TreeSet<String>();
- for (String hp : hostSet) {
- hosts.add(hp);
- }
- return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs,
- compress);
- }
-
- /**
- * Create a publisher that batches messages. Be sure to close the publisher
- * to send the last batch and ensure a clean shutdown.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to Cambria. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param topic
- * The topic on which to publish messages.
- * @param maxBatchSize
- * The largest set of messages to batch
- * @param maxAgeMs
- * The maximum age of a message waiting in a batch
- * @param compress
- * use gzip compression
- *
- * @return a publisher
- */
- public static CambriaBatchingPublisher createBatchingPublisher(
- Collection<String> hostSet, String topic, int maxBatchSize,
- long maxAgeMs, boolean compress) {
- return new DMaaPCambriaSimplerBatchPublisher.Builder()
- .againstUrls(hostSet).onTopic(topic)
- .batchTo(maxBatchSize, maxAgeMs).compress(compress).build();
- }
-
- /**
- * Create an identity manager client to work with API keys.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to Cambria. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param apiKey
- * Your API key
- * @param apiSecret
- * Your API secret
- * @return an identity manager
- */
-
-
- /**
- * Create a topic manager for working with topics.
- *
- * @param hostSet
- * A set of hosts to be used in the URL to Cambria. Can be
- * "host:port". Use multiple entries to enable failover.
- * @param apiKey
- * Your API key
- * @param apiSecret
- * Your API secret
- * @return a topic manager
- */
-
-
- /**
- * Inject a consumer. Used to support unit tests.
- *
- * @param cc
- */
- public static void $testInject(CambriaConsumer cc) {
- sfMock = cc;
- }
-
- private static CambriaConsumer sfMock = null;
-}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
deleted file mode 100644
index 84576fc..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher.impl;
-
-import java.net.MalformedURLException;
-import java.util.Collection;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-
-import com.att.dmf.mr.constants.CambriaConstants;
-
-//import org.slf4j.LoggerFactory;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.apiClient.http.CacheUse;
-import com.att.nsa.apiClient.http.HttpClient;
-
-/**
- *
- * @author anowarul.islam
- *
- */
-public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metrics.publisher.CambriaClient
-{
- protected CambriaBaseClient ( Collection<String> hosts ) throws MalformedURLException
- {
- this ( hosts, null );
- }
-
- public CambriaBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException
- {
-
-
-
- super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000);
-
-
- fLog = EELFManager.getInstance().getLogger(this.getClass().getName());
-
- }
-
- @Override
- public void close ()
- {
- }
-
- public Set<String> jsonArrayToSet ( JSONArray a ) throws JSONException
- {
- if ( a == null ) return null;
-
- final TreeSet<String> set = new TreeSet<>();
- for ( int i=0; i<a.length (); i++ )
- {
- set.add ( a.getString ( i ));
- }
- return set;
- }
- /**
- * @param log
- */
- public void logTo ( EELFLogger log )
- {
- fLog = log;
-
-
- }
-
- public EELFLogger getLog ()
- {
- return fLog;
- }
-
- private EELFLogger fLog;
-
-
-
-}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java
deleted file mode 100644
index 7463700..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher.impl;
-
-/**
- *
- * This class maintains the system clocks
- * @author nilanjana.maity
- *
- */
-public class Clock
-{
- public synchronized static Clock getIt ()
- {
- if ( sfClock == null )
- {
- sfClock = new Clock ();
- }
- return sfClock;
- }
-
- /**
- *
- * Get the system's current time in milliseconds.
- * @return the current time
- *
- */
- public static long now ()
- {
- return getIt().nowImpl ();
- }
-
- /**
- * Get current time in milliseconds
- * @return current time in ms
- */
- public long nowImpl ()
- {
- return System.currentTimeMillis ();
- }
-
- /**
- * Initialize constructor
- */
- public Clock ()
- {
- }
-
- private static Clock sfClock = null;
-
- public synchronized static void register ( Clock testClock )
- {
- sfClock = testClock;
- }
-}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java
deleted file mode 100644
index ee56213..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher.impl;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.MalformedURLException;
-import java.net.URLEncoder;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
-
-import jline.internal.Log;
-
-/**
- *
- * @author anowarul.islam
- *
- */
-public class DMaaPCambriaConsumerImpl extends CambriaBaseClient
- implements com.att.dmf.mr.metrics.publisher.CambriaConsumer {
- private final String fTopic;
- private final String fGroup;
- private final String fId;
- private final int fTimeoutMs;
- private final int fLimit;
- private final String fFilter;
-
- /**
- *
- * @param hostPart
- * @param topic
- * @param consumerGroup
- * @param consumerId
- * @param timeoutMs
- * @param limit
- * @param filter
- * @param apiKey
- * @param apiSecret
- */
- public DMaaPCambriaConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
- final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) throws MalformedURLException {
- super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
-
- fTopic = topic;
- fGroup = consumerGroup;
- fId = consumerId;
- fTimeoutMs = timeoutMs;
- fLimit = limit;
- fFilter = filter;
-
- setApiCredentials(apiKey, apiSecret);
- }
-
- /**
- * method converts String to list
- *
- * @param str
- * @return
- */
- public static List<String> stringToList(String str) {
- final LinkedList<String> set = new LinkedList<String>();
- if (str != null) {
- final String[] parts = str.trim().split(",");
- for (String part : parts) {
- final String trimmed = part.trim();
- if (trimmed.length() > 0) {
- set.add(trimmed);
- }
- }
- }
- return set;
- }
-
- @Override
- public Iterable<String> fetch() throws IOException {
- // fetch with the timeout and limit set in constructor
- return fetch(fTimeoutMs, fLimit);
- }
-
- @Override
- public Iterable<String> fetch(int timeoutMs, int limit) throws IOException {
- final LinkedList<String> msgs = new LinkedList<String>();
-
- final String urlPath = createUrlPath(timeoutMs, limit);
-
- getLog().info("UEB GET " + urlPath);
- try {
- final JSONObject o = get(urlPath);
-
- if (o != null) {
- final JSONArray a = o.getJSONArray("result");
- if (a != null) {
- for (int i = 0; i < a.length(); i++) {
- msgs.add(a.getString(i));
- }
- }
- }
- } catch (HttpObjectNotFoundException e) {
- // this can happen if the topic is not yet created. ignore.
- Log.error("Failed due to topic is not yet created" + e);
- } catch (JSONException e) {
- // unexpected response
- reportProblemWithResponse();
- Log.error("Failed due to jsonException", e);
- } catch (HttpException e) {
- throw new IOException(e);
- }
-
- return msgs;
- }
-
- public String createUrlPath(int timeoutMs, int limit) {
- final StringBuilder url = new StringBuilder(CambriaPublisherUtility.makeConsumerUrl(fTopic, fGroup, fId));
- final StringBuilder adds = new StringBuilder();
- if (timeoutMs > -1) {
- adds.append("timeout=").append(timeoutMs);
- }
-
- if (limit > -1) {
- if (adds.length() > 0) {
- adds.append("&");
- }
- adds.append("limit=").append(limit);
- }
- if (fFilter != null && fFilter.length() > 0) {
- try {
- if (adds.length() > 0) {
- adds.append("&");
- }
- adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- Log.error("Failed due to UnsupportedEncodingException" + e);
- }
- }
- if (adds.length() > 0) {
- url.append("?").append(adds.toString());
- }
- return url.toString();
- }
-
-}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
deleted file mode 100644
index e9b1cdb..0000000
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
+++ /dev/null
@@ -1,422 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.metrics.publisher.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-import java.nio.channels.NotYetConnectedException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPOutputStream;
-
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility;
-
-/**
- *
- * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
- * in batch
- *
- * @author anowarul.islam
- *
- */
-public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
- implements com.att.dmf.mr.metrics.publisher.CambriaBatchingPublisher {
- /**
- *
- * static inner class initializes with urls, topic,batchSize
- *
- * @author anowarul.islam
- *
- */
- public static class Builder {
- public Builder() {
- }
-
- /**
- * constructor initialize with url
- *
- * @param baseUrls
- * @return
- *
- */
- public Builder againstUrls(Collection<String> baseUrls) {
- fUrls = baseUrls;
- return this;
- }
-
- /**
- * constructor initializes with topics
- *
- * @param topic
- * @return
- *
- */
- public Builder onTopic(String topic) {
- fTopic = topic;
- return this;
- }
-
- /**
- * constructor initilazes with batch size and batch time
- *
- * @param maxBatchSize
- * @param maxBatchAgeMs
- * @return
- *
- */
- public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- return this;
- }
-
- /**
- * constructor initializes with compress
- *
- * @param compress
- * @return
- */
- public Builder compress(boolean compress) {
- fCompress = compress;
- return this;
- }
-
- /**
- * method returns DMaaPCambriaSimplerBatchPublisher object
- *
- * @return
- */
- public DMaaPCambriaSimplerBatchPublisher build() {
-
- try {
- return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
- } catch (MalformedURLException e) {
-
- NotYetConnectedException exception=new NotYetConnectedException();
- exception.setStackTrace(e.getStackTrace());
-
- throw exception ;
-
- }
- }
-
- private Collection<String> fUrls;
- private String fTopic;
- private int fMaxBatchSize = 100;
- private long fMaxBatchAgeMs = 1000;
- private boolean fCompress = false;
- };
-
- /**
- *
- * @param partition
- * @param msg
- */
- @Override
- public int send(String partition, String msg) {
- return send(new message(partition, msg));
- }
-
- /**
- * @param msg
- */
- @Override
- public int send(message msg) {
- final LinkedList<message> list = new LinkedList<message>();
- list.add(msg);
- return send(list);
- }
-
- /**
- * @param msgs
- */
- @Override
- public synchronized int send(Collection<message> msgs) {
- if (fClosed) {
- throw new IllegalStateException("The publisher was closed.");
- }
-
- for (message userMsg : msgs) {
- fPending.add(new TimestampedMessage(userMsg));
- }
- return getPendingMessageCount();
- }
-
- /**
- * getPending message count
- */
- @Override
- public synchronized int getPendingMessageCount() {
- return fPending.size();
- }
-
- /**
- *
- * @exception InterruptedException
- * @exception IOException
- */
- @Override
- public void close() {
- try {
- final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (remains.isEmpty()) {
- getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
- + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
- }
- } catch (InterruptedException e) {
- getLog().warn("Possible message loss. " + e.getMessage(), e);
- } catch (IOException e) {
- getLog().warn("Possible message loss. " + e.getMessage(), e);
- }
- }
-
- /**
- * @param time
- * @param unit
- */
- @Override
- public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
- synchronized (this) {
- fClosed = true;
-
- // stop the background sender
- fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- fExec.shutdown();
- }
-
- final long now = Clock.now();
- final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
- final long timeoutAtMs = now + waitInMs;
-
- while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
- send(true);
- Thread.sleep(250);
- }
- // synchronizing the current object
- synchronized (this) {
- final LinkedList<message> result = new LinkedList<message>();
- fPending.drainTo(result);
- return result;
- }
- }
-
- /**
- * Possibly send a batch to the cambria server. This is called by the
- * background thread and the close() method
- *
- * @param force
- */
- private synchronized void send(boolean force) {
- if (force || shouldSendNow()) {
- if (!sendBatch()) {
- getLog().warn("Send failed, " + fPending.size() + " message to send.");
-
- // note the time for back-off
- fDontSendUntilMs = sfWaitAfterError + Clock.now();
- }
- }
- }
-
- /**
- *
- * @return
- */
- private synchronized boolean shouldSendNow() {
- boolean shouldSend = false;
- if (fPending.isEmpty()) {
- final long nowMs = Clock.now();
-
- shouldSend = (fPending.size() >= fMaxBatchSize);
- if (!shouldSend) {
- final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
- shouldSend = sendAtMs <= nowMs;
- }
-
- // however, wait after an error
- shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
- }
- return shouldSend;
- }
-
- /**
- *
- * @return
- */
- private synchronized boolean sendBatch() {
- // it's possible for this call to be made with an empty list. in this
- // case, just return.
- if (fPending.isEmpty()) {
- return true;
- }
-
- final long nowMs = Clock.now();
- final String url = CambriaPublisherUtility.makeUrl(fTopic);
-
- getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
-
- try {
-
- final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
- OutputStream os = baseStream;
- if (fCompress) {
- os = new GZIPOutputStream(baseStream);
- }
- for (TimestampedMessage m : fPending) {
- os.write(("" + m.fPartition.length()).getBytes());
- os.write('.');
- os.write(("" + m.fMsg.length()).getBytes());
- os.write('.');
- os.write(m.fPartition.getBytes());
- os.write(m.fMsg.getBytes());
- os.write('\n');
- }
- os.close();
-
- final long startMs = Clock.now();
-
- // code from REST Client Starts
-
-
-
-
- Client client = ClientBuilder.newClient();
- String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
- if (null==metricTopicname) {
-
- metricTopicname="msgrtr.apinode.metrics.dmaap";
- }
- WebTarget target = client
- .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
- target = target.path("/events/" + fTopic);
- getLog().info("url : " + target.getUri().toString());
- // API Key
-
- Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
-
- Response response = target.request().post(data);
-
- getLog().info("Response received :: " + response.getStatus());
- getLog().info("Response received :: " + response.toString());
-
- // code from REST Client Ends
-
-
- fPending.clear();
- return true;
- } catch (IllegalArgumentException x) {
- getLog().warn(x.getMessage(), x);
- }
-
- catch (IOException x) {
- getLog().warn(x.getMessage(), x);
- }
- return false;
- }
-
- private final String fTopic;
- private final int fMaxBatchSize;
- private final long fMaxBatchAgeMs;
- private final boolean fCompress;
- private boolean fClosed;
-
- private final LinkedBlockingQueue<TimestampedMessage> fPending;
- private long fDontSendUntilMs;
- private final ScheduledThreadPoolExecutor fExec;
-
- private static final long sfWaitAfterError = 1000;
-
- /**
- *
- * @param hosts
- * @param topic
- * @param maxBatchSize
- * @param maxBatchAgeMs
- * @param compress
- */
- private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
- long maxBatchAgeMs, boolean compress) throws MalformedURLException {
-
- super(hosts);
-
- if (topic == null || topic.length() < 1) {
- throw new IllegalArgumentException("A topic must be provided.");
- }
-
- fClosed = false;
- fTopic = topic;
- fMaxBatchSize = maxBatchSize;
- fMaxBatchAgeMs = maxBatchAgeMs;
- fCompress = compress;
-
- fPending = new LinkedBlockingQueue<TimestampedMessage>();
- fDontSendUntilMs = 0;
-
- fExec = new ScheduledThreadPoolExecutor(1);
- fExec.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- send(false);
- }
- }, 100, 50, TimeUnit.MILLISECONDS);
- }
-
- /**
- *
- *
- * @author anowarul.islam
- *
- */
- private static class TimestampedMessage extends message {
- /**
- * to store timestamp value
- */
- public final long timestamp;
-
- /**
- * constructor initialize with message
- *
- * @param m
- *
- */
- public TimestampedMessage(message m) {
- super(m);
- timestamp = Clock.now();
- }
- }
-
-}