author    Varun Gudisena <vg411h@att.com>    2017-08-31 10:52:33 -0500
committer Varun Gudisena <vg411h@att.com>    2017-08-31 10:52:50 -0500
commit    3fc19dc9157f4d05bdbd6fd05a52f0592268c4e7 (patch)
tree      69355ec5a2a03a1867862e6b757b51c45763ef1a /src/main/java/com/att/nsa/cambria/beans
parent    ca63da6e0cb7fb63e231343d0b52a40036f6b6aa (diff)
Revert package name changes
Reverted package name changes to avoid any potential issues. Renamed maven group id only.

Issue-id: DMAAP-74
Change-Id: Ic741b602ade60f108d940c0571a1d94b7be2abc2
Signed-off-by: Varun Gudisena <vg411h@att.com>
Diffstat (limited to 'src/main/java/com/att/nsa/cambria/beans')
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java                |  88
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java       | 227
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java              | 104
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java | 319
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java      | 462
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java           | 232
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java             | 139
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java             |  45
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java           |  52
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/LogDetails.java                | 214
-rw-r--r--  src/main/java/com/att/nsa/cambria/beans/TopicBean.java                 | 155
11 files changed, 2037 insertions, 0 deletions
diff --git a/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java b/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java
new file mode 100644
index 0000000..df4a2ed
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java
@@ -0,0 +1,88 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.beans;
+
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import com.att.nsa.drumlin.till.data.uniqueStringGenerator;
+/**
+ *
+ * @author author
+ *
+ */
+@XmlRootElement
+public class ApiKeyBean implements Serializable {
+
+ private static final long serialVersionUID = -8219849086890567740L;
+
+ private static final String KEY_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+
+ private String email;
+ private String description;
+ /**
+ * constructor
+ */
+ public ApiKeyBean() {
+ super();
+ }
+/**
+ *
+ * @param email
+ * @param description
+ */
+ public ApiKeyBean(String email, String description) {
+ super();
+ this.email = email;
+ this.description = description;
+ }
+
+ public String getEmail() {
+ return email;
+ }
+
+ public void setEmail(String email) {
+ this.email = email;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public String getKey() {
+ return generateKey(16);
+ }
+
+ public String getSharedSecret() {
+ return generateKey(24);
+ }
+
+ private static String generateKey ( int length ) {
+ return uniqueStringGenerator.createKeyUsingAlphabet ( KEY_CHARS, length );
+ }
+
+}
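
The bean above is a plain JAXB-serializable holder; key and shared-secret generation are stateless. A minimal usage sketch (the demo class and values are hypothetical; only ApiKeyBean itself comes from this diff):

import com.att.nsa.cambria.beans.ApiKeyBean;

public class ApiKeyBeanDemo {
    public static void main(String[] args) {
        ApiKeyBean bean = new ApiKeyBean("user@example.com", "demo key");
        // getKey() and getSharedSecret() draw a fresh random string from
        // KEY_CHARS on every call; the bean does not cache the result, so
        // callers must persist the returned values themselves.
        System.out.println("key    = " + bean.getKey());          // 16 characters
        System.out.println("secret = " + bean.getSharedSecret()); // 24 characters
    }
}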
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java
new file mode 100644
index 0000000..1b609b0
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java
@@ -0,0 +1,227 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.beans;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.metrics.impl.CdmRateTicker;
+
+/**
+ * Class providing rate information.
+ *
+ * @author author
+ *
+ */
+@Component
+public class DMaaPCambriaLimiter {
+ /**
+ * constructor initializes
+ *
+ * @param settings
+ * @throws missingReqdSetting
+ * @throws invalidSettingValue
+ */
+ @Autowired
+ public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
+ throws missingReqdSetting, invalidSettingValue {
+ fRateInfo = new HashMap<String, RateInfo>();
+ fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
+ CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
+ fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
+ CambriaConstants.kDefault_RateLimitWindowLength);
+ fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
+ CambriaConstants.kDefault_SleepMsOnRateLimit);
+ }
+
+ /**
+ * Static method providing the sleep time for a given rate.
+ *
+ * @param ratePerMinute
+ * @return
+ */
+ public static long getSleepMsForRate(double ratePerMinute) {
+ if (ratePerMinute <= 0.0)
+ return 0;
+ return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
+ }
+
+ /**
+ * Construct a rate limiter.
+ *
+ * @param maxEmptyPollsPerMinute
+ * Pass <= 0 to deactivate rate limiting.
+ * @param windowLengthMins
+ */
+ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins) {
+ this(maxEmptyPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute));
+ }
+
+ /**
+ * Construct a rate limiter
+ *
+ * @param maxEmptyPollsPerMinute
+ * Pass <= 0 to deactivate rate limiting.
+ * @param sleepMs
+ * @param windowLengthMins
+ */
+ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins, long sleepMs) {
+ fRateInfo = new HashMap<String, RateInfo>();
+ fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
+ fWindowLengthMins = windowLengthMins;
+ fSleepMs = Math.max(0, sleepMs);
+ }
+
+ /**
+ * Tell the rate limiter about a call to a topic/group/id. If the rate is
+ * too high, this call delays its return and throws an exception.
+ *
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ * @throws CambriaApiException
+ */
+ public void onCall(String topic, String consumerGroup, String clientId) throws CambriaApiException {
+ // do nothing if rate is configured 0 or less
+ if (fMaxEmptyPollsPerMinute <= 0) {
+ return;
+ }
+
+ // setup rate info for this tuple
+ final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
+
+ final double rate = ri.onCall();
+ log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
+
+ if (rate > fMaxEmptyPollsPerMinute) {
+ try {
+ log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
+ + ".");
+ if (fSleepMs > 0) {
+ log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
+ + " ms sleep, then responding in error.");
+ Thread.sleep(fSleepMs);
+ } else {
+ log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
+ }
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
+ DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
+ "This client is making too many requests. Please use a long poll "
+ + "setting to decrease the number of requests that result in empty responses. ");
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+
+ /**
+ *
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ * @param sentCount
+ */
+ public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
+ // check for good replies
+ if (sentCount > 0) {
+ // that was a good send, reset the metric
+ getRateInfo(topic, consumerGroup, clientId).reset();
+ }
+ }
+
+ private static class RateInfo {
+ /**
+ * constructor initializes
+ *
+ * @param label
+ * @param windowLengthMinutes
+ */
+ public RateInfo(String label, int windowLengthMinutes) {
+ fLabel = label;
+ fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
+ windowLengthMinutes, TimeUnit.MINUTES);
+ }
+
+ public String getLabel() {
+ return fLabel;
+ }
+
+ /**
+ * CdmRateTicker is reset
+ */
+ public void reset() {
+ fCallRateSinceLastMsgSend.reset();
+ }
+
+ /**
+ *
+ * @return
+ */
+ public double onCall() {
+ fCallRateSinceLastMsgSend.tick();
+ return fCallRateSinceLastMsgSend.getRate();
+ }
+
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
+ }
+
+ private final HashMap<String, RateInfo> fRateInfo;
+ private final double fMaxEmptyPollsPerMinute;
+ private final int fWindowLengthMins;
+ private final long fSleepMs;
+ //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+ private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
+ final String key = makeKey(topic, consumerGroup, clientId);
+ RateInfo ri = fRateInfo.get(key);
+ if (ri == null) {
+ ri = new RateInfo(key, fWindowLengthMins);
+ fRateInfo.put(key, ri);
+ }
+ return ri;
+ }
+
+ private String makeKey(String topic, String group, String id) {
+ return topic + "::" + group + "::" + id;
+ }
+}
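
A sketch of how a caller might drive the limiter above; the topic, group, and client values are hypothetical. onCall is reported for each empty poll reply, and onSend resets the ticker once messages actually flow:

import com.att.nsa.cambria.CambriaApiException;
import com.att.nsa.cambria.beans.DMaaPCambriaLimiter;

public class LimiterDemo {
    public static void main(String[] args) {
        // allow 30 empty polls per minute, measured over a 5-minute window
        DMaaPCambriaLimiter limiter = new DMaaPCambriaLimiter(30.0, 5);
        try {
            limiter.onCall("demoTopic", "demoGroup", "client-1"); // one empty reply
        } catch (CambriaApiException e) {
            // thrown (after the configured sleep) once the rate exceeds the limit;
            // surfaces to the HTTP client as 429 Too Many Requests
            System.err.println("rate limited: " + e.getMessage());
        }
        limiter.onSend("demoTopic", "demoGroup", "client-1", 10); // 10 msgs sent: reset
    }
}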
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java
new file mode 100644
index 0000000..79a8e1f
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java
@@ -0,0 +1,104 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.beans;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+
+import com.att.nsa.cambria.utils.ConfigurationReader;
+
+/**
+ * DMaaPContext provides and maintains all the configuration, HTTP request/response,
+ * session, and consumer request time.
+ * @author author
+ *
+ */
+public class DMaaPContext {
+
+ private ConfigurationReader configReader;
+ private HttpServletRequest request;
+ private HttpServletResponse response;
+ private HttpSession session;
+ private String consumerRequestTime;
+ static int i=0;
+
+ public synchronized static long getBatchID() {
+ try{
+ final long metricsSendTime = System.currentTimeMillis();
+ final Date d = new Date(metricsSendTime);
+ final String text = new SimpleDateFormat("ddMMyyyyHHmmss").format(d);
+ long dt= Long.valueOf(text)+i;
+ i++;
+ return dt;
+ }
+ catch(NumberFormatException ex){
+ return 0;
+ }
+ }
+
+ public HttpServletRequest getRequest() {
+ return request;
+ }
+
+ public void setRequest(HttpServletRequest request) {
+ this.request = request;
+ }
+
+ public HttpServletResponse getResponse() {
+ return response;
+ }
+
+ public void setResponse(HttpServletResponse response) {
+ this.response = response;
+ }
+
+ public HttpSession getSession() {
+ this.session = request.getSession();
+ return session;
+ }
+
+ public void setSession(HttpSession session) {
+ this.session = session;
+ }
+
+ public ConfigurationReader getConfigReader() {
+ return configReader;
+ }
+
+ public void setConfigReader(ConfigurationReader configReader) {
+ this.configReader = configReader;
+ }
+
+ public String getConsumerRequestTime() {
+ return consumerRequestTime;
+ }
+
+ public void setConsumerRequestTime(String consumerRequestTime) {
+ this.consumerRequestTime = consumerRequestTime;
+ }
+
+
+}
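
A sketch of the servlet-side wiring this context expects; the request, response, and ConfigurationReader instances would come from the container and Spring respectively, and are hypothetical parameters here:

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.att.nsa.cambria.beans.DMaaPContext;
import com.att.nsa.cambria.utils.ConfigurationReader;

public class ContextDemo {
    public static DMaaPContext buildContext(HttpServletRequest req, HttpServletResponse resp,
            ConfigurationReader cfg) {
        DMaaPContext ctx = new DMaaPContext();
        ctx.setRequest(req);   // getSession() later derives the session from this request
        ctx.setResponse(resp);
        ctx.setConfigReader(cfg);
        // batch IDs are ddMMyyyyHHmmss plus a static counter: unique within this
        // JVM, but not across restarts or across API nodes
        System.out.println("batch id: " + DMaaPContext.getBatchID());
        return ctx;
    }
}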
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java
new file mode 100644
index 0000000..28d48fa
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java
@@ -0,0 +1,319 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.beans;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.att.nsa.cambria.backends.Consumer;
+import com.att.nsa.cambria.backends.ConsumerFactory;
+import com.att.nsa.cambria.backends.MetricsSet;
+import com.att.nsa.cambria.backends.kafka.KafkaConsumer;
+import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache;
+import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import kafka.consumer.ConsumerConfig;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+/**
+ * @author author
+ *
+ */
+public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
+
+ //private static final Logger log = LoggerFactory .getLogger(DMaaPKafkaConsumerFactory.class);
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
+ /**
+ * constructor initialization
+ *
+ * @param settings
+ * @param metrics
+ * @param curator
+ * @throws missingReqdSetting
+ * @throws KafkaConsumerCacheException
+ * @throws UnknownHostException
+ */
+ public DMaaPKafkaConsumerFactory(
+ @Qualifier("propertyReader") rrNvReadable settings,
+ @Qualifier("dMaaPMetricsSet") MetricsSet metrics,
+ @Qualifier("curator") CuratorFramework curator)
+ throws missingReqdSetting, KafkaConsumerCacheException,
+ UnknownHostException {
+ /*final String apiNodeId = settings.getString(
+ CambriaConstants.kSetting_ApiNodeIdentifier,
+ InetAddress.getLocalHost().getCanonicalHostName()
+ + ":"
+ + settings.getInt(CambriaConstants.kSetting_Port,
+ CambriaConstants.kDefault_Port));*/
+ String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ CambriaConstants.kSetting_ApiNodeIdentifier);
+ if (apiNodeId == null){
+
+ apiNodeId=InetAddress.getLocalHost().getCanonicalHostName()
+ + ":"
+ + settings.getInt(CambriaConstants.kSetting_Port,
+ CambriaConstants.kDefault_Port);
+ }
+
+ log.info("This Cambria API Node identifies itself as [" + apiNodeId
+ + "].");
+ final String mode = CambriaConstants.DMAAP;
+ /*fSettings = settings;
+ fZooKeeper = fSettings.getString(kSettings_KafkaZookeeper, settings
+ .getString(CambriaConstants.kSetting_ZkConfigDbServers,
+ CambriaConstants.kDefault_ZkConfigDbServers));*/
+
+ String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
+ if(null==strkSettings_KafkaZookeeper){
+ strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
+ if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
+
+ }
+ fZooKeeper= strkSettings_KafkaZookeeper;
+
+ //final boolean isCacheEnabled = fSettings.getBoolean(
+ // kSetting_EnableCache, kDefault_IsCacheEnabled);
+ // read the cache-enable flag from the MsgRtrApi properties, defaulting when absent
+ boolean cacheEnabled = kDefault_IsCacheEnabled;
+ String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_EnableCache);
+ if (null != strkSetting_EnableCache) cacheEnabled = Boolean.parseBoolean(strkSetting_EnableCache);
+
+ final boolean isCacheEnabled = cacheEnabled;
+
+
+ fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
+ metrics) : null;
+ if (fCache != null) {
+ fCache.startCache(mode, curator);
+ }
+ }
+
+ @Override
+ public Consumer getConsumerFor(String topic, String consumerGroupName,
+ String consumerId, int timeoutMs) throws UnavailableException {
+ KafkaConsumer kc;
+
+ try {
+ kc = (fCache != null) ? fCache.getConsumerFor(topic,
+ consumerGroupName, consumerId) : null;
+ } catch (KafkaConsumerCacheException e) {
+ throw new UnavailableException(e);
+ }
+
+ if (kc == null) {
+
+ final InterProcessMutex ipLock = new InterProcessMutex( ConfigurationReader.getCurator(), "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
+// final InterProcessMutex fLock = new InterProcessMutex(
+// ConfigurationReader.getCurator(), "/consumerFactory/"
+// + topic + "/" + consumerGroupName + "/"
+// + consumerId);
+ boolean locked = false;
+ try {
+
+ locked = ipLock.acquire(30, TimeUnit.SECONDS);
+ if (!locked) {
+ // FIXME: this seems to cause trouble in some cases. This exception
+ // gets thrown routinely. Possibly a consumer trying multiple servers
+ // at once, producing a never-ending cycle of overlapping locks?
+ // The problem is that it throws and winds up sending a 503 to the
+ // client, which would be incorrect if the client is causing trouble
+ // by switching back and forth.
+
+ throw new UnavailableException("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + ", " + consumerGroupName + ", " + consumerId + ")");
+ }
+
+// if (!fLock.acquire(30, TimeUnit.SECONDS)) {
+// throw new UnavailableException(
+// "Could not acquire lock in order to create (topic, group, consumer) = "
+// + "(" + topic + ", " + consumerGroupName
+// + ", " + consumerId + ")");
+// }
+
+ fCache.signalOwnership(topic, consumerGroupName, consumerId);
+
+ log.info("Creating Kafka consumer for group ["
+ + consumerGroupName + "], consumer [" + consumerId
+ + "], on topic [" + topic + "].");
+
+ final String fakeGroupName = consumerGroupName + "--" + topic;
+
+ final ConsumerConfig ccc = createConsumerConfig(fakeGroupName,
+ consumerId);
+ final ConsumerConnector cc = kafka.consumer.Consumer
+ .createJavaConsumerConnector(ccc);
+ kc = new KafkaConsumer(topic, consumerGroupName, consumerId, cc);
+
+ if (fCache != null) {
+ fCache.putConsumerFor(topic, consumerGroupName, consumerId,
+ kc);
+ }
+ } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
+ log.warn("Kafka consumer couldn't connect to ZK.");
+ throw new UnavailableException("Couldn't connect to ZK.");
+ } catch (KafkaConsumerCacheException e) {
+ log.warn("Failed to cache consumer (this may have performance implications): "
+ + e.getMessage());
+ } catch (Exception e) {
+ throw new UnavailableException(
+ "Error while acquiring consumer factory lock", e);
+ } finally {
+ if ( locked )
+ {
+ try {
+ ipLock.release();
+ } catch (Exception e) {
+ throw new UnavailableException("Error while releasing consumer factory lock", e);
+ }
+ }
+ }
+ }
+
+ return kc;
+ }
+
+ @Override
+ public synchronized void destroyConsumer(String topic,
+ String consumerGroup, String clientId) {
+ if (fCache != null) {
+ fCache.dropConsumer(topic, consumerGroup, clientId);
+ }
+ }
+
+ @Override
+ public synchronized Collection<? extends Consumer> getConsumers() {
+ return fCache.getConsumers();
+ }
+
+ @Override
+ public synchronized void dropCache() {
+ fCache.dropAllConsumers();
+ }
+
+ private ConsumerConfig createConsumerConfig(String groupId,
+ String consumerId) {
+ final Properties props = new Properties();
+ props.put("zookeeper.connect", fZooKeeper);
+ props.put("group.id", groupId);
+ props.put("consumer.id", consumerId);
+ //props.put("auto.commit.enable", "false");
+ // additional settings: start with our defaults, then pull in configured
+ // overrides
+ props.putAll(KafkaInternalDefaults);
+ for (String key : KafkaConsumerKeys) {
+ transferSettingIfProvided(props, key, "kafka");
+ }
+
+ return new ConsumerConfig(props);
+ }
+
+ //private final rrNvReadable fSettings;
+ private final KafkaConsumerCache fCache;
+
+ private String fZooKeeper;
+
+ private static final String kSettings_KafkaZookeeper = "kafka.client.zookeeper";
+
+ private static final HashMap<String, String> KafkaInternalDefaults = new HashMap<String, String>();
+
+ /**
+ * Populates the internal defaults map with values such as consumer timeout
+ * and ZooKeeper timeouts, read from the MsgRtrApi properties.
+ */
+ public static void populateKafkaInternalDefaultsMap() {
+ //@Qualifier("propertyReader") rrNvReadable setting) {
+ try {
+
+ HashMap<String, String> map1= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
+
+ KafkaInternalDefaults.put("consumer.timeout.ms",
+ // AJSCPropertiesMap.get(CambriaConstants.msgRtr_prop, "consumer.timeout.ms"));
+ map1.get( "consumer.timeout.ms"));
+
+ KafkaInternalDefaults.put("zookeeper.connection.timeout.ms",
+ //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.connection.timeout.ms"));
+ map1.get("zookeeper.connection.timeout.ms"));
+ KafkaInternalDefaults.put("zookeeper.session.timeout.ms",
+ //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.session.timeout.ms"));
+ map1.get("zookeeper.session.timeout.ms"));
+ KafkaInternalDefaults.put("zookeeper.sync.time.ms",
+ // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.sync.time.ms"));
+ map1.get( "zookeeper.sync.time.ms"));
+ KafkaInternalDefaults.put("auto.commit.interval.ms",
+ //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.interval.ms"));
+ map1.get( "auto.commit.interval.ms"));
+ KafkaInternalDefaults.put("fetch.message.max.bytes",
+ //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "fetch.message.max.bytes"));
+ map1.get("fetch.message.max.bytes"));
+ KafkaInternalDefaults.put("auto.commit.enable",
+ // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.enable"));
+ map1.get("auto.commit.enable"));
+ } catch (Exception e) {
+ log.error("Failed to load Kafka Internal Properties.", e);
+ }
+ }
+
+ private static final String[] KafkaConsumerKeys = { "socket.timeout.ms",
+ "socket.receive.buffer.bytes", "fetch.message.max.bytes",
+ "auto.commit.interval.ms", "queued.max.message.chunks",
+ "rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.bytes",
+ "rebalance.backoff.ms", "refresh.leader.backoff.ms",
+ "auto.offset.reset", "consumer.timeout.ms",
+ "zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms",
+ "zookeeper.sync.time.ms" };
+
+ private static String makeLongKey(String key, String prefix) {
+ return prefix + "." + key;
+ }
+
+ private void transferSettingIfProvided(Properties target, String key,
+ String prefix) {
+ String keyVal= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,makeLongKey(key, prefix));
+
+ // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
+ if (null!=keyVal) {
+ // final String val = fSettings
+ // .getString(makeLongKey(key, prefix), "");
+ log.info("Setting [" + key + "] to " + keyVal + ".");
+ target.put(key, keyVal);
+ }
+ }
+
+ }
+
+
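
A sketch of acquiring a consumer through the factory above. In the application the settings, metrics, and curator arguments are Spring-wired beans; here they are hypothetical parameters, and the import path for UnavailableException assumes the nested type on ConsumerFactory:

import org.apache.curator.framework.CuratorFramework;
import com.att.nsa.cambria.backends.Consumer;
import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
import com.att.nsa.cambria.backends.MetricsSet;
import com.att.nsa.cambria.beans.DMaaPKafkaConsumerFactory;
import com.att.nsa.drumlin.till.nv.rrNvReadable;

public class ConsumerFactoryDemo {
    public static void poll(rrNvReadable settings, MetricsSet metrics, CuratorFramework curator)
            throws Exception {
        DMaaPKafkaConsumerFactory factory =
                new DMaaPKafkaConsumerFactory(settings, metrics, curator);
        try {
            // checks the consumer cache first; on a miss it takes the inter-process
            // lock /consumerFactory/<topic>/<group>/<id> before creating the consumer
            Consumer c = factory.getConsumerFor("demoTopic", "demoGroup", "client-1", 30000);
            // ... read messages via c ...
        } catch (UnavailableException e) {
            // lock contention or ZK trouble; surfaces to the HTTP client as a 503
            System.err.println("consumer unavailable: " + e.getMessage());
        }
    }
}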
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java
new file mode 100644
index 0000000..9d53ef2
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java
@@ -0,0 +1,462 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.beans;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+//import org.apache.log4j.Logger;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.metabroker.Broker;
+import com.att.nsa.cambria.metabroker.Topic;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.configs.ConfigDb;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.configs.ConfigPath;
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.security.NsaAcl;
+import com.att.nsa.security.NsaAclUtils;
+import com.att.nsa.security.NsaApiKey;
+
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer$;
+
+/**
+ * class performing all topic operations
+ *
+ * @author author
+ *
+ */
+
+public class DMaaPKafkaMetaBroker implements Broker {
+
+ //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaMetaBroker.class);
+
+
+ /**
+ * DMaaPKafkaMetaBroker constructor initializing
+ *
+ * @param settings
+ * @param zk
+ * @param configDb
+ */
+ public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
+ @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+ //fSettings = settings;
+ fZk = zk;
+ fCambriaConfig = configDb;
+ fBaseTopicData = configDb.parse("/topics");
+ }
+
+ @Override
+ public List<Topic> getAllTopics() throws ConfigDbException {
+ log.info("Retrieving list of all the topics.");
+ final LinkedList<Topic> result = new LinkedList<Topic>();
+ try {
+ log.info("Retrieving all topics from root: " + zkTopicsRoot);
+ final List<String> topics = fZk.getChildren(zkTopicsRoot);
+ for (String topic : topics) {
+ result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
+ }
+
+ JSONObject dataObj = new JSONObject();
+ dataObj.put("topics", new JSONObject());
+
+ for (String topic : topics) {
+ dataObj.getJSONObject("topics").put(topic, new JSONObject());
+ }
+ } catch (ZkNoNodeException excp) {
+ // very fresh kafka doesn't have any topics or a topics node
+ log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
+ }
+ return result;
+ }
+
+ @Override
+ public Topic getTopic(String topic) throws ConfigDbException {
+ if (fZk.exists(zkTopicsRoot + "/" + topic)) {
+ return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
+ }
+ // else: no such topic in kafka
+ return null;
+ }
+
+ /**
+ * Static method to get the KafkaTopic object.
+ *
+ * @param db
+ * @param base
+ * @param topic
+ * @return
+ * @throws ConfigDbException
+ */
+ public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
+ return new KafkaTopic(topic, db, base);
+ }
+
+ /**
+ * creating topic
+ */
+ @Override
+ public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
+ boolean transactionEnabled) throws TopicExistsException, CambriaApiException {
+ log.info("Creating topic: " + topic);
+ try {
+ log.info("Check if topic [" + topic + "] exist.");
+ // first check for existence "our way"
+ final Topic t = getTopic(topic);
+ if (t != null) {
+ log.info("Could not create topic [" + topic + "]. Topic Already exists.");
+ throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
+ }
+ } catch (ConfigDbException e1) {
+ log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
+ throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
+ "Couldn't check topic data in config db.");
+ }
+
+ // we only allow up to 3 replicas. (If we don't test this, we get weird
+ // results from the cluster, so explicitly test and fail.)
+ if (replicas < 1 || replicas > 3) {
+ log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
+ throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
+ "The replica count must be between 1 and 3.");
+ }
+ if (partitions < 1) {
+ log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
+ throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
+ }
+
+ // create via kafka
+ try {
+ ZkClient zkClient = null;
+ try {
+ log.info("Loading zookeeper client for creating topic.");
+ // FIXME: use of this scala module$ thing is a goofy hack to make
+ // Kafka aware of the topic creation. (Otherwise, the topic is only
+ // partially created in ZK.)
+ zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
+ ZKStringSerializer$.MODULE$);
+
+ log.info("Zookeeper client loaded successfully. Creating topic.");
+ AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties());
+ } catch (kafka.common.TopicExistsException e) {
+ log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e);
+ throw new TopicExistsException(topic);
+ } catch (ZkNoNodeException e) {
+ log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not setup.", e);
+ // Kafka throws this when the server isn't running (and perhaps
+ // hasn't ever run)
+ throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
+ "The Kafka cluster is not setup.");
+ } catch (kafka.admin.AdminOperationException e) {
+ // Kafka throws this when the server isn't running (and perhaps
+ // hasn't ever run)
+ log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(),
+ e);
+ throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
+ "The Kafka cluster can't handle your request. Talk to the administrators.");
+ } finally {
+ log.info("Closing zookeeper connection.");
+ if (zkClient != null)
+ zkClient.close();
+ }
+
+ log.info("Creating topic entry for topic: " + topic);
+ // underlying Kafka topic created. now setup our API info
+ return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+ } catch (ConfigDbException excp) {
+ log.error("Failed to create topic data. Talk to the administrators: " + excp.getMessage(), excp);
+ throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
+ "Failed to create topic data. Talk to the administrators.");
+ }
+ }
+
+ @Override
+ public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException {
+ log.info("Deleting topic: " + topic);
+ ZkClient zkClient = null;
+ try {
+ log.info("Loading zookeeper client for topic deletion.");
+ // FIXME: use of this scala module$ thing is a goofy hack to make
+ // Kafka aware of the topic deletion. (Otherwise, the topic is only
+ // partially deleted in ZK.)
+ zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
+ ZKStringSerializer$.MODULE$);
+
+ log.info("Zookeeper client loaded successfully. Deleting topic.");
+ AdminUtils.deleteTopic(zkClient, topic);
+ } catch (kafka.common.TopicExistsException e) {
+ log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
+ throw new TopicExistsException(topic);
+ } catch (ZkNoNodeException e) {
+ log.error("Failed to delete topic [" + topic + "]. The Kafka cluster is not setup." + e.getMessage(), e);
+ // Kafka throws this when the server isn't running (and perhaps
+ // hasn't ever run)
+ throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, "The Kafka cluster is not setup.");
+ } catch (kafka.admin.AdminOperationException e) {
+ // Kafka throws this when the server isn't running (and perhaps
+ // hasn't ever run)
+ log.error("The Kafka cluster can't handle your request. Talk to the administrators." + e.getMessage(), e);
+ throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
+ "The Kafka cluster can't handle your request. Talk to the administrators.");
+ } finally {
+ log.info("Closing zookeeper connection.");
+ if (zkClient != null)
+ zkClient.close();
+ }
+
+ // throw new UnsupportedOperationException ( "We can't programmatically
+ // delete Kafka topics yet." );
+ }
+
+ //private final rrNvReadable fSettings;
+ private final ZkClient fZk;
+ private final ConfigDb fCambriaConfig;
+ private final ConfigPath fBaseTopicData;
+
+ private static final String zkTopicsRoot = "/brokers/topics";
+ private static final JSONObject kEmptyAcl = new JSONObject();
+
+ /**
+ * Method providing the KafkaTopic object associated with an owner, along
+ * with whether transactions are enabled.
+ *
+ * @param name
+ * @param desc
+ * @param owner
+ * @param transactionEnabled
+ * @return
+ * @throws ConfigDbException
+ */
+ public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
+ throws ConfigDbException {
+ return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
+ }
+
+ /**
+ * Static method storing a topic entry and returning the KafkaTopic object.
+ *
+ * @param db
+ * @param basePath
+ * @param name
+ * @param desc
+ * @param owner
+ * @param transactionEnabled
+ * @return
+ * @throws ConfigDbException
+ */
+ public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
+ boolean transactionEnabled) throws ConfigDbException {
+ final JSONObject o = new JSONObject();
+ o.put("owner", owner);
+ o.put("description", desc);
+ o.put("txenabled", transactionEnabled);
+ db.store(basePath.getChild(name), o.toString());
+ return new KafkaTopic(name, db, basePath);
+ }
+
+ /**
+ * Class performing all user operations, such as checking whether a user is
+ * eligible to read or write, and permitting a user to read or write.
+ *
+ * @author author
+ *
+ */
+ public static class KafkaTopic implements Topic {
+ /**
+ * constructor initializes
+ *
+ * @param name
+ * @param configdb
+ * @param baseTopic
+ * @throws ConfigDbException
+ */
+ public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
+ fName = name;
+ fConfigDb = configdb;
+ fBaseTopicData = baseTopic;
+
+ String data = fConfigDb.load(fBaseTopicData.getChild(fName));
+ if (data == null) {
+ data = "{}";
+ }
+
+ final JSONObject o = new JSONObject(data);
+ fOwner = o.optString("owner", "");
+ fDesc = o.optString("description", "");
+ fTransactionEnabled = o.optBoolean("txenabled", false); // default value is false
+ // if this topic has an owner, it needs both read/write ACLs. If there's no
+ // owner (or it's empty), null is okay -- this is for existing or implicitly
+ // created topics.
+ JSONObject readers = o.optJSONObject ( "readers" );
+ if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
+ fReaders = fromJson ( readers );
+
+ JSONObject writers = o.optJSONObject ( "writers" );
+ if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
+ fWriters = fromJson ( writers );
+ }
+ private NsaAcl fromJson(JSONObject o) {
+ NsaAcl acl = new NsaAcl();
+ if (o != null) {
+ JSONArray a = o.optJSONArray("allowed");
+ if (a != null) {
+ for (int i = 0; i < a.length(); ++i) {
+ String user = a.getString(i);
+ acl.add(user);
+ }
+ }
+ }
+ return acl;
+ }
+ @Override
+ public String getName() {
+ return fName;
+ }
+
+ @Override
+ public String getOwner() {
+ return fOwner;
+ }
+
+ @Override
+ public String getDescription() {
+ return fDesc;
+ }
+
+ @Override
+ public NsaAcl getReaderAcl() {
+ return fReaders;
+ }
+
+ @Override
+ public NsaAcl getWriterAcl() {
+ return fWriters;
+ }
+
+ @Override
+ public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
+ NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user );
+ }
+
+ @Override
+ public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
+ NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user );
+ }
+
+ @Override
+ public void permitWritesFromUser(String pubId, NsaApiKey asUser)
+ throws ConfigDbException, AccessDeniedException {
+ updateAcl(asUser, false, true, pubId);
+ }
+
+ @Override
+ public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
+ updateAcl(asUser, false, false, pubId);
+ }
+
+ @Override
+ public void permitReadsByUser(String consumerId, NsaApiKey asUser)
+ throws ConfigDbException, AccessDeniedException {
+ updateAcl(asUser, true, true, consumerId);
+ }
+
+ @Override
+ public void denyReadsByUser(String consumerId, NsaApiKey asUser)
+ throws ConfigDbException, AccessDeniedException {
+ updateAcl(asUser, true, false, consumerId);
+ }
+
+ private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
+ throws ConfigDbException, AccessDeniedException{
+ try
+ {
+ final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
+
+ // we have to assume we have current data, or load it again. for the expected use
+ // case, assuming we can overwrite the data is fine.
+ final JSONObject o = new JSONObject ();
+ o.put ( "owner", fOwner );
+ o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
+ o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
+ fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
+
+ log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
+
+ }
+ catch ( ConfigDbException x )
+ {
+ throw x;
+ }
+ catch ( AccessDeniedException x )
+ {
+ throw x;
+ }
+
+ }
+
+ private JSONObject safeSerialize(NsaAcl acl) {
+ return acl == null ? null : acl.serialize();
+ }
+
+ private final String fName;
+ private final ConfigDb fConfigDb;
+ private final ConfigPath fBaseTopicData;
+ private final String fOwner;
+ private final String fDesc;
+ private final NsaAcl fReaders;
+ private final NsaAcl fWriters;
+ private boolean fTransactionEnabled;
+
+ public boolean isTransactionEnabled() {
+ return fTransactionEnabled;
+ }
+
+ @Override
+ public Set<String> getOwners() {
+ final TreeSet<String> owners = new TreeSet<String> ();
+ owners.add ( fOwner );
+ return owners;
+ }
+ }
+
+}
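
A sketch of creating a topic through the broker above; the broker instance is assumed to be Spring-wired, the topic values are hypothetical, and the import path for the nested TopicExistsException assumes it lives on the Broker interface:

import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
import com.att.nsa.cambria.metabroker.Topic;

public class MetaBrokerDemo {
    public static void createDemoTopic(DMaaPKafkaMetaBroker broker) throws Exception {
        try {
            // createTopic rejects replica counts outside 1..3 and partitions < 1
            Topic t = broker.createTopic("demo.topic", "demo description",
                    "demoOwnerApiKey", 2 /* partitions */, 3 /* replicas */,
                    false /* transactionEnabled */);
            System.out.println("created " + t.getName() + ", owner=" + t.getOwner());
        } catch (TopicExistsException e) {
            System.err.println("topic already exists: " + e.getMessage());
        }
    }
}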
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java
new file mode 100644
index 0000000..3c3aa6d
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java
@@ -0,0 +1,232 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.beans;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.att.nsa.apiServer.metrics.cambria.DMaaPMetricsSender;
+import com.att.nsa.cambria.CambriaApiVersionInfo;
+import com.att.nsa.cambria.backends.MetricsSet;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.metrics.impl.CdmConstant;
+import com.att.nsa.metrics.impl.CdmCounter;
+import com.att.nsa.metrics.impl.CdmMetricsRegistryImpl;
+import com.att.nsa.metrics.impl.CdmMovingAverage;
+import com.att.nsa.metrics.impl.CdmRateTicker;
+import com.att.nsa.metrics.impl.CdmSimpleMetric;
+import com.att.nsa.metrics.impl.CdmStringConstant;
+import com.att.nsa.metrics.impl.CdmTimeSince;
+
+/*@Component("dMaaPMetricsSet")*/
+/**
+ * Metrics related information
+ *
+ * @author author
+ *
+ */
+public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet {
+
+ private final CdmStringConstant fVersion;
+ private final CdmConstant fStartTime;
+ private final CdmTimeSince fUpTime;
+
+ private final CdmCounter fRecvTotal;
+ private final CdmRateTicker fRecvEpsInstant;
+ private final CdmRateTicker fRecvEpsShort;
+ private final CdmRateTicker fRecvEpsLong;
+
+ private final CdmCounter fSendTotal;
+ private final CdmRateTicker fSendEpsInstant;
+ private final CdmRateTicker fSendEpsShort;
+ private final CdmRateTicker fSendEpsLong;
+
+ private final CdmCounter fKafkaConsumerCacheMiss;
+ private final CdmCounter fKafkaConsumerCacheHit;
+
+ private final CdmCounter fKafkaConsumerClaimed;
+ private final CdmCounter fKafkaConsumerTimeout;
+
+ private final CdmSimpleMetric fFanOutRatio;
+
+ private final HashMap<String, CdmRateTicker> fPathUseRates;
+ private final HashMap<String, CdmMovingAverage> fPathAvgs;
+
+ private rrNvReadable fSettings;
+
+ private final ScheduledExecutorService fScheduler;
+
+ /**
+ * Constructor initialization
+ *
+ * @param cs
+ */
+ //public DMaaPMetricsSet() {
+ public DMaaPMetricsSet(rrNvReadable cs) {
+ //fSettings = cs;
+
+ fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
+ super.putItem("version", fVersion);
+
+ final long startTime = System.currentTimeMillis();
+ final Date d = new Date(startTime);
+ final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d);
+ fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text);
+ super.putItem("startTime", fStartTime);
+
+ fUpTime = new CdmTimeSince("seconds since start");
+ super.putItem("upTime", fUpTime);
+
+ fRecvTotal = new CdmCounter("Total events received since start");
+ super.putItem("recvTotalEvents", fRecvTotal);
+
+ fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
+ super.putItem("recvEpsInstant", fRecvEpsInstant);
+
+ fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
+ super.putItem("recvEpsShort", fRecvEpsShort);
+
+ fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
+ super.putItem("recvEpsLong", fRecvEpsLong);
+
+ fSendTotal = new CdmCounter("Total events sent since start");
+ super.putItem("sendTotalEvents", fSendTotal);
+
+ fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
+ super.putItem("sendEpsInstant", fSendEpsInstant);
+
+ fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
+ super.putItem("sendEpsShort", fSendEpsShort);
+
+ fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
+ super.putItem("sendEpsLong", fSendEpsLong);
+
+ fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses");
+ super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss);
+
+ fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits");
+ super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit);
+
+ fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed");
+ super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed);
+
+ fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout");
+ super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout);
+
+ // FIXME: CdmLevel is not exactly a great choice
+ fFanOutRatio = new CdmSimpleMetric() {
+ @Override
+ public String getRawValueString() {
+ return getRawValue().toString();
+ }
+
+ @Override
+ public Number getRawValue() {
+ final double s = fSendTotal.getValue();
+ final double r = fRecvTotal.getValue();
+ return r == 0.0 ? 0.0 : s / r;
+ }
+
+ @Override
+ public String summarize() {
+ return getRawValueString() + " sends per recv";
+ }
+
+ };
+ super.putItem("fanOut", fFanOutRatio);
+
+ // these are added to the metrics catalog as they're discovered
+ fPathUseRates = new HashMap<String, CdmRateTicker>();
+ fPathAvgs = new HashMap<String, CdmMovingAverage>();
+
+ fScheduler = Executors.newScheduledThreadPool(1);
+ }
+
+ @Override
+ public void setupCambriaSender() {
+ DMaaPMetricsSender.sendPeriodically(fScheduler, this, "cambria.apinode.metrics.dmaap");
+ }
+
+ @Override
+ public void onRouteComplete(String name, long durationMs) {
+ CdmRateTicker ticker = fPathUseRates.get(name);
+ if (ticker == null) {
+ ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS);
+ fPathUseRates.put(name, ticker);
+ super.putItem("pathUse_" + name, ticker);
+ }
+ ticker.tick();
+
+ CdmMovingAverage durs = fPathAvgs.get(name);
+ if (durs == null) {
+ durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES);
+ fPathAvgs.put(name, durs);
+ super.putItem("pathDurationMs_" + name, durs);
+ }
+ durs.tick(durationMs);
+ }
+
+ @Override
+ public void publishTick(int amount) {
+ if (amount > 0) {
+ fRecvTotal.bumpBy(amount);
+ fRecvEpsInstant.tick(amount);
+ fRecvEpsShort.tick(amount);
+ fRecvEpsLong.tick(amount);
+ }
+ }
+
+ @Override
+ public void consumeTick(int amount) {
+ if (amount > 0) {
+ fSendTotal.bumpBy(amount);
+ fSendEpsInstant.tick(amount);
+ fSendEpsShort.tick(amount);
+ fSendEpsLong.tick(amount);
+ }
+ }
+
+ @Override
+ public void onKafkaConsumerCacheMiss() {
+ fKafkaConsumerCacheMiss.bump();
+ }
+
+ @Override
+ public void onKafkaConsumerCacheHit() {
+ fKafkaConsumerCacheHit.bump();
+ }
+
+ @Override
+ public void onKafkaConsumerClaimed() {
+ fKafkaConsumerClaimed.bump();
+ }
+
+ @Override
+ public void onKafkaConsumerTimeout() {
+ fKafkaConsumerTimeout.bump();
+ }
+
+}
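
A sketch of how the metrics registry above is fed; the settings argument is a hypothetical stand-in for the Spring-provided property reader:

import com.att.nsa.cambria.beans.DMaaPMetricsSet;
import com.att.nsa.drumlin.till.nv.rrNvReadable;

public class MetricsDemo {
    public static void run(rrNvReadable settings) {
        DMaaPMetricsSet metrics = new DMaaPMetricsSet(settings);
        metrics.setupCambriaSender();  // periodic publish to cambria.apinode.metrics.dmaap
        metrics.publishTick(5);        // 5 events received: bumps recvTotal and the recv tickers
        metrics.consumeTick(10);       // 10 events sent: bumps sendTotal and the send tickers
        metrics.onRouteComplete("/events", 42L); // per-path call rate and duration average
    }
}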
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java
new file mode 100644
index 0000000..ce257d4
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java
@@ -0,0 +1,139 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.beans;
+
+import java.security.Key;
+
+//import org.apache.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.configs.ConfigDb;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.configs.confimpl.EncryptingLayer;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.security.db.BaseNsaApiDbImpl;
+import com.att.nsa.security.db.EncryptingApiDbImpl;
+import com.att.nsa.security.db.NsaApiDb;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
+import com.att.nsa.util.rrConvertor;
+
+/**
+ *
+ * @author author
+ *
+ */
+public class DMaaPNsaApiDb {
+
+ //private rrNvReadable settings;
+ private DMaaPZkConfigDb cdb;
+
+ //private static final Logger log = Logger
+ // .getLogger(DMaaPNsaApiDb.class.toString());
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class);
+
+/**
+ *
+ * Constructor initialization.
+ * @param settings
+ * @param cdb
+ */
+ @Autowired
+ public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) {
+ //this.setSettings(settings);
+ this.setCdb(cdb);
+ }
+ /**
+ *
+ * @param settings
+ * @param cdb
+ * @return
+ * @throws ConfigDbException
+ * @throws missingReqdSetting
+ */
+ public static NsaApiDb<NsaSimpleApiKey> buildApiKeyDb(
+ rrNvReadable settings, ConfigDb cdb) throws ConfigDbException,
+ missingReqdSetting {
+ // Cambria uses an encrypted api key db
+
+ //final String keyBase64 = settings.getString("cambria.secureConfig.key", null);
+ final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key");
+
+
+ // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null);
+ final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv");
+ // if neither value was provided, don't encrypt api key db
+ if (keyBase64 == null && initVectorBase64 == null) {
+ log.info("This server is configured to use an unencrypted API key database. See the settings documentation.");
+ return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb,
+ new NsaSimpleApiKeyFactory());
+ } else if (keyBase64 == null) {
+ // neither or both, otherwise something's goofed
+ throw new missingReqdSetting("cambria.secureConfig.key");
+ } else if (initVectorBase64 == null) {
+ // neither or both, otherwise something's goofed
+ throw new missingReqdSetting("cambria.secureConfig.iv");
+ } else {
+ log.info("This server is configured to use an encrypted API key database.");
+ final Key key = EncryptingLayer.readSecretKey(keyBase64);
+ final byte[] iv = rrConvertor.base64Decode(initVectorBase64);
+ return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb,
+ new NsaSimpleApiKeyFactory(), key, iv);
+ }
+ }
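+ // Usage sketch (illustrative only; the variable and key names here are
+ // assumptions, not part of this class):
+ //   final NsaApiDb<NsaSimpleApiKey> apiKeyDb = DMaaPNsaApiDb.buildApiKeyDb(settings, configDb);
+ //   final NsaSimpleApiKey key = apiKeyDb.loadApiKey("exampleApiKey");
+ // With both cambria.secureConfig.key and cambria.secureConfig.iv configured,
+ // records are encrypted via EncryptingApiDbImpl; with neither, they are
+ // stored in the clear via BaseNsaApiDbImpl.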
+
+ /**
+ * @return
+ * returns the zookeeper config db
+ */
+ public DMaaPZkConfigDb getCdb() {
+ return cdb;
+ }
+ /**
+ * @param cdb
+ * set cdb
+ */
+ public void setCdb(DMaaPZkConfigDb cdb) {
+ this.cdb = cdb;
+ }
+
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java
new file mode 100644
index 0000000..590ecd6
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.beans;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+
+/**
+ * ZooKeeper client created from the configuration reader's connection settings.
+ * @author author
+ *
+ */
+public class DMaaPZkClient extends ZkClient {
+
+ /**
+ * This constructor obtains the ZooKeeper connection string from
+ * ConfigurationReader and passes it to the underlying ZkClient.
+ * @param settings
+ */
+ public DMaaPZkClient(@Qualifier("propertyReader") rrNvReadable settings) {
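+ // note: 'settings' is injected for Spring qualifier-based wiring but is
+ // not read here; the connect string comes from ConfigurationReader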
+ super(ConfigurationReader.getMainZookeeperConnectionString());
+ }
+}
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java
new file mode 100644
index 0000000..8fe96e9
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.beans;
+
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.configs.confimpl.ZkConfigDb;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+/**
+ * Provides the ZooKeeper config db connection.
+ * @author author
+ *
+ */
+public class DMaaPZkConfigDb extends ZkConfigDb {
+ /**
+ * This constructor builds the config db from the DMaaPZkClient and
+ * property reader beans, both injected by qualifier.
+ * @param zk
+ * @param settings
+ */
+ public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk,
+ @Qualifier("propertyReader") rrNvReadable settings) {
+
+ // the connection string and config db root both come from ConfigurationReader
+ super(ConfigurationReader.getMainZookeeperConnectionString(), ConfigurationReader.getMainZookeeperConnectionSRoot());
+
+ }
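+ // Wiring sketch (illustrative; the bean names are assumptions based on the
+ // @Qualifier values above):
+ //   @Bean(name = "dMaaPZkClient")
+ //   public DMaaPZkClient dMaaPZkClient(rrNvReadable propertyReader) {
+ //       return new DMaaPZkClient(propertyReader);
+ //   }
+ //   @Bean(name = "dMaaPZkConfigDb")
+ //   public DMaaPZkConfigDb dMaaPZkConfigDb(DMaaPZkClient zk, rrNvReadable propertyReader) {
+ //       return new DMaaPZkConfigDb(zk, propertyReader);
+ //   }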
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/beans/LogDetails.java b/src/main/java/com/att/nsa/cambria/beans/LogDetails.java
new file mode 100644
index 0000000..5a195e9
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/LogDetails.java
@@ -0,0 +1,214 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+/**
+ *
+ */
+package com.att.nsa.cambria.beans;
+
+import java.util.Date;
+
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.cambria.utils.Utils;
+
+/**
+ * @author author
+ *
+ */
+
+public class LogDetails {
+
+ private String publisherId;
+ private String topicId;
+ private String subscriberGroupId;
+ private String subscriberId;
+ private String publisherIp;
+ private String messageBatchId;
+ private String messageSequence;
+ private String messageTimestamp;
+ private String consumeTimestamp;
+ private String transactionIdTs;
+ private String serverIp;
+
+ private long messageLengthInBytes;
+ private long totalMessageCount;
+
+ private boolean transactionEnabled;
+ /**
+ * Default constructor for transaction-enabled logging details.
+ */
+ public LogDetails() {
+ super();
+ }
+
+ public String getTransactionId() {
+ StringBuilder transactionId = new StringBuilder();
+ transactionId.append(transactionIdTs);
+ transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
+ transactionId.append(publisherIp);
+ transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
+ transactionId.append(messageBatchId);
+ transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
+ transactionId.append(messageSequence);
+
+ return transactionId.toString();
+ }
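+ // The resulting id has the form
+ //   <transactionIdTs><SEP><publisherIp><SEP><messageBatchId><SEP><messageSequence>
+ // where <SEP> is CambriaConstants.TRANSACTION_ID_SEPARATOR.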
+
+ public String getPublisherId() {
+ return publisherId;
+ }
+
+ public void setPublisherId(String publisherId) {
+ this.publisherId = publisherId;
+ }
+
+ public String getTopicId() {
+ return topicId;
+ }
+
+ public void setTopicId(String topicId) {
+ this.topicId = topicId;
+ }
+
+ public String getSubscriberGroupId() {
+ return subscriberGroupId;
+ }
+
+ public void setSubscriberGroupId(String subscriberGroupId) {
+ this.subscriberGroupId = subscriberGroupId;
+ }
+
+ public String getSubscriberId() {
+ return subscriberId;
+ }
+
+ public void setSubscriberId(String subscriberId) {
+ this.subscriberId = subscriberId;
+ }
+
+ public String getPublisherIp() {
+ return publisherIp;
+ }
+
+ public void setPublisherIp(String publisherIp) {
+ this.publisherIp = publisherIp;
+ }
+
+ public String getMessageBatchId() {
+ return messageBatchId;
+ }
+
+ public void setMessageBatchId(Long messageBatchId) {
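+ // stores the numeric batch id in its formatted string form via Utils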
+ this.messageBatchId = Utils.getFromattedBatchSequenceId(messageBatchId);
+ }
+
+ public String getMessageSequence() {
+ return messageSequence;
+ }
+
+ public void setMessageSequence(String messageSequence) {
+ this.messageSequence = messageSequence;
+ }
+
+ public String getMessageTimestamp() {
+ return messageTimestamp;
+ }
+
+ public void setMessageTimestamp(String messageTimestamp) {
+ this.messageTimestamp = messageTimestamp;
+ }
+
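+ /**
+ * @return the current time formatted via Utils, computed at call time
+ */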
+ public String getPublishTimestamp() {
+ return Utils.getFormattedDate(new Date());
+ }
+
+ public String getConsumeTimestamp() {
+ return consumeTimestamp;
+ }
+
+ public void setConsumeTimestamp(String consumeTimestamp) {
+ this.consumeTimestamp = consumeTimestamp;
+ }
+
+ public long getMessageLengthInBytes() {
+ return messageLengthInBytes;
+ }
+
+ public void setMessageLengthInBytes(long messageLengthInBytes) {
+ this.messageLengthInBytes = messageLengthInBytes;
+ }
+
+ public long getTotalMessageCount() {
+ return totalMessageCount;
+ }
+
+ public void setTotalMessageCount(long totalMessageCount) {
+ this.totalMessageCount = totalMessageCount;
+ }
+
+ public boolean isTransactionEnabled() {
+ return transactionEnabled;
+ }
+
+ public void setTransactionEnabled(boolean transactionEnabled) {
+ this.transactionEnabled = transactionEnabled;
+ }
+
+ public String getTransactionIdTs() {
+ return transactionIdTs;
+ }
+
+ public void setTransactionIdTs(String transactionIdTs) {
+ this.transactionIdTs = transactionIdTs;
+ }
+
+ public String getPublisherLogDetails() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[publisherId=").append(publisherId);
+ builder.append(", topicId=").append(topicId);
+ builder.append(", messageTimestamp=").append(messageTimestamp);
+ builder.append(", publisherIp=").append(publisherIp);
+ builder.append(", messageBatchId=").append(messageBatchId);
+ builder.append(", messageSequence=").append(messageSequence);
+ builder.append(", messageLengthInBytes=").append(messageLengthInBytes);
+ builder.append(", transactionEnabled=").append(transactionEnabled);
+ builder.append(", transactionId=").append(getTransactionId());
+ builder.append(", publishTimestamp=").append(getPublishTimestamp());
+ builder.append(", serverIp=").append(getServerIp()).append("]");
+ return builder.toString();
+ }
+
+ public String getServerIp() {
+ return serverIp;
+ }
+
+ public void setServerIp(String serverIp) {
+ this.serverIp = serverIp;
+ }
+
+ public void setMessageBatchId(String messageBatchId) {
+ this.messageBatchId = messageBatchId;
+ }
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/beans/TopicBean.java b/src/main/java/com/att/nsa/cambria/beans/TopicBean.java
new file mode 100644
index 0000000..3303c07
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/beans/TopicBean.java
@@ -0,0 +1,155 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+/**
+ *
+ */
+package com.att.nsa.cambria.beans;
+
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * @author author
+ *
+ */
+@XmlRootElement
+public class TopicBean implements Serializable {
+
+ private static final long serialVersionUID = -8620390377775457949L;
+ private String topicName;
+ private String topicDescription;
+
+ private int partitionCount = 1; // default value
+ private int replicationCount = 1; // default value
+
+ private boolean transactionEnabled;
+
+ /**
+ * constructor
+ */
+ public TopicBean() {
+ super();
+ }
+
+ /**
+ * Constructor initializing all topic details: name, description,
+ * partition count, replication count, and transaction flag.
+ *
+ * @param topicName
+ * @param topicDescription
+ * @param partitionCount
+ * @param replicationCount
+ * @param transactionEnabled
+ */
+ public TopicBean(String topicName, String topicDescription, int partitionCount, int replicationCount,
+ boolean transactionEnabled) {
+ super();
+ this.topicName = topicName;
+ this.topicDescription = topicDescription;
+ this.partitionCount = partitionCount;
+ this.replicationCount = replicationCount;
+ this.transactionEnabled = transactionEnabled;
+ }
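+ // Usage sketch (illustrative values only):
+ //   TopicBean tb = new TopicBean("org.onap.example.topic", "example topic", 1, 1, false);
+ // or use the no-arg constructor plus setters when binding from XML/JSON.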
+
+ /**
+ * @return the topic name
+ */
+ public String getTopicName() {
+ return topicName;
+ }
+
+ /**
+ * @param topicName
+ * set topic name
+ */
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+
+ /**
+ * @return the partition count
+ */
+ public int getPartitionCount() {
+ return partitionCount;
+ }
+
+ /**
+ * @param partitionCount
+ * set partition count
+ */
+ public void setPartitionCount(int partitionCount) {
+ this.partitionCount = partitionCount;
+ }
+
+ /**
+ * @return the replication count
+ */
+ public int getReplicationCount() {
+ return replicationCount;
+ }
+
+ /**
+ * @param replicationCount
+ * set replication count
+ */
+ public void setReplicationCount(int replicationCount) {
+ this.replicationCount = replicationCount;
+ }
+
+ /**
+ * @return true if transactions are enabled for this topic
+ */
+ public boolean isTransactionEnabled() {
+ return transactionEnabled;
+ }
+
+ /**
+ * @param transactionEnabled
+ * sets whether transactions are enabled
+ */
+ public void setTransactionEnabled(boolean transactionEnabled) {
+ this.transactionEnabled = transactionEnabled;
+ }
+
+ /**
+ * @return the topic description
+ */
+ public String getTopicDescription() {
+ return topicDescription;
+ }
+ /**
+ * @param topicDescription
+ * set the topic description
+ */
+ public void setTopicDescription(String topicDescription) {
+ this.topicDescription = topicDescription;
+ }
+
+}