diff options
author | sunil unnava <su622b@att.com> | 2018-08-14 09:34:46 -0400 |
---|---|---|
committer | sunil unnava <su622b@att.com> | 2018-08-14 09:39:23 -0400 |
commit | b32effcaf5684d5e2f338a4537b71a2375c534e5 (patch) | |
tree | e1b80407f414509ffcc766b987ec6a95f7254b4e /src/main/java/com/att/dmf/mr/metrics | |
parent | 0823cb186012c8e6b7de3d979dfabb9f838da7c2 (diff) |
update the testcases after the kafka 11 changes
Issue-ID: DMAAP-526
Change-Id: I477a8ee05fb3cdd76af726b6ca0d1a69aa9eef93
Signed-off-by: sunil unnava <su622b@att.com>
Diffstat (limited to 'src/main/java/com/att/dmf/mr/metrics')
10 files changed, 1637 insertions, 0 deletions
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java new file mode 100644 index 0000000..45644b7 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaBatchingPublisher.java @@ -0,0 +1,52 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.metrics.publisher; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * A Cambria batching publisher is a publisher with additional functionality + * for managing delayed sends. + * + * @author peter + * + */ +public interface CambriaBatchingPublisher extends CambriaPublisher +{ + /** + * Get the number of messages that have not yet been sent. + * @return the number of pending messages + */ + int getPendingMessageCount (); + + /** + * Close this publisher, sending any remaining messages. + * @param timeout an amount of time to wait for unsent messages to be sent + * @param timeoutUnits the time unit for the timeout arg + * @return a list of any unsent messages after the timeout + * @throws IOException + * @throws InterruptedException + */ + List<message> close ( long timeout, TimeUnit timeoutUnits ) throws IOException, InterruptedException; +} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java new file mode 100644 index 0000000..0993aa6 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java @@ -0,0 +1,89 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.metrics.publisher; + +//import org.slf4j.Logger; + +// +import com.att.eelf.configuration.EELFLogger; +//import com.att.eelf.configuration.EELFManager; + +/** + * + * @author anowarul.islam + * + */ +public interface CambriaClient { + /** + * An exception at the Cambria layer. This is used when the HTTP transport + * layer returns a success code but the transaction is not completed as + * expected. + */ + public class CambriaApiException extends Exception { + /** + * + * @param msg + */ + public CambriaApiException(String msg) { + super(msg); + } + + /** + * + * @param msg + * @param t + */ + public CambriaApiException(String msg, Throwable t) { + super(msg, t); + } + + private static final long serialVersionUID = 1L; + } + + /** + * Optionally set the Logger to use + * + * @param log + */ + void logTo(EELFLogger log); + + /** + * Set the API credentials for this client connection. Subsequent calls will + * include authentication headers.who i + * + * @param apiKey + * @param apiSecret + */ + void setApiCredentials(String apiKey, String apiSecret); + + /** + * Remove API credentials, if any, on this connection. Subsequent calls will + * not include authentication headers. + */ + void clearApiCredentials(); + + /** + * Close this connection. Some client interfaces have additional close + * capability. + */ + void close(); +} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java new file mode 100644 index 0000000..4a6ca81 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaConsumer.java @@ -0,0 +1,52 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.metrics.publisher; + +import java.io.IOException; + +/** + * This interface will provide fetch mechanism for consumer + * @author nilanjana.maity + * + */ +public interface CambriaConsumer extends CambriaClient +{ + /** + * Fetch a set of messages. The consumer's timeout and message limit are used if set in the constructor call. + + * @return a set of messages + * @throws IOException + */ + Iterable<String> fetch () throws IOException; + + /** + * Fetch a set of messages with an explicit timeout and limit for this call. These values + * override any set in the constructor call. + * + * @param timeoutMs The amount of time in milliseconds that the server should keep the connection + * open while waiting for message traffic. Use -1 for default timeout (controlled on the server-side). + * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit. + * @return a set messages + * @throws IOException if there's a problem connecting to the server + */ + Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException; +} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java new file mode 100644 index 0000000..4020a6d --- /dev/null +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisher.java @@ -0,0 +1,101 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.metrics.publisher; + +import java.io.IOException; +import java.util.Collection; + +/** + * A Cambria publishing interface. + * + * @author peter + * + */ +public interface CambriaPublisher extends CambriaClient { + /** + * A simple message container + */ + public static class message { + /** + * + * @param partition + * @param msg + */ + public message(String partition, String msg) { + fPartition = partition == null ? "" : partition; + fMsg = msg; + if (fMsg == null) { + throw new IllegalArgumentException("Can't send a null message."); + } + } + + /** + * + * @param msg + */ + public message(message msg) { + this(msg.fPartition, msg.fMsg); + } + + /** + * declaring partition string + */ + public final String fPartition; + /** + * declaring fMsg String + */ + public final String fMsg; + } + + /** + * Send the given message using the given partition. + * + * @param partition + * @param msg + * @return the number of pending messages + * @throws IOException + */ + int send(String partition, String msg) throws IOException; + + /** + * Send the given message using its partition. + * + * @param msg + * @return the number of pending messages + * @throws IOException + */ + int send(message msg) throws IOException; + + /** + * Send the given messages using their partitions. + * + * @param msgs + * @return the number of pending messages + * @throws IOException + */ + int send(Collection<message> msgs) throws IOException; + + /** + * Close this publisher. It's an error to call send() after close() + */ + void close(); +} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java new file mode 100644 index 0000000..1510c32 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java @@ -0,0 +1,146 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.metrics.publisher; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.http.HttpHost; +/** + * + * @author anowarul.islam + * + */ +public class CambriaPublisherUtility +{ + public static final String kBasePath = "/events/"; + public static final int kStdCambriaServicePort = 3904; +/** + * + * Translates a string into <code>application/x-www-form-urlencoded</code> + * format using a specific encoding scheme. + * @param s + * @return + * + */ + public static String escape ( String s ) + { + try + { + return URLEncoder.encode ( s, "UTF-8"); + } + catch ( UnsupportedEncodingException e ) + { + throw new RuntimeException ( e ); + } + } +/** + * + * building url + * @param rawTopic + * @return + */ + public static String makeUrl ( String rawTopic ) + { + final String cleanTopic = escape ( rawTopic ); + + final StringBuffer url = new StringBuffer(). + append ( CambriaPublisherUtility.kBasePath ). + append ( cleanTopic ); + return url.toString (); + } +/** + * + * building consumerUrl + * @param topic + * @param rawConsumerGroup + * @param rawConsumerId + * @return + */ + public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId ) + { + final String cleanConsumerGroup = escape ( rawConsumerGroup ); + final String cleanConsumerId = escape ( rawConsumerId ); + return CambriaPublisherUtility.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId; + } + + /** + * Create a list of HttpHosts from an input list of strings. Input strings have + * host[:port] as format. If the port section is not provided, the default port is used. + * + * @param hosts + * @return a list of hosts + */ + public static List<HttpHost> createHostsList(Collection<String> hosts) + { + final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> (); + for ( String host : hosts ) + { + if ( host.length () == 0 ) continue; + convertedHosts.add ( hostForString ( host ) ); + } + return convertedHosts; + } + + /** + * Return an HttpHost from an input string. Input string has + * host[:port] as format. If the port section is not provided, the default port is used. + * + * @param hosts + * @return a list of hosts + * if host.length<1 throws IllegalArgumentException + * + */ + public static HttpHost hostForString ( String host ) + { + if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." ); + + String hostPart = host; + int port = kStdCambriaServicePort; + + final int colon = host.indexOf ( ':' ); + if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." ); + if ( colon > 0 ) + { + hostPart = host.substring ( 0, colon ).trim(); + + final String portPart = host.substring ( colon + 1 ).trim(); + if ( portPart.length () > 0 ) + { + try + { + port = Integer.parseInt ( portPart ); + } + catch ( NumberFormatException x ) + { + throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x ); + } + } + // else: use default port on "foo:" + } + + return new HttpHost ( hostPart, port ); + } +} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java new file mode 100644 index 0000000..d02438f --- /dev/null +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java @@ -0,0 +1,425 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.metrics.publisher; + +import java.net.MalformedURLException; +import java.util.Collection; +import java.util.TreeSet; +import java.util.UUID; + +import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaConsumerImpl; +import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher; + +/** + * A factory for Cambria clients.<br/> + * <br/> + * Use caution selecting a consumer creator factory. If the call doesn't accept + * a consumer group name, then it creates a consumer that is not restartable. + * That is, if you stop your process and start it again, your client will NOT + * receive any missed messages on the topic. If you need to ensure receipt of + * missed messages, then you must use a consumer that's created with a group + * name and ID. (If you create multiple consumer processes using the same group, + * load is split across them. Be sure to use a different ID for each instance.)<br/> + * <br/> + * Publishers + * + * @author peter + */ +public class DMaaPCambriaClientFactory { + /** + * Create a consumer instance with the default timeout and no limit on + * messages returned. This consumer operates as an independent consumer + * (i.e., not in a group) and is NOT re-startable across sessions. + * + * @param hostList + * A comma separated list of hosts to use to connect to Cambria. + * You can include port numbers (3904 is the default). For + * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" + * + * @param topic + * The topic to consume + * + * @return a consumer + */ + public static CambriaConsumer createConsumer(String hostList, String topic) { + return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList), + topic); + } + + /** + * Create a consumer instance with the default timeout and no limit on + * messages returned. This consumer operates as an independent consumer + * (i.e., not in a group) and is NOT re-startable across sessions. + * + * @param hostSet + * The host used in the URL to Cambria. Entries can be + * "host:port". + * @param topic + * The topic to consume + * + * @return a consumer + */ + public static CambriaConsumer createConsumer(Collection<String> hostSet, + String topic) { + return createConsumer(hostSet, topic, null); + } + + /** + * Create a consumer instance with server-side filtering, the default + * timeout, and no limit on messages returned. This consumer operates as an + * independent consumer (i.e., not in a group) and is NOT re-startable + * across sessions. + * + * @param hostSet + * The host used in the URL to Cambria. Entries can be + * "host:port". + * @param topic + * The topic to consume + * @param filter + * a filter to use on the server side + * + * @return a consumer + */ + public static CambriaConsumer createConsumer(Collection<String> hostSet, + String topic, String filter) { + return createConsumer(hostSet, topic, UUID.randomUUID().toString(), + "0", -1, -1, filter, null, null); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. + * + * @param hostSet + * The host used in the URL to Cambria. Entries can be + * "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * + * @return a consumer + */ + public static CambriaConsumer createConsumer(Collection<String> hostSet, + final String topic, final String consumerGroup, + final String consumerId) { + return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. + * + * @param hostSet + * The host used in the URL to Cambria. Entries can be + * "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * + * @return a consumer + */ + public static CambriaConsumer createConsumer(Collection<String> hostSet, + final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit) { + return createConsumer(hostSet, topic, consumerGroup, consumerId, + timeoutMs, limit, null, null, null); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. This consumer also uses server-side filtering. + * + * @param hostList + * A comma separated list of hosts to use to connect to Cambria. + * You can include port numbers (3904 is the default). For + * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter + * A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". + * @param apiKey + * key associated with a user + * @param apiSecret + * of a user + * + * @return a consumer + */ + public static CambriaConsumer createConsumer(String hostList, + final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, + String apiKey, String apiSecret) { + return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList), + topic, consumerGroup, consumerId, timeoutMs, limit, filter, + apiKey, apiSecret); + } + + /** + * Create a consumer instance with the default timeout, and no limit on + * messages returned. This consumer can operate in a logical group and is + * re-startable across sessions when you use the same group and ID on + * restart. This consumer also uses server-side filtering. + * + * @param hostSet + * The host used in the URL to Cambria. Entries can be + * "host:port". + * @param topic + * The topic to consume + * @param consumerGroup + * The name of the consumer group this consumer is part of + * @param consumerId + * The unique id of this consume in its group + * @param timeoutMs + * The amount of time in milliseconds that the server should keep + * the connection open while waiting for message traffic. Use -1 + * for default timeout. + * @param limit + * A limit on the number of messages returned in a single call. + * Use -1 for no limit. + * @param filter + * A Highland Park filter expression using only built-in filter + * components. Use null for "no filter". + * @param apiKey + * key associated with a user + * @param apiSecret + * of a user + * @return a consumer + */ + public static CambriaConsumer createConsumer(Collection<String> hostSet, + final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, + String apiKey, String apiSecret) { + if (sfMock != null) + return sfMock; + try { + return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup, + consumerId, timeoutMs, limit, filter, apiKey, apiSecret); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + /*************************************************************************/ + /*************************************************************************/ + /*************************************************************************/ + + /** + * Create a publisher that sends each message (or group of messages) + * immediately. Most applications should favor higher latency for much + * higher message throughput and the "simple publisher" is not a good + * choice. + * + * @param hostlist + * The host used in the URL to Cambria. Can be "host:port", can + * be multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @return a publisher + */ + public static CambriaBatchingPublisher createSimplePublisher( + String hostlist, String topic) { + return createBatchingPublisher(hostlist, topic, 1, 1); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. Message payloads are + * not compressed. + * + * @param hostlist + * The host used in the URL to Cambria. Can be "host:port", can + * be multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * + * @return a publisher + */ + public static CambriaBatchingPublisher createBatchingPublisher( + String hostlist, String topic, int maxBatchSize, long maxAgeMs) { + return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, + false); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param hostlist + * The host used in the URL to Cambria. Can be "host:port", can + * be multiple comma-separated entries. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * + * @return a publisher + */ + public static CambriaBatchingPublisher createBatchingPublisher( + String hostlist, String topic, int maxBatchSize, long maxAgeMs, + boolean compress) { + return createBatchingPublisher( + DMaaPCambriaConsumerImpl.stringToList(hostlist), topic, + maxBatchSize, maxAgeMs, compress); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param hostSet + * A set of hosts to be used in the URL to Cambria. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * + * @return a publisher + */ + public static CambriaBatchingPublisher createBatchingPublisher( + String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, + boolean compress) { + final TreeSet<String> hosts = new TreeSet<String>(); + for (String hp : hostSet) { + hosts.add(hp); + } + return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, + compress); + } + + /** + * Create a publisher that batches messages. Be sure to close the publisher + * to send the last batch and ensure a clean shutdown. + * + * @param hostSet + * A set of hosts to be used in the URL to Cambria. Can be + * "host:port". Use multiple entries to enable failover. + * @param topic + * The topic on which to publish messages. + * @param maxBatchSize + * The largest set of messages to batch + * @param maxAgeMs + * The maximum age of a message waiting in a batch + * @param compress + * use gzip compression + * + * @return a publisher + */ + public static CambriaBatchingPublisher createBatchingPublisher( + Collection<String> hostSet, String topic, int maxBatchSize, + long maxAgeMs, boolean compress) { + return new DMaaPCambriaSimplerBatchPublisher.Builder() + .againstUrls(hostSet).onTopic(topic) + .batchTo(maxBatchSize, maxAgeMs).compress(compress).build(); + } + + /** + * Create an identity manager client to work with API keys. + * + * @param hostSet + * A set of hosts to be used in the URL to Cambria. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey + * Your API key + * @param apiSecret + * Your API secret + * @return an identity manager + */ + /* + * public static CambriaIdentityManager createIdentityManager ( + * Collection<String> hostSet, String apiKey, String apiSecret ) { final + * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet ); + * cim.setApiCredentials ( apiKey, apiSecret ); return cim; } + */ + + /** + * Create a topic manager for working with topics. + * + * @param hostSet + * A set of hosts to be used in the URL to Cambria. Can be + * "host:port". Use multiple entries to enable failover. + * @param apiKey + * Your API key + * @param apiSecret + * Your API secret + * @return a topic manager + */ + /* + * public static CambriaTopicManager createTopicManager ( Collection<String> + * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi + * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey, + * apiSecret ); return tmi; } + */ + + /** + * Inject a consumer. Used to support unit tests. + * + * @param cc + */ + public static void $testInject(CambriaConsumer cc) { + sfMock = cc; + } + + private static CambriaConsumer sfMock = null; +} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java new file mode 100644 index 0000000..08b2fd1 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java @@ -0,0 +1,100 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.metrics.publisher.impl; + +import java.net.MalformedURLException; +import java.util.Collection; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import org.json.JSONArray; +import org.json.JSONException; + +import com.att.dmf.mr.constants.CambriaConstants; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.apiClient.http.CacheUse; +import com.att.nsa.apiClient.http.HttpClient; + +/** + * + * @author anowarul.islam + * + */ +public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metrics.publisher.CambriaClient +{ + protected CambriaBaseClient ( Collection<String> hosts ) throws MalformedURLException + { + this ( hosts, null ); + } + + public CambriaBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException + { + /*super ( hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, + CacheUse.NONE, 1, 1, TimeUnit.MILLISECONDS );*/ + + super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000); + + //fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + fLog = EELFManager.getInstance().getLogger(this.getClass().getName()); + //( this.getClass().getName () ); + } + + @Override + public void close () + { + } + + public Set<String> jsonArrayToSet ( JSONArray a ) throws JSONException + { + if ( a == null ) return null; + + final TreeSet<String> set = new TreeSet<String> (); + for ( int i=0; i<a.length (); i++ ) + { + set.add ( a.getString ( i )); + } + return set; + } + /** + * @param log + */ + public void logTo ( EELFLogger log ) + { + fLog = log; + + //replaceLogger ( log ); + } + + public EELFLogger getLog () + { + return fLog; + } + + private EELFLogger fLog; + + + +} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java new file mode 100644 index 0000000..7463700 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/Clock.java @@ -0,0 +1,74 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.metrics.publisher.impl; + +/** + * + * This class maintains the system clocks + * @author nilanjana.maity + * + */ +public class Clock +{ + public synchronized static Clock getIt () + { + if ( sfClock == null ) + { + sfClock = new Clock (); + } + return sfClock; + } + + /** + * + * Get the system's current time in milliseconds. + * @return the current time + * + */ + public static long now () + { + return getIt().nowImpl (); + } + + /** + * Get current time in milliseconds + * @return current time in ms + */ + public long nowImpl () + { + return System.currentTimeMillis (); + } + + /** + * Initialize constructor + */ + public Clock () + { + } + + private static Clock sfClock = null; + + public synchronized static void register ( Clock testClock ) + { + sfClock = testClock; + } +} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java new file mode 100644 index 0000000..ee56213 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaConsumerImpl.java @@ -0,0 +1,169 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.metrics.publisher.impl; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; +import java.net.URLEncoder; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility; +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.apiClient.http.HttpObjectNotFoundException; + +import jline.internal.Log; + +/** + * + * @author anowarul.islam + * + */ +public class DMaaPCambriaConsumerImpl extends CambriaBaseClient + implements com.att.dmf.mr.metrics.publisher.CambriaConsumer { + private final String fTopic; + private final String fGroup; + private final String fId; + private final int fTimeoutMs; + private final int fLimit; + private final String fFilter; + + /** + * + * @param hostPart + * @param topic + * @param consumerGroup + * @param consumerId + * @param timeoutMs + * @param limit + * @param filter + * @param apiKey + * @param apiSecret + */ + public DMaaPCambriaConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup, + final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) throws MalformedURLException { + super(hostPart, topic + "::" + consumerGroup + "::" + consumerId); + + fTopic = topic; + fGroup = consumerGroup; + fId = consumerId; + fTimeoutMs = timeoutMs; + fLimit = limit; + fFilter = filter; + + setApiCredentials(apiKey, apiSecret); + } + + /** + * method converts String to list + * + * @param str + * @return + */ + public static List<String> stringToList(String str) { + final LinkedList<String> set = new LinkedList<String>(); + if (str != null) { + final String[] parts = str.trim().split(","); + for (String part : parts) { + final String trimmed = part.trim(); + if (trimmed.length() > 0) { + set.add(trimmed); + } + } + } + return set; + } + + @Override + public Iterable<String> fetch() throws IOException { + // fetch with the timeout and limit set in constructor + return fetch(fTimeoutMs, fLimit); + } + + @Override + public Iterable<String> fetch(int timeoutMs, int limit) throws IOException { + final LinkedList<String> msgs = new LinkedList<String>(); + + final String urlPath = createUrlPath(timeoutMs, limit); + + getLog().info("UEB GET " + urlPath); + try { + final JSONObject o = get(urlPath); + + if (o != null) { + final JSONArray a = o.getJSONArray("result"); + if (a != null) { + for (int i = 0; i < a.length(); i++) { + msgs.add(a.getString(i)); + } + } + } + } catch (HttpObjectNotFoundException e) { + // this can happen if the topic is not yet created. ignore. + Log.error("Failed due to topic is not yet created" + e); + } catch (JSONException e) { + // unexpected response + reportProblemWithResponse(); + Log.error("Failed due to jsonException", e); + } catch (HttpException e) { + throw new IOException(e); + } + + return msgs; + } + + public String createUrlPath(int timeoutMs, int limit) { + final StringBuilder url = new StringBuilder(CambriaPublisherUtility.makeConsumerUrl(fTopic, fGroup, fId)); + final StringBuilder adds = new StringBuilder(); + if (timeoutMs > -1) { + adds.append("timeout=").append(timeoutMs); + } + + if (limit > -1) { + if (adds.length() > 0) { + adds.append("&"); + } + adds.append("limit=").append(limit); + } + if (fFilter != null && fFilter.length() > 0) { + try { + if (adds.length() > 0) { + adds.append("&"); + } + adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8")); + } catch (UnsupportedEncodingException e) { + Log.error("Failed due to UnsupportedEncodingException" + e); + } + } + if (adds.length() > 0) { + url.append("?").append(adds.toString()); + } + return url.toString(); + } + +} diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java new file mode 100644 index 0000000..d8d8799 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java @@ -0,0 +1,429 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.metrics.publisher.impl; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPOutputStream; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility; + +/** + * + * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages + * in batch + * + * @author anowarul.islam + * + */ +public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient + implements com.att.dmf.mr.metrics.publisher.CambriaBatchingPublisher { + /** + * + * static inner class initializes with urls, topic,batchSize + * + * @author anowarul.islam + * + */ + public static class Builder { + public Builder() { + } + + /** + * constructor initialize with url + * + * @param baseUrls + * @return + * + */ + public Builder againstUrls(Collection<String> baseUrls) { + fUrls = baseUrls; + return this; + } + + /** + * constructor initializes with topics + * + * @param topic + * @return + * + */ + public Builder onTopic(String topic) { + fTopic = topic; + return this; + } + + /** + * constructor initilazes with batch size and batch time + * + * @param maxBatchSize + * @param maxBatchAgeMs + * @return + * + */ + public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) { + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + return this; + } + + /** + * constructor initializes with compress + * + * @param compress + * @return + */ + public Builder compress(boolean compress) { + fCompress = compress; + return this; + } + + /** + * method returns DMaaPCambriaSimplerBatchPublisher object + * + * @return + */ + public DMaaPCambriaSimplerBatchPublisher build() { + + try { + return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + private Collection<String> fUrls; + private String fTopic; + private int fMaxBatchSize = 100; + private long fMaxBatchAgeMs = 1000; + private boolean fCompress = false; + }; + + /** + * + * @param partition + * @param msg + */ + @Override + public int send(String partition, String msg) { + return send(new message(partition, msg)); + } + + /** + * @param msg + */ + @Override + public int send(message msg) { + final LinkedList<message> list = new LinkedList<message>(); + list.add(msg); + return send(list); + } + + /** + * @param msgs + */ + @Override + public synchronized int send(Collection<message> msgs) { + if (fClosed) { + throw new IllegalStateException("The publisher was closed."); + } + + for (message userMsg : msgs) { + fPending.add(new TimestampedMessage(userMsg)); + } + return getPendingMessageCount(); + } + + /** + * getPending message count + */ + @Override + public synchronized int getPendingMessageCount() { + return fPending.size(); + } + + /** + * + * @exception InterruptedException + * @exception IOException + */ + @Override + public void close() { + try { + final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + if (remains.size() > 0) { + getLog().warn("Closing publisher with " + remains.size() + " messages unsent. " + + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close."); + } + } catch (InterruptedException e) { + getLog().warn("Possible message loss. " + e.getMessage(), e); + } catch (IOException e) { + getLog().warn("Possible message loss. " + e.getMessage(), e); + } + } + + /** + * @param time + * @param unit + */ + @Override + public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException { + synchronized (this) { + fClosed = true; + + // stop the background sender + fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + fExec.shutdown(); + } + + final long now = Clock.now(); + final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit); + final long timeoutAtMs = now + waitInMs; + + while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) { + send(true); + Thread.sleep(250); + } + // synchronizing the current object + synchronized (this) { + final LinkedList<message> result = new LinkedList<message>(); + fPending.drainTo(result); + return result; + } + } + + /** + * Possibly send a batch to the cambria server. This is called by the + * background thread and the close() method + * + * @param force + */ + private synchronized void send(boolean force) { + if (force || shouldSendNow()) { + if (!sendBatch()) { + getLog().warn("Send failed, " + fPending.size() + " message to send."); + + // note the time for back-off + fDontSendUntilMs = sfWaitAfterError + Clock.now(); + } + } + } + + /** + * + * @return + */ + private synchronized boolean shouldSendNow() { + boolean shouldSend = false; + if (fPending.size() > 0) { + final long nowMs = Clock.now(); + + shouldSend = (fPending.size() >= fMaxBatchSize); + if (!shouldSend) { + final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; + shouldSend = sendAtMs <= nowMs; + } + + // however, wait after an error + shouldSend = shouldSend && nowMs >= fDontSendUntilMs; + } + return shouldSend; + } + + /** + * + * @return + */ + private synchronized boolean sendBatch() { + // it's possible for this call to be made with an empty list. in this + // case, just return. + if (fPending.size() < 1) { + return true; + } + + final long nowMs = Clock.now(); + final String url = CambriaPublisherUtility.makeUrl(fTopic); + + getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: " + + (nowMs - fPending.peek().timestamp) + " ms"); + + try { + + final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); + OutputStream os = baseStream; + if (fCompress) { + os = new GZIPOutputStream(baseStream); + } + for (TimestampedMessage m : fPending) { + os.write(("" + m.fPartition.length()).getBytes()); + os.write('.'); + os.write(("" + m.fMsg.length()).getBytes()); + os.write('.'); + os.write(m.fPartition.getBytes()); + os.write(m.fMsg.getBytes()); + os.write('\n'); + } + os.close(); + + final long startMs = Clock.now(); + + // code from REST Client Starts + + // final String serverCalculatedSignature = sha1HmacSigner.sign + // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV"); + + Client client = ClientBuilder.newClient(); + String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic"); + if (null==metricTopicname) { + + metricTopicname="msgrtr.apinode.metrics.dmaap"; + } + WebTarget target = client + .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort); + target = target.path("/events/" + fTopic); + getLog().info("url : " + target.getUri().toString()); + // API Key + + Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria"); + + Response response = target.request().post(data); + // header("X-CambriaAuth", + // "2OH46YIWa329QpEF:"+serverCalculatedSignature). + // header("X-CambriaDate", "2015-09-21T11:38:19-0700"). + // post(Entity.json(baseStream.toByteArray())); + + getLog().info("Response received :: " + response.getStatus()); + getLog().info("Response received :: " + response.toString()); + + // code from REST Client Ends + + /* + * final JSONObject result = post ( url, contentType, + * baseStream.toByteArray(), true ); final String logLine = + * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" + + * result.toString (); getLog().info ( logLine ); + */ + fPending.clear(); + return true; + } catch (IllegalArgumentException x) { + getLog().warn(x.getMessage(), x); + } + /* + * catch ( HttpObjectNotFoundException x ) { getLog().warn ( + * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn ( + * x.getMessage(), x ); } + */ + catch (IOException x) { + getLog().warn(x.getMessage(), x); + } + return false; + } + + private final String fTopic; + private final int fMaxBatchSize; + private final long fMaxBatchAgeMs; + private final boolean fCompress; + private boolean fClosed; + + private final LinkedBlockingQueue<TimestampedMessage> fPending; + private long fDontSendUntilMs; + private final ScheduledThreadPoolExecutor fExec; + + private static final long sfWaitAfterError = 1000; + + /** + * + * @param hosts + * @param topic + * @param maxBatchSize + * @param maxBatchAgeMs + * @param compress + */ + private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, + long maxBatchAgeMs, boolean compress) throws MalformedURLException { + + super(hosts); + + if (topic == null || topic.length() < 1) { + throw new IllegalArgumentException("A topic must be provided."); + } + + fClosed = false; + fTopic = topic; + fMaxBatchSize = maxBatchSize; + fMaxBatchAgeMs = maxBatchAgeMs; + fCompress = compress; + + fPending = new LinkedBlockingQueue<TimestampedMessage>(); + fDontSendUntilMs = 0; + + fExec = new ScheduledThreadPoolExecutor(1); + fExec.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + send(false); + } + }, 100, 50, TimeUnit.MILLISECONDS); + } + + /** + * + * + * @author anowarul.islam + * + */ + private static class TimestampedMessage extends message { + /** + * to store timestamp value + */ + public final long timestamp; + + /** + * constructor initialize with message + * + * @param m + * + */ + public TimestampedMessage(message m) { + super(m); + timestamp = Clock.now(); + } + } + +} |