From 3504265229c589ecc166e3ad4c33bb198b11e4ce Mon Sep 17 00:00:00 2001 From: sunil unnava Date: Tue, 23 Oct 2018 12:18:59 -0400 Subject: update the package name Issue-ID: DMAAP-858 Change-Id: I49ae6eb9c51a261b64b911e607fcbbca46c5423c Signed-off-by: sunil unnava --- .../publisher/CambriaBatchingPublisher.java | 52 --- .../dmf/mr/metrics/publisher/CambriaClient.java | 89 ----- .../dmf/mr/metrics/publisher/CambriaConsumer.java | 52 --- .../dmf/mr/metrics/publisher/CambriaPublisher.java | 101 ----- .../metrics/publisher/CambriaPublisherUtility.java | 146 ------- .../publisher/DMaaPCambriaClientFactory.java | 420 -------------------- .../metrics/publisher/impl/CambriaBaseClient.java | 100 ----- .../att/dmf/mr/metrics/publisher/impl/Clock.java | 74 ---- .../publisher/impl/DMaaPCambriaConsumerImpl.java | 169 --------- .../impl/DMaaPCambriaSimplerBatchPublisher.java | 422 --------------------- 10 files changed, 1625 deletions(-) delete mode 100644 src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java delete mode 100644 src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java delete mode 100644 src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java delete mode 100644 src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java delete mode 100644 src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java delete mode 100644 src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java delete mode 100644 src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java delete mode 100644 src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java delete mode 100644 src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java delete mode 100644 src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java (limited to 'src/main/java/com/att/dmf/mr/metrics/publisher') diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java deleted file mode 100644 index 45644b7..0000000 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java +++ /dev/null @@ -1,52 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.dmf.mr.metrics.publisher; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * A Cambria batching publisher is a publisher with additional functionality - * for managing delayed sends. - * - * @author peter - * - */ -public interface CambriaBatchingPublisher extends CambriaPublisher -{ - /** - * Get the number of messages that have not yet been sent. - * @return the number of pending messages - */ - int getPendingMessageCount (); - - /** - * Close this publisher, sending any remaining messages. - * @param timeout an amount of time to wait for unsent messages to be sent - * @param timeoutUnits the time unit for the timeout arg - * @return a list of any unsent messages after the timeout - * @throws IOException - * @throws InterruptedException - */ - List close ( long timeout, TimeUnit timeoutUnits ) throws IOException, InterruptedException; -} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java deleted file mode 100644 index 4b219b1..0000000 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java +++ /dev/null @@ -1,89 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.dmf.mr.metrics.publisher; - - - -// -import com.att.eelf.configuration.EELFLogger; - - -/** - * - * @author anowarul.islam - * - */ -public interface CambriaClient { - /** - * An exception at the Cambria layer. This is used when the HTTP transport - * layer returns a success code but the transaction is not completed as - * expected. - */ - public class CambriaApiException extends Exception { - /** - * - * @param msg - */ - public CambriaApiException(String msg) { - super(msg); - } - - /** - * - * @param msg - * @param t - */ - public CambriaApiException(String msg, Throwable t) { - super(msg, t); - } - - private static final long serialVersionUID = 1L; - } - - /** - * Optionally set the Logger to use - * - * @param log - */ - void logTo(EELFLogger log); - - /** - * Set the API credentials for this client connection. Subsequent calls will - * include authentication headers.who i - * - * @param apiKey - * @param apiSecret - */ - void setApiCredentials(String apiKey, String apiSecret); - - /** - * Remove API credentials, if any, on this connection. Subsequent calls will - * not include authentication headers. - */ - void clearApiCredentials(); - - /** - * Close this connection. Some client interfaces have additional close - * capability. - */ - void close(); -} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java deleted file mode 100644 index 4a6ca81..0000000 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java +++ /dev/null @@ -1,52 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.dmf.mr.metrics.publisher; - -import java.io.IOException; - -/** - * This interface will provide fetch mechanism for consumer - * @author nilanjana.maity - * - */ -public interface CambriaConsumer extends CambriaClient -{ - /** - * Fetch a set of messages. The consumer's timeout and message limit are used if set in the constructor call. - - * @return a set of messages - * @throws IOException - */ - Iterable fetch () throws IOException; - - /** - * Fetch a set of messages with an explicit timeout and limit for this call. These values - * override any set in the constructor call. - * - * @param timeoutMs The amount of time in milliseconds that the server should keep the connection - * open while waiting for message traffic. Use -1 for default timeout (controlled on the server-side). - * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. - * @return a set messages - * @throws IOException if there's a problem connecting to the server - */ - Iterable fetch ( int timeoutMs, int limit ) throws IOException; -} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java deleted file mode 100644 index 4020a6d..0000000 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java +++ /dev/null @@ -1,101 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.dmf.mr.metrics.publisher; - -import java.io.IOException; -import java.util.Collection; - -/** - * A Cambria publishing interface. - * - * @author peter - * - */ -public interface CambriaPublisher extends CambriaClient { - /** - * A simple message container - */ - public static class message { - /** - * - * @param partition - * @param msg - */ - public message(String partition, String msg) { - fPartition = partition == null ? "" : partition; - fMsg = msg; - if (fMsg == null) { - throw new IllegalArgumentException("Can't send a null message."); - } - } - - /** - * - * @param msg - */ - public message(message msg) { - this(msg.fPartition, msg.fMsg); - } - - /** - * declaring partition string - */ - public final String fPartition; - /** - * declaring fMsg String - */ - public final String fMsg; - } - - /** - * Send the given message using the given partition. - * - * @param partition - * @param msg - * @return the number of pending messages - * @throws IOException - */ - int send(String partition, String msg) throws IOException; - - /** - * Send the given message using its partition. - * - * @param msg - * @return the number of pending messages - * @throws IOException - */ - int send(message msg) throws IOException; - - /** - * Send the given messages using their partitions. - * - * @param msgs - * @return the number of pending messages - * @throws IOException - */ - int send(Collection msgs) throws IOException; - - /** - * Close this publisher. It's an error to call send() after close() - */ - void close(); -} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java deleted file mode 100644 index 46dfa99..0000000 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java +++ /dev/null @@ -1,146 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.dmf.mr.metrics.publisher; - -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.http.HttpHost; -/** - * - * @author anowarul.islam - * - */ -public class CambriaPublisherUtility -{ - public static final String kBasePath = "/events/"; - public static final int kStdCambriaServicePort = 3904; -/** - * - * Translates a string into application/x-www-form-urlencoded - * format using a specific encoding scheme. - * @param s - * @return - * - */ - public static String escape ( String s ) - { - try - { - return URLEncoder.encode ( s, "UTF-8"); - } - catch ( UnsupportedEncodingException e ) - { - throw new RuntimeException ( e ); - } - } -/** - * - * building url - * @param rawTopic - * @return - */ - public static String makeUrl ( String rawTopic ) - { - final String cleanTopic = escape ( rawTopic ); - - final StringBuffer url = new StringBuffer(). - append ( CambriaPublisherUtility.kBasePath ). - append ( cleanTopic ); - return url.toString (); - } -/** - * - * building consumerUrl - * @param topic - * @param rawConsumerGroup - * @param rawConsumerId - * @return - */ - public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId ) - { - final String cleanConsumerGroup = escape ( rawConsumerGroup ); - final String cleanConsumerId = escape ( rawConsumerId ); - return CambriaPublisherUtility.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId; - } - - /** - * Create a list of HttpHosts from an input list of strings. Input strings have - * host[:port] as format. If the port section is not provided, the default port is used. - * - * @param hosts - * @return a list of hosts - */ - public static List createHostsList(Collection hosts) - { - final ArrayList convertedHosts = new ArrayList<>(); - for ( String host : hosts ) - { - if ( host.length () == 0 ) continue; - convertedHosts.add ( hostForString ( host ) ); - } - return convertedHosts; - } - - /** - * Return an HttpHost from an input string. Input string has - * host[:port] as format. If the port section is not provided, the default port is used. - * - * @param hosts - * @return a list of hosts - * if host.length<1 throws IllegalArgumentException - * - */ - public static HttpHost hostForString ( String host ) - { - if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." ); - - String hostPart = host; - int port = kStdCambriaServicePort; - - final int colon = host.indexOf ( ':' ); - if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." ); - if ( colon > 0 ) - { - hostPart = host.substring ( 0, colon ).trim(); - - final String portPart = host.substring ( colon + 1 ).trim(); - if ( portPart.length () > 0 ) - { - try - { - port = Integer.parseInt ( portPart ); - } - catch ( NumberFormatException x ) - { - throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x ); - } - } - // else: use default port on "foo:" - } - - return new HttpHost ( hostPart, port ); - } -} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java deleted file mode 100644 index d7818de..0000000 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java +++ /dev/null @@ -1,420 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.dmf.mr.metrics.publisher; - -import java.net.MalformedURLException; -import java.nio.channels.NotYetConnectedException; -import java.util.Collection; -import java.util.TreeSet; -import java.util.UUID; - -import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaConsumerImpl; -import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher; - -/** - * A factory for Cambria clients.
- *
- * Use caution selecting a consumer creator factory. If the call doesn't accept - * a consumer group name, then it creates a consumer that is not restartable. - * That is, if you stop your process and start it again, your client will NOT - * receive any missed messages on the topic. If you need to ensure receipt of - * missed messages, then you must use a consumer that's created with a group - * name and ID. (If you create multiple consumer processes using the same group, - * load is split across them. Be sure to use a different ID for each instance.)
- *
- * Publishers - * - * @author peter - */ -public class DMaaPCambriaClientFactory { - /** - * Create a consumer instance with the default timeout and no limit on - * messages returned. This consumer operates as an independent consumer - * (i.e., not in a group) and is NOT re-startable across sessions. - * - * @param hostList - * A comma separated list of hosts to use to connect to Cambria. - * You can include port numbers (3904 is the default). For - * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" - * - * @param topic - * The topic to consume - * - * @return a consumer - */ - public static CambriaConsumer createConsumer(String hostList, String topic) { - return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList), - topic); - } - - /** - * Create a consumer instance with the default timeout and no limit on - * messages returned. This consumer operates as an independent consumer - * (i.e., not in a group) and is NOT re-startable across sessions. - * - * @param hostSet - * The host used in the URL to Cambria. Entries can be - * "host:port". - * @param topic - * The topic to consume - * - * @return a consumer - */ - public static CambriaConsumer createConsumer(Collection hostSet, - String topic) { - return createConsumer(hostSet, topic, null); - } - - /** - * Create a consumer instance with server-side filtering, the default - * timeout, and no limit on messages returned. This consumer operates as an - * independent consumer (i.e., not in a group) and is NOT re-startable - * across sessions. - * - * @param hostSet - * The host used in the URL to Cambria. Entries can be - * "host:port". - * @param topic - * The topic to consume - * @param filter - * a filter to use on the server side - * - * @return a consumer - */ - public static CambriaConsumer createConsumer(Collection hostSet, - String topic, String filter) { - return createConsumer(hostSet, topic, UUID.randomUUID().toString(), - "0", -1, -1, filter, null, null); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. - * - * @param hostSet - * The host used in the URL to Cambria. Entries can be - * "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * - * @return a consumer - */ - public static CambriaConsumer createConsumer(Collection hostSet, - final String topic, final String consumerGroup, - final String consumerId) { - return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. - * - * @param hostSet - * The host used in the URL to Cambria. Entries can be - * "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * - * @return a consumer - */ - public static CambriaConsumer createConsumer(Collection hostSet, - final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit) { - return createConsumer(hostSet, topic, consumerGroup, consumerId, - timeoutMs, limit, null, null, null); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. This consumer also uses server-side filtering. - * - * @param hostList - * A comma separated list of hosts to use to connect to Cambria. - * You can include port numbers (3904 is the default). For - * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * @param filter - * A Highland Park filter expression using only built-in filter - * components. Use null for "no filter". - * @param apiKey - * key associated with a user - * @param apiSecret - * of a user - * - * @return a consumer - */ - public static CambriaConsumer createConsumer(String hostList, - final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, - String apiKey, String apiSecret) { - return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList), - topic, consumerGroup, consumerId, timeoutMs, limit, filter, - apiKey, apiSecret); - } - - /** - * Create a consumer instance with the default timeout, and no limit on - * messages returned. This consumer can operate in a logical group and is - * re-startable across sessions when you use the same group and ID on - * restart. This consumer also uses server-side filtering. - * - * @param hostSet - * The host used in the URL to Cambria. Entries can be - * "host:port". - * @param topic - * The topic to consume - * @param consumerGroup - * The name of the consumer group this consumer is part of - * @param consumerId - * The unique id of this consume in its group - * @param timeoutMs - * The amount of time in milliseconds that the server should keep - * the connection open while waiting for message traffic. Use -1 - * for default timeout. - * @param limit - * A limit on the number of messages returned in a single call. - * Use -1 for no limit. - * @param filter - * A Highland Park filter expression using only built-in filter - * components. Use null for "no filter". - * @param apiKey - * key associated with a user - * @param apiSecret - * of a user - * @return a consumer - */ - public static CambriaConsumer createConsumer(Collection hostSet, - final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, - String apiKey, String apiSecret) { - if (sfMock != null) - return sfMock; - try { - return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup, - consumerId, timeoutMs, limit, filter, apiKey, apiSecret); - } catch (MalformedURLException e) { - - NotYetConnectedException exception=new NotYetConnectedException(); - exception.setStackTrace(e.getStackTrace()); - - throw exception ; - } - } - - /*************************************************************************/ - /*************************************************************************/ - /*************************************************************************/ - - /** - * Create a publisher that sends each message (or group of messages) - * immediately. Most applications should favor higher latency for much - * higher message throughput and the "simple publisher" is not a good - * choice. - * - * @param hostlist - * The host used in the URL to Cambria. Can be "host:port", can - * be multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @return a publisher - */ - public static CambriaBatchingPublisher createSimplePublisher( - String hostlist, String topic) { - return createBatchingPublisher(hostlist, topic, 1, 1); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. Message payloads are - * not compressed. - * - * @param hostlist - * The host used in the URL to Cambria. Can be "host:port", can - * be multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * - * @return a publisher - */ - public static CambriaBatchingPublisher createBatchingPublisher( - String hostlist, String topic, int maxBatchSize, long maxAgeMs) { - return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, - false); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param hostlist - * The host used in the URL to Cambria. Can be "host:port", can - * be multiple comma-separated entries. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * - * @return a publisher - */ - public static CambriaBatchingPublisher createBatchingPublisher( - String hostlist, String topic, int maxBatchSize, long maxAgeMs, - boolean compress) { - return createBatchingPublisher( - DMaaPCambriaConsumerImpl.stringToList(hostlist), topic, - maxBatchSize, maxAgeMs, compress); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param hostSet - * A set of hosts to be used in the URL to Cambria. Can be - * "host:port". Use multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * - * @return a publisher - */ - public static CambriaBatchingPublisher createBatchingPublisher( - String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, - boolean compress) { - final TreeSet hosts = new TreeSet(); - for (String hp : hostSet) { - hosts.add(hp); - } - return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, - compress); - } - - /** - * Create a publisher that batches messages. Be sure to close the publisher - * to send the last batch and ensure a clean shutdown. - * - * @param hostSet - * A set of hosts to be used in the URL to Cambria. Can be - * "host:port". Use multiple entries to enable failover. - * @param topic - * The topic on which to publish messages. - * @param maxBatchSize - * The largest set of messages to batch - * @param maxAgeMs - * The maximum age of a message waiting in a batch - * @param compress - * use gzip compression - * - * @return a publisher - */ - public static CambriaBatchingPublisher createBatchingPublisher( - Collection hostSet, String topic, int maxBatchSize, - long maxAgeMs, boolean compress) { - return new DMaaPCambriaSimplerBatchPublisher.Builder() - .againstUrls(hostSet).onTopic(topic) - .batchTo(maxBatchSize, maxAgeMs).compress(compress).build(); - } - - /** - * Create an identity manager client to work with API keys. - * - * @param hostSet - * A set of hosts to be used in the URL to Cambria. Can be - * "host:port". Use multiple entries to enable failover. - * @param apiKey - * Your API key - * @param apiSecret - * Your API secret - * @return an identity manager - */ - - - /** - * Create a topic manager for working with topics. - * - * @param hostSet - * A set of hosts to be used in the URL to Cambria. Can be - * "host:port". Use multiple entries to enable failover. - * @param apiKey - * Your API key - * @param apiSecret - * Your API secret - * @return a topic manager - */ - - - /** - * Inject a consumer. Used to support unit tests. - * - * @param cc - */ - public static void $testInject(CambriaConsumer cc) { - sfMock = cc; - } - - private static CambriaConsumer sfMock = null; -} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java deleted file mode 100644 index 84576fc..0000000 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java +++ /dev/null @@ -1,100 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.dmf.mr.metrics.publisher.impl; - -import java.net.MalformedURLException; -import java.util.Collection; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; - -import org.json.JSONArray; -import org.json.JSONException; - -import com.att.dmf.mr.constants.CambriaConstants; - -//import org.slf4j.LoggerFactory; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.nsa.apiClient.http.CacheUse; -import com.att.nsa.apiClient.http.HttpClient; - -/** - * - * @author anowarul.islam - * - */ -public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metrics.publisher.CambriaClient -{ - protected CambriaBaseClient ( Collection hosts ) throws MalformedURLException - { - this ( hosts, null ); - } - - public CambriaBaseClient ( Collection hosts, String clientSignature ) throws MalformedURLException - { - - - - super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000); - - - fLog = EELFManager.getInstance().getLogger(this.getClass().getName()); - - } - - @Override - public void close () - { - } - - public Set jsonArrayToSet ( JSONArray a ) throws JSONException - { - if ( a == null ) return null; - - final TreeSet set = new TreeSet<>(); - for ( int i=0; i hostPart, final String topic, final String consumerGroup, - final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) throws MalformedURLException { - super(hostPart, topic + "::" + consumerGroup + "::" + consumerId); - - fTopic = topic; - fGroup = consumerGroup; - fId = consumerId; - fTimeoutMs = timeoutMs; - fLimit = limit; - fFilter = filter; - - setApiCredentials(apiKey, apiSecret); - } - - /** - * method converts String to list - * - * @param str - * @return - */ - public static List stringToList(String str) { - final LinkedList set = new LinkedList(); - if (str != null) { - final String[] parts = str.trim().split(","); - for (String part : parts) { - final String trimmed = part.trim(); - if (trimmed.length() > 0) { - set.add(trimmed); - } - } - } - return set; - } - - @Override - public Iterable fetch() throws IOException { - // fetch with the timeout and limit set in constructor - return fetch(fTimeoutMs, fLimit); - } - - @Override - public Iterable fetch(int timeoutMs, int limit) throws IOException { - final LinkedList msgs = new LinkedList(); - - final String urlPath = createUrlPath(timeoutMs, limit); - - getLog().info("UEB GET " + urlPath); - try { - final JSONObject o = get(urlPath); - - if (o != null) { - final JSONArray a = o.getJSONArray("result"); - if (a != null) { - for (int i = 0; i < a.length(); i++) { - msgs.add(a.getString(i)); - } - } - } - } catch (HttpObjectNotFoundException e) { - // this can happen if the topic is not yet created. ignore. - Log.error("Failed due to topic is not yet created" + e); - } catch (JSONException e) { - // unexpected response - reportProblemWithResponse(); - Log.error("Failed due to jsonException", e); - } catch (HttpException e) { - throw new IOException(e); - } - - return msgs; - } - - public String createUrlPath(int timeoutMs, int limit) { - final StringBuilder url = new StringBuilder(CambriaPublisherUtility.makeConsumerUrl(fTopic, fGroup, fId)); - final StringBuilder adds = new StringBuilder(); - if (timeoutMs > -1) { - adds.append("timeout=").append(timeoutMs); - } - - if (limit > -1) { - if (adds.length() > 0) { - adds.append("&"); - } - adds.append("limit=").append(limit); - } - if (fFilter != null && fFilter.length() > 0) { - try { - if (adds.length() > 0) { - adds.append("&"); - } - adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8")); - } catch (UnsupportedEncodingException e) { - Log.error("Failed due to UnsupportedEncodingException" + e); - } - } - if (adds.length() > 0) { - url.append("?").append(adds.toString()); - } - return url.toString(); - } - -} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java deleted file mode 100644 index e9b1cdb..0000000 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java +++ /dev/null @@ -1,422 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.dmf.mr.metrics.publisher.impl; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.MalformedURLException; -import java.nio.channels.NotYetConnectedException; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.zip.GZIPOutputStream; - -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Entity; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; - -import com.att.ajsc.filemonitor.AJSCPropertiesMap; -import com.att.dmf.mr.constants.CambriaConstants; -import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility; - -/** - * - * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages - * in batch - * - * @author anowarul.islam - * - */ -public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient - implements com.att.dmf.mr.metrics.publisher.CambriaBatchingPublisher { - /** - * - * static inner class initializes with urls, topic,batchSize - * - * @author anowarul.islam - * - */ - public static class Builder { - public Builder() { - } - - /** - * constructor initialize with url - * - * @param baseUrls - * @return - * - */ - public Builder againstUrls(Collection baseUrls) { - fUrls = baseUrls; - return this; - } - - /** - * constructor initializes with topics - * - * @param topic - * @return - * - */ - public Builder onTopic(String topic) { - fTopic = topic; - return this; - } - - /** - * constructor initilazes with batch size and batch time - * - * @param maxBatchSize - * @param maxBatchAgeMs - * @return - * - */ - public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) { - fMaxBatchSize = maxBatchSize; - fMaxBatchAgeMs = maxBatchAgeMs; - return this; - } - - /** - * constructor initializes with compress - * - * @param compress - * @return - */ - public Builder compress(boolean compress) { - fCompress = compress; - return this; - } - - /** - * method returns DMaaPCambriaSimplerBatchPublisher object - * - * @return - */ - public DMaaPCambriaSimplerBatchPublisher build() { - - try { - return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress); - } catch (MalformedURLException e) { - - NotYetConnectedException exception=new NotYetConnectedException(); - exception.setStackTrace(e.getStackTrace()); - - throw exception ; - - } - } - - private Collection fUrls; - private String fTopic; - private int fMaxBatchSize = 100; - private long fMaxBatchAgeMs = 1000; - private boolean fCompress = false; - }; - - /** - * - * @param partition - * @param msg - */ - @Override - public int send(String partition, String msg) { - return send(new message(partition, msg)); - } - - /** - * @param msg - */ - @Override - public int send(message msg) { - final LinkedList list = new LinkedList(); - list.add(msg); - return send(list); - } - - /** - * @param msgs - */ - @Override - public synchronized int send(Collection msgs) { - if (fClosed) { - throw new IllegalStateException("The publisher was closed."); - } - - for (message userMsg : msgs) { - fPending.add(new TimestampedMessage(userMsg)); - } - return getPendingMessageCount(); - } - - /** - * getPending message count - */ - @Override - public synchronized int getPendingMessageCount() { - return fPending.size(); - } - - /** - * - * @exception InterruptedException - * @exception IOException - */ - @Override - public void close() { - try { - final List remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - if (remains.isEmpty()) { - getLog().warn("Closing publisher with " + remains.size() + " messages unsent. " - + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close."); - } - } catch (InterruptedException e) { - getLog().warn("Possible message loss. " + e.getMessage(), e); - } catch (IOException e) { - getLog().warn("Possible message loss. " + e.getMessage(), e); - } - } - - /** - * @param time - * @param unit - */ - @Override - public List close(long time, TimeUnit unit) throws IOException, InterruptedException { - synchronized (this) { - fClosed = true; - - // stop the background sender - fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - fExec.shutdown(); - } - - final long now = Clock.now(); - final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit); - final long timeoutAtMs = now + waitInMs; - - while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) { - send(true); - Thread.sleep(250); - } - // synchronizing the current object - synchronized (this) { - final LinkedList result = new LinkedList(); - fPending.drainTo(result); - return result; - } - } - - /** - * Possibly send a batch to the cambria server. This is called by the - * background thread and the close() method - * - * @param force - */ - private synchronized void send(boolean force) { - if (force || shouldSendNow()) { - if (!sendBatch()) { - getLog().warn("Send failed, " + fPending.size() + " message to send."); - - // note the time for back-off - fDontSendUntilMs = sfWaitAfterError + Clock.now(); - } - } - } - - /** - * - * @return - */ - private synchronized boolean shouldSendNow() { - boolean shouldSend = false; - if (fPending.isEmpty()) { - final long nowMs = Clock.now(); - - shouldSend = (fPending.size() >= fMaxBatchSize); - if (!shouldSend) { - final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; - shouldSend = sendAtMs <= nowMs; - } - - // however, wait after an error - shouldSend = shouldSend && nowMs >= fDontSendUntilMs; - } - return shouldSend; - } - - /** - * - * @return - */ - private synchronized boolean sendBatch() { - // it's possible for this call to be made with an empty list. in this - // case, just return. - if (fPending.isEmpty()) { - return true; - } - - final long nowMs = Clock.now(); - final String url = CambriaPublisherUtility.makeUrl(fTopic); - - getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: " - + (nowMs - fPending.peek().timestamp) + " ms"); - - try { - - final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); - OutputStream os = baseStream; - if (fCompress) { - os = new GZIPOutputStream(baseStream); - } - for (TimestampedMessage m : fPending) { - os.write(("" + m.fPartition.length()).getBytes()); - os.write('.'); - os.write(("" + m.fMsg.length()).getBytes()); - os.write('.'); - os.write(m.fPartition.getBytes()); - os.write(m.fMsg.getBytes()); - os.write('\n'); - } - os.close(); - - final long startMs = Clock.now(); - - // code from REST Client Starts - - - - - Client client = ClientBuilder.newClient(); - String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic"); - if (null==metricTopicname) { - - metricTopicname="msgrtr.apinode.metrics.dmaap"; - } - WebTarget target = client - .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort); - target = target.path("/events/" + fTopic); - getLog().info("url : " + target.getUri().toString()); - // API Key - - Entity data = Entity.entity(baseStream.toByteArray(), "application/cambria"); - - Response response = target.request().post(data); - - getLog().info("Response received :: " + response.getStatus()); - getLog().info("Response received :: " + response.toString()); - - // code from REST Client Ends - - - fPending.clear(); - return true; - } catch (IllegalArgumentException x) { - getLog().warn(x.getMessage(), x); - } - - catch (IOException x) { - getLog().warn(x.getMessage(), x); - } - return false; - } - - private final String fTopic; - private final int fMaxBatchSize; - private final long fMaxBatchAgeMs; - private final boolean fCompress; - private boolean fClosed; - - private final LinkedBlockingQueue fPending; - private long fDontSendUntilMs; - private final ScheduledThreadPoolExecutor fExec; - - private static final long sfWaitAfterError = 1000; - - /** - * - * @param hosts - * @param topic - * @param maxBatchSize - * @param maxBatchAgeMs - * @param compress - */ - private DMaaPCambriaSimplerBatchPublisher(Collection hosts, String topic, int maxBatchSize, - long maxBatchAgeMs, boolean compress) throws MalformedURLException { - - super(hosts); - - if (topic == null || topic.length() < 1) { - throw new IllegalArgumentException("A topic must be provided."); - } - - fClosed = false; - fTopic = topic; - fMaxBatchSize = maxBatchSize; - fMaxBatchAgeMs = maxBatchAgeMs; - fCompress = compress; - - fPending = new LinkedBlockingQueue(); - fDontSendUntilMs = 0; - - fExec = new ScheduledThreadPoolExecutor(1); - fExec.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - send(false); - } - }, 100, 50, TimeUnit.MILLISECONDS); - } - - /** - * - * - * @author anowarul.islam - * - */ - private static class TimestampedMessage extends message { - /** - * to store timestamp value - */ - public final long timestamp; - - /** - * constructor initialize with message - * - * @param m - * - */ - public TimestampedMessage(message m) { - super(m); - timestamp = Clock.now(); - } - } - -} -- cgit 1.2.3-korg