summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/beans
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/beans')
-rw-r--r--src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java88
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java329
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPContext.java104
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java365
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java497
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java231
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java140
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java45
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java51
-rw-r--r--src/main/java/com/att/dmf/mr/beans/LogDetails.java214
-rw-r--r--src/main/java/com/att/dmf/mr/beans/TopicBean.java155
11 files changed, 2219 insertions, 0 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java b/src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java
new file mode 100644
index 0000000..4f0108f
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java
@@ -0,0 +1,88 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import com.att.nsa.drumlin.till.data.uniqueStringGenerator;
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+@XmlRootElement
+public class ApiKeyBean implements Serializable {
+
+ private static final long serialVersionUID = -8219849086890567740L;
+
+ private static final String KEY_CHARS = "ABCDEFGHJIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+
+ private String email;
+ private String description;
+ /**
+ * constructor
+ */
+ public ApiKeyBean() {
+ super();
+ }
+/**
+ *
+ * @param email
+ * @param description
+ */
+ public ApiKeyBean(String email, String description) {
+ super();
+ this.email = email;
+ this.description = description;
+ }
+
+ public String getEmail() {
+ return email;
+ }
+
+ public void setEmail(String email) {
+ this.email = email;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public String getKey() {
+ return generateKey(16);
+ }
+
+ public String getSharedSecret() {
+ return generateKey(24);
+ }
+
+ private static String generateKey ( int length ) {
+ return uniqueStringGenerator.createKeyUsingAlphabet ( KEY_CHARS, length );
+ }
+
+}
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
new file mode 100644
index 0000000..5f28367
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
@@ -0,0 +1,329 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.exception.DMaaPResponseCode;
+import com.att.dmf.mr.exception.ErrorResponse;
+import com.att.dmf.mr.utils.Utils;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.metrics.impl.CdmRateTicker;
+
+/**
+ * class provide rate information
+ *
+ * @author anowarul.islam
+ *
+ */
+@Component
+public class DMaaPCambriaLimiter {
+ /**
+ * constructor initializes
+ *
+ * @param settings
+ * @throws missingReqdSetting
+ * @throws invalidSettingValue
+ */
+ @Autowired
+ public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
+ throws missingReqdSetting, invalidSettingValue {
+ fRateInfo = new HashMap<String, RateInfo>();
+ fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+ fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
+ CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
+ fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
+ 30);
+ fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
+ CambriaConstants.kDefault_RateLimitWindowLength);
+ fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
+ CambriaConstants.kDefault_SleepMsOnRateLimit);
+ fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
+ 5000);
+
+ }
+
+ /**
+ * static method provide the sleep time
+ *
+ * @param ratePerMinute
+ * @return
+ */
+ public static long getSleepMsForRate(double ratePerMinute) {
+ if (ratePerMinute <= 0.0)
+ return 0;
+ return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
+ }
+
+ /**
+ * Construct a rate limiter.
+ *
+ * @param maxEmptyPollsPerMinute
+ * Pass <= 0 to deactivate rate limiting.
+ * @param windowLengthMins
+ */
+ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
+ this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
+ }
+
+ /**
+ * Construct a rate limiter
+ *
+ * @param maxEmptyPollsPerMinute
+ * Pass <= 0 to deactivate rate limiting.
+ * @param sleepMs
+ * @param windowLengthMins
+ */
+ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
+ fRateInfo = new HashMap<String, RateInfo>();
+ fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+ fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
+ fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
+ fWindowLengthMins = windowLengthMins;
+ fSleepMs = Math.max(0, sleepMs);
+ fSleepMs1 = Math.max(0, sleepMS1);
+ }
+
+ /**
+ * Tell the rate limiter about a call to a topic/group/id. If the rate is
+ * too high, this call delays its return and throws an exception.
+ *
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ * @throws CambriaApiException
+ */
+ public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
+ // do nothing if rate is configured 0 or less
+ if (fMaxEmptyPollsPerMinute <= 0) {
+ return;
+ }
+ // setup rate info for this tuple
+ final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
+ final double rate = ri.onCall();
+ log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
+ if (rate > fMaxEmptyPollsPerMinute) {
+ try {
+ log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
+ + ".");
+ if (fSleepMs > 0) {
+ log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
+ + " ms sleep, then responding in error.");
+ Thread.sleep(fSleepMs);
+
+ } else {
+ log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
+ }
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
+ DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
+ "This client is making too many requests. Please use a long poll "
+ + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
+
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ /*if (fMaxPollsPerMinute <= 0) {
+ return;
+ }
+
+ final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
+ final double ratevalue = ric.onCall();
+ if (ratevalue > fMaxPollsPerMinute) {
+ try {
+ log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
+ + ".");
+ if (fSleepMs1 > fMaxPollsPerMinute) {
+ log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
+ + " ms sleep, then responding in error.");
+ Thread.sleep(fSleepMs1);
+ ric.reset();
+ } else {
+ log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
+ }
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
+ DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
+ "This client is making too many requests "
+ + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
+
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }*/
+
+ }
+
+ /**
+ *
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ * @param sentCount
+ */
+ public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
+ // check for good replies
+ if (sentCount > 0) {
+ // that was a good send, reset the metric
+ getRateInfo(topic, consumerGroup, clientId).reset();
+ }
+ }
+
+ private static class RateInfo {
+ /**
+ * constructor initialzes
+ *
+ * @param label
+ * @param windowLengthMinutes
+ */
+ public RateInfo(String label, int windowLengthMinutes) {
+ fLabel = label;
+ fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
+ windowLengthMinutes, TimeUnit.MINUTES);
+ }
+
+ public String getLabel() {
+ return fLabel;
+ }
+
+ /**
+ * CdmRateTicker is reset
+ */
+ public void reset() {
+ fCallRateSinceLastMsgSend.reset();
+ }
+
+ /**
+ *
+ * @return
+ */
+ public double onCall() {
+ fCallRateSinceLastMsgSend.tick();
+ return fCallRateSinceLastMsgSend.getRate();
+ }
+
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
+ }
+
+
+
+ private static class RateInfoCheck {
+ /**
+ * constructor initialzes
+ *
+ * @param label
+ * @param windowLengthMinutes
+ */
+ public RateInfoCheck(String label, int windowLengthMinutes) {
+ fLabel = label;
+ fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
+ windowLengthMinutes, TimeUnit.MINUTES);
+ }
+
+ public String getLabel() {
+ return fLabel;
+ }
+
+ /**
+ * CdmRateTicker is reset
+ */
+ public void reset() {
+ fCallRateSinceLastMsgSend.reset();
+ }
+
+ /**
+ *
+ * @return
+ */
+ public double onCall() {
+ fCallRateSinceLastMsgSend.tick();
+ return fCallRateSinceLastMsgSend.getRate();
+ }
+
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
+ }
+
+
+ private final HashMap<String, RateInfo> fRateInfo;
+ private final HashMap<String, RateInfoCheck> fRateInfoCheck;
+ private final double fMaxEmptyPollsPerMinute;
+ private final double fMaxPollsPerMinute;
+ private final int fWindowLengthMins;
+ private final long fSleepMs;
+ private final long fSleepMs1;
+ //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
+ private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
+ final String key = makeKey(topic, consumerGroup, clientId);
+ RateInfo ri = fRateInfo.get(key);
+ if (ri == null) {
+ ri = new RateInfo(key, fWindowLengthMins);
+ fRateInfo.put(key, ri);
+ }
+ return ri;
+ }
+
+
+ private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
+ final String key = makeKey(topic, consumerGroup, clientId);
+ RateInfoCheck ri = fRateInfoCheck.get(key);
+ if (ri == null) {
+ ri = new RateInfoCheck(key, 1);
+ fRateInfoCheck.put(key, ri);
+ }
+ return ri;
+ }
+
+
+
+
+ private String makeKey(String topic, String group, String id) {
+ return topic + "::" + group + "::" + id;
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPContext.java b/src/main/java/com/att/dmf/mr/beans/DMaaPContext.java
new file mode 100644
index 0000000..a880877
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPContext.java
@@ -0,0 +1,104 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+
+import com.att.dmf.mr.utils.ConfigurationReader;
+
+/**
+ * DMaaPContext provide and maintain all the configurations , Http request/response
+ * Session and consumer Request Time
+ * @author nilanjana.maity
+ *
+ */
+public class DMaaPContext {
+
+ private ConfigurationReader configReader;
+ private HttpServletRequest request;
+ private HttpServletResponse response;
+ private HttpSession session;
+ private String consumerRequestTime;
+ static int i=0;
+
+ public synchronized static long getBatchID() {
+ try{
+ final long metricsSendTime = System.currentTimeMillis();
+ final Date d = new Date(metricsSendTime);
+ final String text = new SimpleDateFormat("ddMMyyyyHHmmss").format(d);
+ long dt= Long.valueOf(text)+i;
+ i++;
+ return dt;
+ }
+ catch(NumberFormatException ex){
+ return 0;
+ }
+ }
+
+ public HttpServletRequest getRequest() {
+ return request;
+ }
+
+ public void setRequest(HttpServletRequest request) {
+ this.request = request;
+ }
+
+ public HttpServletResponse getResponse() {
+ return response;
+ }
+
+ public void setResponse(HttpServletResponse response) {
+ this.response = response;
+ }
+
+ public HttpSession getSession() {
+ this.session = request.getSession();
+ return session;
+ }
+
+ public void setSession(HttpSession session) {
+ this.session = session;
+ }
+
+ public ConfigurationReader getConfigReader() {
+ return configReader;
+ }
+
+ public void setConfigReader(ConfigurationReader configReader) {
+ this.configReader = configReader;
+ }
+
+ public String getConsumerRequestTime() {
+ return consumerRequestTime;
+ }
+
+ public void setConsumerRequestTime(String consumerRequestTime) {
+ this.consumerRequestTime = consumerRequestTime;
+ }
+
+
+}
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
new file mode 100644
index 0000000..6fc0838
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -0,0 +1,365 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.backends.Consumer;
+import com.att.dmf.mr.backends.ConsumerFactory;
+import com.att.dmf.mr.backends.MetricsSet;
+import com.att.dmf.mr.backends.kafka.Kafka011Consumer;
+import com.att.dmf.mr.backends.kafka.Kafka011ConsumerUtil;
+import com.att.dmf.mr.backends.kafka.KafkaConsumerCache;
+import com.att.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
+import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
+import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.utils.ConfigurationReader;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+
+/**
+ * @author nilanjana.maity
+ *
+ */
+public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
+
+ // private static final Logger log = LoggerFactory
+ // .getLogger(DMaaPKafkaConsumerFactory.class);
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
+ // @Autowired
+ // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new
+ // KafkaLiveLockAvoider();
+
+ /**
+ * constructor initialization
+ *
+ * @param settings
+ * @param metrics
+ * @param curator
+ * @throws missingReqdSetting
+ * @throws KafkaConsumerCacheException
+ * @throws UnknownHostException
+ */
+
+ public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
+ @Qualifier("curator") CuratorFramework curator,
+ @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
+ throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
+
+ String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ CambriaConstants.kSetting_ApiNodeIdentifier);
+ if (apiNodeId == null) {
+
+ apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
+ }
+
+ log.info("This Cambria API Node identifies itself as [" + apiNodeId + "].");
+ final String mode = CambriaConstants.DMAAP;
+
+ fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "kafka.metadata.broker.list");
+ if (null == fkafkaBrokers) {
+
+ fkafkaBrokers = "localhost:9092";
+ }
+
+ boolean kSetting_EnableCache = kDefault_IsCacheEnabled;
+ String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "cambria.consumer.cache.enabled");
+ if (null != strkSetting_EnableCache)
+ kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
+
+ final boolean isCacheEnabled = kSetting_EnableCache;
+
+ // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
+ // metrics) : null;
+ fCache = null;
+ if (isCacheEnabled) {
+ fCache = KafkaConsumerCache.getInstance();
+
+ }
+ if (fCache != null) {
+ fCache.setfMetrics(metrics);
+ fCache.setfApiId(apiNodeId);
+ fCache.startCache(mode, curator);
+ if(kafkaLiveLockAvoider!=null){
+ kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
+ fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
+ }
+ }
+ }
+
+ /*
+ * getConsumerFor
+ *
+ * @see
+ * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
+ * java.lang.String, java.lang.String, int, java.lang.String) This method is
+ * used by EventServiceImpl.getEvents() method to get a Kakfa consumer
+ * either from kafkaconsumer cache or create a new connection This also get
+ * the list of other consumer objects for the same consumer group and set to
+ * KafkaConsumer object. This list may be used during poll-rebalancing
+ * issue.
+ */
+ @Override
+ public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
+ String remotehost) throws UnavailableException, CambriaApiException {
+ Kafka011Consumer kc;
+
+ // To synchronize based on the consumer group.
+
+ Object syncObject = synchash.get(topic + consumerGroupName);
+ if (null == syncObject) {
+ syncObject = new Object();
+ synchash.put(topic + consumerGroupName, syncObject);
+ }
+
+ synchronized (syncObject) {
+ try {
+ kc = (fCache != null) ? fCache.getConsumerFor(topic, consumerGroupName, consumerId) : null; // consumerId
+
+ } catch (KafkaConsumerCacheException e) {
+ log.info("######@@@@### Error occured in Kafka Caching" + e + " " + topic + "::" + consumerGroupName
+ + "::" + consumerId);
+ log.error("####@@@@## Error occured in Kafka Caching" + e + " " + topic + "::" + consumerGroupName
+ + "::" + consumerId);
+ throw new UnavailableException(e);
+ }
+
+ // Ideally if cache exists below flow should be skipped. If cache
+ // didnt
+ // exist, then create this first time on this node.
+ if (kc == null) {
+
+ log.info("^Kafka consumer cache value " + topic + "::" + consumerGroupName + "::" + consumerId + " =>"
+ + kc);
+
+ final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(),
+ "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
+ boolean locked = false;
+
+ try {
+
+ locked = ipLock.acquire(30, TimeUnit.SECONDS);
+ if (!locked) {
+
+ log.info("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
+ + ", " + consumerGroupName + ", " + consumerId + ") from " + remotehost);
+ throw new UnavailableException(
+ "Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
+ + ", " + consumerGroupName + ", " + consumerId + ") " + remotehost);
+ }
+
+ // ConfigurationReader.getCurator().checkExists().forPath("S").
+
+ log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
+ + "], on topic [" + topic + "].");
+
+ fCache.signalOwnership(topic, consumerGroupName, consumerId);
+
+ final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
+ long fCreateTimeMs = System.currentTimeMillis();
+ KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
+ kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj()
+ // );
+ log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
+
+ if (fCache != null) {
+ fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc); //
+ }
+
+ } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
+ log.info(
+ "Kafka consumer couldn't connect to ZK. " + x + " " + consumerGroupName + "/" + consumerId);
+ throw new UnavailableException("Couldn't connect to ZK.");
+ } catch (KafkaConsumerCacheException e) {
+ log.info("Failed to cache consumer (this may have performance implications): " + e.getMessage()
+ + " " + consumerGroupName + "/" + consumerId);
+ } catch (UnavailableException u) {
+ log.info("Failed and in UnavailableException block " + u.getMessage() + " " + consumerGroupName
+ + "/" + consumerId);
+ throw new UnavailableException("Error while acquiring consumer factory lock " + u.getMessage(), u);
+ } catch (Exception e) {
+ log.info("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
+ + consumerId);
+ log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
+ + consumerId);
+
+ } finally {
+ if (locked) {
+ try {
+ ipLock.release();
+ } catch (Exception e) {
+ throw new UnavailableException("Error while releasing consumer factory lock" + e, e);
+ }
+ }
+ }
+ }
+ }
+ return kc;
+ }
+
+ @Override
+ public synchronized void destroyConsumer(String topic, String consumerGroup, String clientId) {
+ if (fCache != null) {
+ fCache.dropConsumer(topic, consumerGroup, clientId);
+ }
+ }
+
+ @Override
+ public synchronized Collection<? extends Consumer> getConsumers() {
+ return fCache.getConsumers();
+ }
+
+ @Override
+ public synchronized void dropCache() {
+ fCache.dropAllConsumers();
+ }
+
+
+ private KafkaConsumerCache fCache;
+ private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
+ private String fkafkaBrokers;
+
+
+
+ private static String makeLongKey(String key, String prefix) {
+ return prefix + "." + key;
+ }
+
+ private void transferSettingIfProvided(Properties target, String key, String prefix) {
+ String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
+
+ // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
+ if (null != keyVal) {
+ // final String val = fSettings
+ // .getString(makeLongKey(key, prefix), "");
+ log.info("Setting [" + key + "] to " + keyVal + ".");
+ target.put(key, keyVal);
+ }
+ }
+
+ /**
+ * Name CreateConsumerconfig
+ * @param topic
+ * @param groupId
+ * @param consumerId
+ * @return Properties
+ *
+ * This method is to create Properties required to create kafka connection
+ * Group name is replaced with different format groupid--topic to address same
+ * groupids for multiple topics. Same groupid with multiple topics
+ * may start frequent consumer rebalancing on all the topics . Replacing them makes it unique
+ */
+ private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
+ final Properties props = new Properties();
+ //fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic
+ //Fix for CPFMF-644 :
+ final String fakeGroupName = groupId + "--" + topic;
+ props.put("group.id", fakeGroupName);
+ props.put("enable.auto.commit", "false"); // 0.11
+ props.put("bootstrap.servers", fkafkaBrokers);
+ /*props.put("sasl.jaas.config",
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ props.put("security.protocol", "SASL_PLAINTEXT");
+ props.put("sasl.mechanism", "PLAIN");*/
+ props.put("client.id", consumerId);
+
+ // additional settings: start with our defaults, then pull in configured
+ // overrides
+ populateKafkaInternalDefaultsMap();
+ for (String key : KafkaConsumerKeys) {
+ transferSettingIfProvided(props, key, "kafka");
+ }
+
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+ return props;
+ }
+
+
+ private static final String KafkaConsumerKeys[] = { "bootstrap.servers", "heartbeat.interval.ms",
+ "auto.offset.reset", "exclude.internal.topics", "session.timeout.ms", "fetch.max.bytes",
+ "auto.commit.interval.ms", "connections.max.idle.ms", "fetch.min.bytes", "isolation.level",
+ "fetch.max.bytes", "request.timeout.ms", "fetch.max.wait.bytes", "reconnect.backoff.max.ms",
+ "max.partition.fetch.bytes", "reconnect.backoff.max.ms", "reconnect.backoff.ms", "retry.backoff.ms",
+ "max.poll.interval.ms", "max.poll.records", "receive.buffer.bytes", "metadata.max.age.ms" };
+
+ /**
+ * putting values in hashmap like consumer timeout, zookeeper time out, etc
+ *
+ * @param setting
+ */
+ private static void populateKafkaInternalDefaultsMap() { }
+
+ /*
+ * The starterIncremnt value is just to emulate calling certain consumers,
+ * in this test app all the consumers are local
+ *
+ */
+ private LiveLockAvoidance makeAvoidanceCallback(final String appId) {
+
+ return new LiveLockAvoidance() {
+
+ @Override
+ public String getAppId() {
+ return appId;
+ }
+
+ @Override
+ public void handleRebalanceUnlock(String groupName) {
+ log.info("FORCE A POLL NOW FOR appId: [{}] group: [{}]", getAppId(), groupName);
+ Kafka011ConsumerUtil.forcePollOnConsumer(groupName + "::");
+ }
+
+ };
+
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
+ String remotehost) throws UnavailableException, CambriaApiException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ private HashMap<String, Object> synchash = new HashMap<String, Object>();
+
+} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
new file mode 100644
index 0000000..643eae9
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -0,0 +1,497 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import org.json.JSONObject;
+import org.json.JSONArray;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.metabroker.Broker;
+import com.att.dmf.mr.metabroker.Broker1;
+import com.att.dmf.mr.metabroker.Topic;
+import com.att.dmf.mr.utils.ConfigurationReader;
+//import org.apache.log4-j.Logger;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil;
+import com.att.nsa.configs.ConfigDb;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.configs.ConfigPath;
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.security.NsaAcl;
+import com.att.nsa.security.NsaAclUtils;
+import com.att.nsa.security.NsaApiKey;
+
+
+/**
+ * class performing all topic operations
+ *
+ * @author anowarul.islam
+ *
+ */
+//@Component
+public class DMaaPKafkaMetaBroker implements Broker1 {
+
+ public DMaaPKafkaMetaBroker() {
+ fZk = null;
+ fCambriaConfig = null;
+ fBaseTopicData = null;
+ final Properties props = new Properties ();
+ String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "kafka.metadata.broker.list");
+ if (null == fkafkaBrokers) {
+
+ fkafkaBrokers = "localhost:9092";
+ }
+
+
+
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+ /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ props.put("sasl.mechanism", "PLAIN");*/
+ fKafkaAdminClient=AdminClient.create ( props );
+ // fKafkaAdminClient = null;
+ }
+
+ //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
+ private final AdminClient fKafkaAdminClient;
+
+
+
+ /**
+ * DMaaPKafkaMetaBroker constructor initializing
+ *
+ * @param settings
+ * @param zk
+ * @param configDb
+ */
+ public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
+ @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+ //fSettings = settings;
+ fZk = zk;
+ fCambriaConfig = configDb;
+ fBaseTopicData = configDb.parse("/topics");
+ final Properties props = new Properties ();
+ String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "kafka.metadata.broker.list");
+ if (null == fkafkaBrokers) {
+
+ fkafkaBrokers = "localhost:9092";
+ }
+
+
+
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+ /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ props.put("sasl.mechanism", "PLAIN");*/
+ fKafkaAdminClient=AdminClient.create ( props );
+ // fKafkaAdminClient = null;
+
+
+ }
+
+ public DMaaPKafkaMetaBroker( rrNvReadable settings,
+ ZkClient zk, ConfigDb configDb,AdminClient client) {
+ //fSettings = settings;
+ fZk = zk;
+ fCambriaConfig = configDb;
+ fBaseTopicData = configDb.parse("/topics");
+ fKafkaAdminClient= client;
+ // fKafkaAdminClient = null;
+
+
+ }
+
+ @Override
+ public List<Topic> getAllTopics() throws ConfigDbException {
+ log.info("Retrieving list of all the topics.");
+ final LinkedList<Topic> result = new LinkedList<Topic>();
+ try {
+ log.info("Retrieving all topics from root: " + zkTopicsRoot);
+ final List<String> topics = fZk.getChildren(zkTopicsRoot);
+ for (String topic : topics) {
+ result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
+ }
+ JSONObject dataObj = new JSONObject();
+ dataObj.put("topics", new JSONObject());
+
+ for (String topic : topics) {
+ dataObj.getJSONObject("topics").put(topic, new JSONObject());
+ }
+ } catch (ZkNoNodeException excp) {
+ // very fresh kafka doesn't have any topics or a topics node
+ log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
+ }
+ return result;
+ }
+
+ @Override
+ public Topic getTopic(String topic) throws ConfigDbException {
+ if (fZk.exists(zkTopicsRoot + "/" + topic)) {
+ return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
+ }
+ // else: no such topic in kafka
+ return null;
+ }
+
+ /**
+ * static method get KafkaTopic object
+ *
+ * @param db
+ * @param base
+ * @param topic
+ * @return
+ * @throws ConfigDbException
+ */
+ public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
+ return new KafkaTopic(topic, db, base);
+ }
+
+ /**
+ * creating topic
+ */
+ @Override
+ public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
+ boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+ log.info("Creating topic: " + topic);
+ try {
+ log.info("Check if topic [" + topic + "] exist.");
+ // first check for existence "our way"
+ final Topic t = getTopic(topic);
+ if (t != null) {
+ log.info("Could not create topic [" + topic + "]. Topic Already exists.");
+ throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists.");
+ }
+ } catch (ConfigDbException e1) {
+ log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
+ throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
+ "Couldn't check topic data in config db.");
+ }
+
+ // we only allow 3 replicas. (If we don't test this, we get weird
+ // results from the cluster,
+ // so explicit test and fail.)
+ if (replicas < 1 || replicas > 3) {
+ log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
+ throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
+ "The replica count must be between 1 and 3.");
+ }
+ if (partitions < 1) {
+ log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
+ throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
+ }
+
+ // create via kafka
+
+ try
+ {
+ final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () );
+ final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
+ final KafkaFuture<Void> ctrResult = ctr.all ();
+ ctrResult.get ();
+ // underlying Kafka topic created. now setup our API info
+ return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );
+ }
+ catch ( InterruptedException e )
+ {
+ //timer.fail ( "Timeout" );
+ log.warn ( "Execution of describeTopics timed out." );
+ throw new ConfigDbException ( e );
+ }
+ catch ( ExecutionException e )
+ {
+ //timer.fail ( "ExecutionError" );
+ log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
+ throw new ConfigDbException ( e.getCause () );
+ }
+
+ }
+
+ @Override
+ public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
+ log.info("Deleting topic: " + topic);
+ ZkClient zkClient = null;
+ try {
+ log.info("Loading zookeeper client for topic deletion.");
+ // topic creation. (Otherwise, the topic is only partially created
+ // in ZK.)
+ /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
+ ZKStringSerializer$.MODULE$);
+ String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
+ if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
+ ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false);
+ */
+
+ fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
+ log.info("Zookeeper client loaded successfully. Deleting topic.");
+ //AdminUtils.deleteTopic(zkutils, topic);
+ } catch (Exception e) {
+ log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
+ throw new ConfigDbException(e);
+ } finally {
+ log.info("Closing zookeeper connection.");
+ if (zkClient != null)
+ zkClient.close();
+ }
+
+ // throw new UnsupportedOperationException ( "We can't programmatically
+ // delete Kafka topics yet." );
+ }
+
+ //private final rrNvReadable fSettings;
+ private final ZkClient fZk;
+ private final ConfigDb fCambriaConfig;
+ private final ConfigPath fBaseTopicData;
+
+ private static final String zkTopicsRoot = "/brokers/topics";
+ private static final JSONObject kEmptyAcl = new JSONObject();
+
+ /**
+ * method Providing KafkaTopic Object associated with owner and
+ * transactionenabled or not
+ *
+ * @param name
+ * @param desc
+ * @param owner
+ * @param transactionEnabled
+ * @return
+ * @throws ConfigDbException
+ */
+ public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
+ throws ConfigDbException {
+ return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
+ }
+
+ /**
+ * static method giving kafka topic object
+ *
+ * @param db
+ * @param basePath
+ * @param name
+ * @param desc
+ * @param owner
+ * @param transactionEnabled
+ * @return
+ * @throws ConfigDbException
+ */
+ public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
+ boolean transactionEnabled) throws ConfigDbException {
+ final JSONObject o = new JSONObject();
+ o.put("owner", owner);
+ o.put("description", desc);
+ o.put("txenabled", transactionEnabled);
+ db.store(basePath.getChild(name), o.toString());
+ return new KafkaTopic(name, db, basePath);
+ }
+
+ /**
+ * class performing all user opearation like user is eligible to read,
+ * write. permitting a user to write and read,
+ *
+ * @author anowarul.islam
+ *
+ */
+ public static class KafkaTopic implements Topic {
+ /**
+ * constructor initializes
+ *
+ * @param name
+ * @param configdb
+ * @param baseTopic
+ * @throws ConfigDbException
+ */
+ public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
+ fName = name;
+ fConfigDb = configdb;
+ fBaseTopicData = baseTopic;
+
+ String data = fConfigDb.load(fBaseTopicData.getChild(fName));
+ if (data == null) {
+ data = "{}";
+ }
+
+ final JSONObject o = new JSONObject(data);
+ fOwner = o.optString("owner", "");
+ fDesc = o.optString("description", "");
+ fTransactionEnabled = o.optBoolean("txenabled", false);// default
+ // value is
+ // false
+ // if this topic has an owner, it needs both read/write ACLs. If there's no
+ // owner (or it's empty), null is okay -- this is for existing or implicitly
+ // created topics.
+ JSONObject readers = o.optJSONObject ( "readers" );
+ if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
+ fReaders = fromJson ( readers );
+
+ JSONObject writers = o.optJSONObject ( "writers" );
+ if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
+ fWriters = fromJson ( writers );
+ }
+
+ private NsaAcl fromJson(JSONObject o) {
+ NsaAcl acl = new NsaAcl();
+ if (o != null) {
+ JSONArray a = o.optJSONArray("allowed");
+ if (a != null) {
+ for (int i = 0; i < a.length(); ++i) {
+ String user = a.getString(i);
+ acl.add(user);
+ }
+ }
+ }
+ return acl;
+ }
+
+ @Override
+ public String getName() {
+ return fName;
+ }
+
+ @Override
+ public String getOwner() {
+ return fOwner;
+ }
+
+ @Override
+ public String getDescription() {
+ return fDesc;
+ }
+
+ @Override
+ public NsaAcl getReaderAcl() {
+ return fReaders;
+ }
+
+ @Override
+ public NsaAcl getWriterAcl() {
+ return fWriters;
+ }
+
+ @Override
+ public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
+ NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user );
+ }
+
+ @Override
+ public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
+ NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user );
+ }
+
+ @Override
+ public void permitWritesFromUser(String pubId, NsaApiKey asUser)
+ throws ConfigDbException, AccessDeniedException {
+ updateAcl(asUser, false, true, pubId);
+ }
+
+ @Override
+ public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
+ updateAcl(asUser, false, false, pubId);
+ }
+
+ @Override
+ public void permitReadsByUser(String consumerId, NsaApiKey asUser)
+ throws ConfigDbException, AccessDeniedException {
+ updateAcl(asUser, true, true, consumerId);
+ }
+
+ @Override
+ public void denyReadsByUser(String consumerId, NsaApiKey asUser)
+ throws ConfigDbException, AccessDeniedException {
+ updateAcl(asUser, true, false, consumerId);
+ }
+
+ private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
+ throws ConfigDbException, AccessDeniedException{
+ try
+ {
+ final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
+
+ // we have to assume we have current data, or load it again. for the expected use
+ // case, assuming we can overwrite the data is fine.
+ final JSONObject o = new JSONObject ();
+ o.put ( "owner", fOwner );
+ o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
+ o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
+ fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
+
+ log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
+
+ }
+ catch ( ConfigDbException x )
+ {
+ throw x;
+ }
+ catch ( AccessDeniedException x )
+ {
+ throw x;
+ }
+
+ }
+
+ private JSONObject safeSerialize(NsaAcl acl) {
+ return acl == null ? null : acl.serialize();
+ }
+
+ private final String fName;
+ private final ConfigDb fConfigDb;
+ private final ConfigPath fBaseTopicData;
+ private final String fOwner;
+ private final String fDesc;
+ private final NsaAcl fReaders;
+ private final NsaAcl fWriters;
+ private boolean fTransactionEnabled;
+
+ public boolean isTransactionEnabled() {
+ return fTransactionEnabled;
+ }
+
+ @Override
+ public Set<String> getOwners() {
+ final TreeSet<String> owners = new TreeSet<String> ();
+ owners.add ( fOwner );
+ return owners;
+ }
+ }
+
+}
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
new file mode 100644
index 0000000..9942837
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
@@ -0,0 +1,231 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.att.dmf.mr.CambriaApiVersionInfo;
+import com.att.dmf.mr.backends.MetricsSet;
+import com.att.mr.apiServer.metrics.cambria.DMaaPMetricsSender;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.metrics.impl.CdmConstant;
+import com.att.nsa.metrics.impl.CdmCounter;
+import com.att.nsa.metrics.impl.CdmMetricsRegistryImpl;
+import com.att.nsa.metrics.impl.CdmMovingAverage;
+import com.att.nsa.metrics.impl.CdmRateTicker;
+import com.att.nsa.metrics.impl.CdmSimpleMetric;
+import com.att.nsa.metrics.impl.CdmStringConstant;
+import com.att.nsa.metrics.impl.CdmTimeSince;
+
+/*@Component("dMaaPMetricsSet")*/
+/**
+ * Metrics related information
+ *
+ * @author anowarul.islam
+ *
+ */
+public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet {
+
+ private final CdmStringConstant fVersion;
+ private final CdmConstant fStartTime;
+ private final CdmTimeSince fUpTime;
+
+ private final CdmCounter fRecvTotal;
+ private final CdmRateTicker fRecvEpsInstant;
+ private final CdmRateTicker fRecvEpsShort;
+ private final CdmRateTicker fRecvEpsLong;
+
+ private final CdmCounter fSendTotal;
+ private final CdmRateTicker fSendEpsInstant;
+ private final CdmRateTicker fSendEpsShort;
+ private final CdmRateTicker fSendEpsLong;
+
+ private final CdmCounter fKafkaConsumerCacheMiss;
+ private final CdmCounter fKafkaConsumerCacheHit;
+
+ private final CdmCounter fKafkaConsumerClaimed;
+ private final CdmCounter fKafkaConsumerTimeout;
+
+ private final CdmSimpleMetric fFanOutRatio;
+
+ private final HashMap<String, CdmRateTicker> fPathUseRates;
+ private final HashMap<String, CdmMovingAverage> fPathAvgs;
+
+ private rrNvReadable fSettings;
+
+ private final ScheduledExecutorService fScheduler;
+
+ /**
+ * Constructor initialization
+ *
+ * @param cs
+ */
+ //public DMaaPMetricsSet() {
+ public DMaaPMetricsSet(rrNvReadable cs) {
+ //fSettings = cs;
+ fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
+ super.putItem("version", fVersion);
+
+ final long startTime = System.currentTimeMillis();
+ final Date d = new Date(startTime);
+ final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d);
+ fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text);
+ super.putItem("startTime", fStartTime);
+
+ fUpTime = new CdmTimeSince("seconds since start");
+ super.putItem("upTime", fUpTime);
+
+ fRecvTotal = new CdmCounter("Total events received since start");
+ super.putItem("recvTotalEvents", fRecvTotal);
+
+ fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
+ super.putItem("recvEpsInstant", fRecvEpsInstant);
+
+ fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
+ super.putItem("recvEpsShort", fRecvEpsShort);
+
+ fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
+ super.putItem("recvEpsLong", fRecvEpsLong);
+
+ fSendTotal = new CdmCounter("Total events sent since start");
+ super.putItem("sendTotalEvents", fSendTotal);
+
+ fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
+ super.putItem("sendEpsInstant", fSendEpsInstant);
+
+ fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
+ super.putItem("sendEpsShort", fSendEpsShort);
+
+ fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
+ super.putItem("sendEpsLong", fSendEpsLong);
+
+ fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses");
+ super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss);
+
+ fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits");
+ super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit);
+
+ fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed");
+ super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed);
+
+ fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout");
+ super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout);
+
+ // FIXME: CdmLevel is not exactly a great choice
+ fFanOutRatio = new CdmSimpleMetric() {
+ @Override
+ public String getRawValueString() {
+ return getRawValue().toString();
+ }
+
+ @Override
+ public Number getRawValue() {
+ final double s = fSendTotal.getValue();
+ final double r = fRecvTotal.getValue();
+ return r == 0.0 ? 0.0 : s / r;
+ }
+
+ @Override
+ public String summarize() {
+ return getRawValueString() + " sends per recv";
+ }
+
+ };
+ super.putItem("fanOut", fFanOutRatio);
+
+ // these are added to the metrics catalog as they're discovered
+ fPathUseRates = new HashMap<String, CdmRateTicker>();
+ fPathAvgs = new HashMap<String, CdmMovingAverage>();
+
+ fScheduler = Executors.newScheduledThreadPool(1);
+ }
+
+ @Override
+ public void setupCambriaSender() {
+ DMaaPMetricsSender.sendPeriodically(fScheduler, this, "cambria.apinode.metrics.dmaap");
+ }
+
+ @Override
+ public void onRouteComplete(String name, long durationMs) {
+ CdmRateTicker ticker = fPathUseRates.get(name);
+ if (ticker == null) {
+ ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS);
+ fPathUseRates.put(name, ticker);
+ super.putItem("pathUse_" + name, ticker);
+ }
+ ticker.tick();
+
+ CdmMovingAverage durs = fPathAvgs.get(name);
+ if (durs == null) {
+ durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES);
+ fPathAvgs.put(name, durs);
+ super.putItem("pathDurationMs_" + name, durs);
+ }
+ durs.tick(durationMs);
+ }
+
+ @Override
+ public void publishTick(int amount) {
+ if (amount > 0) {
+ fRecvTotal.bumpBy(amount);
+ fRecvEpsInstant.tick(amount);
+ fRecvEpsShort.tick(amount);
+ fRecvEpsLong.tick(amount);
+ }
+ }
+
+ @Override
+ public void consumeTick(int amount) {
+ if (amount > 0) {
+ fSendTotal.bumpBy(amount);
+ fSendEpsInstant.tick(amount);
+ fSendEpsShort.tick(amount);
+ fSendEpsLong.tick(amount);
+ }
+ }
+
+ @Override
+ public void onKafkaConsumerCacheMiss() {
+ fKafkaConsumerCacheMiss.bump();
+ }
+
+ @Override
+ public void onKafkaConsumerCacheHit() {
+ fKafkaConsumerCacheHit.bump();
+ }
+
+ @Override
+ public void onKafkaConsumerClaimed() {
+ fKafkaConsumerClaimed.bump();
+ }
+
+ @Override
+ public void onKafkaConsumerTimeout() {
+ fKafkaConsumerTimeout.bump();
+ }
+
+}
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
new file mode 100644
index 0000000..e29403f
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
@@ -0,0 +1,140 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import java.security.Key;
+
+//import org.apache.log4-j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.configs.ConfigDb;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.configs.confimpl.EncryptingLayer;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.security.db.BaseNsaApiDbImpl;
+import com.att.nsa.security.db.EncryptingApiDbImpl;
+import com.att.nsa.security.db.NsaApiDb;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
+import com.att.nsa.util.rrConvertor;
+
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public class DMaaPNsaApiDb {
+
+ //private rrNvReadable settings;
+ private DMaaPZkConfigDb cdb;
+
+ //private static final Logger log = Logger
+ // .getLogger(DMaaPNsaApiDb.class.toString());
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class);
+
+/**
+ *
+ * Constructor initialized
+ * @param settings
+ * @param cdb
+ */
+ @Autowired
+ public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) {
+ //this.setSettings(settings);
+ this.setCdb(cdb);
+ }
+ /**
+ *
+ * @param settings
+ * @param cdb
+ * @return
+ * @throws ConfigDbException
+ * @throws missingReqdSetting
+ */
+ public static NsaApiDb<NsaSimpleApiKey> buildApiKeyDb(
+ rrNvReadable settings, ConfigDb cdb) throws ConfigDbException,
+ missingReqdSetting {
+ // Cambria uses an encrypted api key db
+
+ //final String keyBase64 = settings.getString("cambria.secureConfig.key", null);
+ final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key");
+
+
+ // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null);
+ final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv");
+ // if neither value was provided, don't encrypt api key db
+ if (keyBase64 == null && initVectorBase64 == null) {
+ log.info("This server is configured to use an unencrypted API key database. See the settings documentation.");
+ return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb,
+ new NsaSimpleApiKeyFactory());
+ } else if (keyBase64 == null) {
+ // neither or both, otherwise something's goofed
+ throw new missingReqdSetting("cambria.secureConfig.key");
+ } else if (initVectorBase64 == null) {
+ // neither or both, otherwise something's goofed
+ throw new missingReqdSetting("cambria.secureConfig.iv");
+ } else {
+ log.info("This server is configured to use an encrypted API key database.");
+ final Key key = EncryptingLayer.readSecretKey(keyBase64);
+ final byte[] iv = rrConvertor.base64Decode(initVectorBase64);
+ return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb,
+ new NsaSimpleApiKeyFactory(), key, iv);
+ }
+ }
+
+ /**
+ * @return
+ * returns settings
+ */
+/* public rrNvReadable getSettings() {
+ return settings;
+ }*/
+
+ /**
+ * @param settings
+ * set settings
+ */
+ /*public void setSettings(rrNvReadable settings) {
+ this.settings = settings;
+ }*/
+
+ /**
+ * @return
+ * returns cbd
+ */
+ public DMaaPZkConfigDb getCdb() {
+ return cdb;
+ }
+ /**
+ * @param cdb
+ * set cdb
+ */
+ public void setCdb(DMaaPZkConfigDb cdb) {
+ this.cdb = cdb;
+ }
+
+
+}
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java
new file mode 100644
index 0000000..78a7426
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.att.dmf.mr.utils.ConfigurationReader;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+
+/**
+ * Created for Zookeeper client which will read configuration and settings parameter
+ * @author nilanjana.maity
+ *
+ */
+public class DMaaPZkClient extends ZkClient {
+
+ /**
+ * This constructor will get the settings value from rrNvReadable
+ * and ConfigurationReader's zookeeper connection
+ * @param settings
+ */
+ public DMaaPZkClient(@Qualifier("propertyReader") rrNvReadable settings) {
+ super(ConfigurationReader.getMainZookeeperConnectionString());
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
new file mode 100644
index 0000000..d543721
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.att.dmf.mr.utils.ConfigurationReader;
+import com.att.nsa.configs.confimpl.ZkConfigDb;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+//import com.att.nsa.configs.confimpl.ZkConfigDb;
+/**
+ * Provide the zookeeper config db connection
+ * @author nilanjana.maity
+ *
+ */
+public class DMaaPZkConfigDb extends ZkConfigDb {
+ /**
+ * This Constructor will provide the configuration details from the property reader
+ * and DMaaPZkClient
+ * @param zk
+ * @param settings
+ */
+ public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk,
+ @Qualifier("propertyReader") rrNvReadable settings) {
+
+ //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot));
+ super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot());
+
+ }
+
+
+}
diff --git a/src/main/java/com/att/dmf/mr/beans/LogDetails.java b/src/main/java/com/att/dmf/mr/beans/LogDetails.java
new file mode 100644
index 0000000..b7fb325
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/LogDetails.java
@@ -0,0 +1,214 @@
+/**
+ *
+ */
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import java.util.Date;
+
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.utils.Utils;
+
+/**
+ * @author muzainulhaque.qazi
+ *
+ */
+
+public class LogDetails {
+
+ private String publisherId;
+ private String topicId;
+ private String subscriberGroupId;
+ private String subscriberId;
+ private String publisherIp;
+ private String messageBatchId;
+ private String messageSequence;
+ private String messageTimestamp;
+ private String consumeTimestamp;
+ private String transactionIdTs;
+ private String serverIp;
+
+ private long messageLengthInBytes;
+ private long totalMessageCount;
+
+ private boolean transactionEnabled;
+ /**
+ * This is for transaction enabled logging details
+ *
+ */
+ public LogDetails() {
+ super();
+ }
+
+ public String getTransactionId() {
+ StringBuilder transactionId = new StringBuilder();
+ transactionId.append(transactionIdTs);
+ transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
+ transactionId.append(publisherIp);
+ transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
+ transactionId.append(messageBatchId);
+ transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
+ transactionId.append(messageSequence);
+
+ return transactionId.toString();
+ }
+
+ public String getPublisherId() {
+ return publisherId;
+ }
+
+ public void setPublisherId(String publisherId) {
+ this.publisherId = publisherId;
+ }
+
+ public String getTopicId() {
+ return topicId;
+ }
+
+ public void setTopicId(String topicId) {
+ this.topicId = topicId;
+ }
+
+ public String getSubscriberGroupId() {
+ return subscriberGroupId;
+ }
+
+ public void setSubscriberGroupId(String subscriberGroupId) {
+ this.subscriberGroupId = subscriberGroupId;
+ }
+
+ public String getSubscriberId() {
+ return subscriberId;
+ }
+
+ public void setSubscriberId(String subscriberId) {
+ this.subscriberId = subscriberId;
+ }
+
+ public String getPublisherIp() {
+ return publisherIp;
+ }
+
+ public void setPublisherIp(String publisherIp) {
+ this.publisherIp = publisherIp;
+ }
+
+ public String getMessageBatchId() {
+ return messageBatchId;
+ }
+
+ public void setMessageBatchId(Long messageBatchId) {
+ this.messageBatchId = Utils.getFromattedBatchSequenceId(messageBatchId);
+ }
+
+ public String getMessageSequence() {
+ return messageSequence;
+ }
+
+ public void setMessageSequence(String messageSequence) {
+ this.messageSequence = messageSequence;
+ }
+
+ public String getMessageTimestamp() {
+ return messageTimestamp;
+ }
+
+ public void setMessageTimestamp(String messageTimestamp) {
+ this.messageTimestamp = messageTimestamp;
+ }
+
+ public String getPublishTimestamp() {
+ return Utils.getFormattedDate(new Date());
+ }
+
+ public String getConsumeTimestamp() {
+ return consumeTimestamp;
+ }
+
+ public void setConsumeTimestamp(String consumeTimestamp) {
+ this.consumeTimestamp = consumeTimestamp;
+ }
+
+ public long getMessageLengthInBytes() {
+ return messageLengthInBytes;
+ }
+
+ public void setMessageLengthInBytes(long messageLengthInBytes) {
+ this.messageLengthInBytes = messageLengthInBytes;
+ }
+
+ public long getTotalMessageCount() {
+ return totalMessageCount;
+ }
+
+ public void setTotalMessageCount(long totalMessageCount) {
+ this.totalMessageCount = totalMessageCount;
+ }
+
+ public boolean isTransactionEnabled() {
+ return transactionEnabled;
+ }
+
+ public void setTransactionEnabled(boolean transactionEnabled) {
+ this.transactionEnabled = transactionEnabled;
+ }
+
+ public String getTransactionIdTs() {
+ return transactionIdTs;
+ }
+
+ public void setTransactionIdTs(String transactionIdTs) {
+ this.transactionIdTs = transactionIdTs;
+ }
+
+ public String getPublisherLogDetails() {
+
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("[publisherId=" + publisherId);
+ buffer.append(", topicId=" + topicId);
+ buffer.append(", messageTimestamp=" + messageTimestamp);
+ buffer.append(", publisherIp=" + publisherIp);
+ buffer.append(", messageBatchId=" + messageBatchId);
+ buffer.append(", messageSequence=" + messageSequence );
+ buffer.append(", messageLengthInBytes=" + messageLengthInBytes);
+ buffer.append(", transactionEnabled=" + transactionEnabled);
+ buffer.append(", transactionId=" + getTransactionId());
+ buffer.append(", publishTimestamp=" + getPublishTimestamp());
+ buffer.append(", serverIp=" + getServerIp()+"]");
+ return buffer.toString();
+
+ }
+
+ public String getServerIp() {
+ return serverIp;
+ }
+
+ public void setServerIp(String serverIp) {
+ this.serverIp = serverIp;
+ }
+
+ public void setMessageBatchId(String messageBatchId) {
+ this.messageBatchId = messageBatchId;
+ }
+
+}
diff --git a/src/main/java/com/att/dmf/mr/beans/TopicBean.java b/src/main/java/com/att/dmf/mr/beans/TopicBean.java
new file mode 100644
index 0000000..a397921
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/beans/TopicBean.java
@@ -0,0 +1,155 @@
+/**
+ *
+ */
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.beans;
+
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * @author muzainulhaque.qazi
+ *
+ */
+@XmlRootElement
+public class TopicBean implements Serializable {
+
+ private static final long serialVersionUID = -8620390377775457949L;
+ private String topicName;
+ private String topicDescription;
+
+ private int partitionCount;
+ private int replicationCount;
+
+ private boolean transactionEnabled;
+
+ /**
+ * constructor
+ */
+ public TopicBean() {
+ super();
+ }
+
+ /**
+ * constructor initialization with topic details name, description,
+ * partition, replication, transaction
+ *
+ * @param topicName
+ * @param description
+ * @param partitionCount
+ * @param replicationCount
+ * @param transactionEnabled
+ */
+ public TopicBean(String topicName, String topicDescription, int partitionCount, int replicationCount,
+ boolean transactionEnabled) {
+ super();
+ this.topicName = topicName;
+ this.topicDescription = topicDescription;
+ this.partitionCount = partitionCount;
+ this.replicationCount = replicationCount;
+ this.transactionEnabled = transactionEnabled;
+ }
+
+ /**
+ * @return
+ * returns topic name which is of String type
+ */
+ public String getTopicName() {
+ return topicName;
+ }
+
+ /**
+ * @param topicName
+ * set topic name
+ */
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+
+ /**
+ * @return
+ * returns partition count which is of int type
+ */
+ public int getPartitionCount() {
+ return partitionCount;
+ }
+
+ /**
+ * @param partitionCount
+ * set partition Count
+ */
+ public void setPartitionCount(int partitionCount) {
+ this.partitionCount = partitionCount;
+ }
+
+ /**
+ * @return
+ * returns replication count which is of int type
+ */
+ public int getReplicationCount() {
+ return replicationCount;
+ }
+
+ /**
+ * @param
+ * set replication count which is of int type
+ */
+ public void setReplicationCount(int replicationCount) {
+ this.replicationCount = replicationCount;
+ }
+
+ /**
+ * @return
+ * returns boolean value which indicates whether transaction is Enabled
+ */
+ public boolean isTransactionEnabled() {
+ return transactionEnabled;
+ }
+
+ /**
+ * @param
+ * sets boolean value which indicates whether transaction is Enabled
+ */
+ public void setTransactionEnabled(boolean transactionEnabled) {
+ this.transactionEnabled = transactionEnabled;
+ }
+
+ /**
+ *
+ * @return returns description which is of String type
+ */
+ public String getTopicDescription() {
+ return topicDescription;
+ }
+ /**
+ *
+ * @param topicDescription
+ * set description which is of String type
+ */
+ public void setTopicDescription(String topicDescription) {
+ this.topicDescription = topicDescription;
+ }
+
+}