diff options
Diffstat (limited to 'src/main/java/com/att/dmf/mr/beans')
11 files changed, 2219 insertions, 0 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java b/src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java new file mode 100644 index 0000000..4f0108f --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java @@ -0,0 +1,88 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import java.io.Serializable; + +import javax.xml.bind.annotation.XmlRootElement; + +import com.att.nsa.drumlin.till.data.uniqueStringGenerator; +/** + * + * @author anowarul.islam + * + */ +@XmlRootElement +public class ApiKeyBean implements Serializable { + + private static final long serialVersionUID = -8219849086890567740L; + + private static final String KEY_CHARS = "ABCDEFGHJIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + + private String email; + private String description; + /** + * constructor + */ + public ApiKeyBean() { + super(); + } +/** + * + * @param email + * @param description + */ + public ApiKeyBean(String email, String description) { + super(); + this.email = email; + this.description = description; + } + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getKey() { + return generateKey(16); + } + + public String getSharedSecret() { + return generateKey(24); + } + + private static String generateKey ( int length ) { + return uniqueStringGenerator.createKeyUsingAlphabet ( KEY_CHARS, length ); + } + +} diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java new file mode 100644 index 0000000..5f28367 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java @@ -0,0 +1,329 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import java.util.Date; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.dmf.mr.CambriaApiException; +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.exception.DMaaPResponseCode; +import com.att.dmf.mr.exception.ErrorResponse; +import com.att.dmf.mr.utils.Utils; + +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.drumlin.service.standards.HttpStatusCodes; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.metrics.impl.CdmRateTicker; + +/** + * class provide rate information + * + * @author anowarul.islam + * + */ +@Component +public class DMaaPCambriaLimiter { + /** + * constructor initializes + * + * @param settings + * @throws missingReqdSetting + * @throws invalidSettingValue + */ + @Autowired + public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) + throws missingReqdSetting, invalidSettingValue { + fRateInfo = new HashMap<String, RateInfo>(); + fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, + CambriaConstants.kDefault_MaxEmptyPollsPerMinute); + fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute, + 30); + fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength, + CambriaConstants.kDefault_RateLimitWindowLength); + fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit, + CambriaConstants.kDefault_SleepMsOnRateLimit); + fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit, + 5000); + + } + + /** + * static method provide the sleep time + * + * @param ratePerMinute + * @return + */ + public static long getSleepMsForRate(double ratePerMinute) { + if (ratePerMinute <= 0.0) + return 0; + return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); + } + + /** + * Construct a rate limiter. + * + * @param maxEmptyPollsPerMinute + * Pass <= 0 to deactivate rate limiting. + * @param windowLengthMins + */ + public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) { + this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1)); + } + + /** + * Construct a rate limiter + * + * @param maxEmptyPollsPerMinute + * Pass <= 0 to deactivate rate limiting. + * @param sleepMs + * @param windowLengthMins + */ + public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) { + fRateInfo = new HashMap<String, RateInfo>(); + fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); + fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute); + fWindowLengthMins = windowLengthMins; + fSleepMs = Math.max(0, sleepMs); + fSleepMs1 = Math.max(0, sleepMS1); + } + + /** + * Tell the rate limiter about a call to a topic/group/id. If the rate is + * too high, this call delays its return and throws an exception. + * + * @param topic + * @param consumerGroup + * @param clientId + * @throws CambriaApiException + */ + public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException { + // do nothing if rate is configured 0 or less + if (fMaxEmptyPollsPerMinute <= 0) { + return; + } + // setup rate info for this tuple + final RateInfo ri = getRateInfo(topic, consumerGroup, clientId); + final double rate = ri.onCall(); + log.info(ri.getLabel() + ": " + rate + " empty replies/minute."); + if (rate > fMaxEmptyPollsPerMinute) { + try { + log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute + + "."); + if (fSleepMs > 0) { + log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs + + " ms sleep, then responding in error."); + Thread.sleep(fSleepMs); + + } else { + log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); + } + } catch (InterruptedException e) { + // ignore + } + + + ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, + DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), + "This client is making too many requests. Please use a long poll " + + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost); + + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + /*if (fMaxPollsPerMinute <= 0) { + return; + } + + final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId); + final double ratevalue = ric.onCall(); + if (ratevalue > fMaxPollsPerMinute) { + try { + log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute + + "."); + if (fSleepMs1 > fMaxPollsPerMinute) { + log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs + + " ms sleep, then responding in error."); + Thread.sleep(fSleepMs1); + ric.reset(); + } else { + log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); + } + } catch (InterruptedException e) { + // ignore + } + + + ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, + DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), + "This client is making too many requests " + + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost); + + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + }*/ + + } + + /** + * + * @param topic + * @param consumerGroup + * @param clientId + * @param sentCount + */ + public void onSend(String topic, String consumerGroup, String clientId, long sentCount) { + // check for good replies + if (sentCount > 0) { + // that was a good send, reset the metric + getRateInfo(topic, consumerGroup, clientId).reset(); + } + } + + private static class RateInfo { + /** + * constructor initialzes + * + * @param label + * @param windowLengthMinutes + */ + public RateInfo(String label, int windowLengthMinutes) { + fLabel = label; + fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES, + windowLengthMinutes, TimeUnit.MINUTES); + } + + public String getLabel() { + return fLabel; + } + + /** + * CdmRateTicker is reset + */ + public void reset() { + fCallRateSinceLastMsgSend.reset(); + } + + /** + * + * @return + */ + public double onCall() { + fCallRateSinceLastMsgSend.tick(); + return fCallRateSinceLastMsgSend.getRate(); + } + + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; + } + + + + private static class RateInfoCheck { + /** + * constructor initialzes + * + * @param label + * @param windowLengthMinutes + */ + public RateInfoCheck(String label, int windowLengthMinutes) { + fLabel = label; + fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES, + windowLengthMinutes, TimeUnit.MINUTES); + } + + public String getLabel() { + return fLabel; + } + + /** + * CdmRateTicker is reset + */ + public void reset() { + fCallRateSinceLastMsgSend.reset(); + } + + /** + * + * @return + */ + public double onCall() { + fCallRateSinceLastMsgSend.tick(); + return fCallRateSinceLastMsgSend.getRate(); + } + + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; + } + + + private final HashMap<String, RateInfo> fRateInfo; + private final HashMap<String, RateInfoCheck> fRateInfoCheck; + private final double fMaxEmptyPollsPerMinute; + private final double fMaxPollsPerMinute; + private final int fWindowLengthMins; + private final long fSleepMs; + private final long fSleepMs1; + //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + + private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) { + final String key = makeKey(topic, consumerGroup, clientId); + RateInfo ri = fRateInfo.get(key); + if (ri == null) { + ri = new RateInfo(key, fWindowLengthMins); + fRateInfo.put(key, ri); + } + return ri; + } + + + private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) { + final String key = makeKey(topic, consumerGroup, clientId); + RateInfoCheck ri = fRateInfoCheck.get(key); + if (ri == null) { + ri = new RateInfoCheck(key, 1); + fRateInfoCheck.put(key, ri); + } + return ri; + } + + + + + private String makeKey(String topic, String group, String id) { + return topic + "::" + group + "::" + id; + } +} diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPContext.java b/src/main/java/com/att/dmf/mr/beans/DMaaPContext.java new file mode 100644 index 0000000..a880877 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPContext.java @@ -0,0 +1,104 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import java.text.SimpleDateFormat; +import java.util.Date; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.HttpSession; + +import com.att.dmf.mr.utils.ConfigurationReader; + +/** + * DMaaPContext provide and maintain all the configurations , Http request/response + * Session and consumer Request Time + * @author nilanjana.maity + * + */ +public class DMaaPContext { + + private ConfigurationReader configReader; + private HttpServletRequest request; + private HttpServletResponse response; + private HttpSession session; + private String consumerRequestTime; + static int i=0; + + public synchronized static long getBatchID() { + try{ + final long metricsSendTime = System.currentTimeMillis(); + final Date d = new Date(metricsSendTime); + final String text = new SimpleDateFormat("ddMMyyyyHHmmss").format(d); + long dt= Long.valueOf(text)+i; + i++; + return dt; + } + catch(NumberFormatException ex){ + return 0; + } + } + + public HttpServletRequest getRequest() { + return request; + } + + public void setRequest(HttpServletRequest request) { + this.request = request; + } + + public HttpServletResponse getResponse() { + return response; + } + + public void setResponse(HttpServletResponse response) { + this.response = response; + } + + public HttpSession getSession() { + this.session = request.getSession(); + return session; + } + + public void setSession(HttpSession session) { + this.session = session; + } + + public ConfigurationReader getConfigReader() { + return configReader; + } + + public void setConfigReader(ConfigurationReader configReader) { + this.configReader = configReader; + } + + public String getConsumerRequestTime() { + return consumerRequestTime; + } + + public void setConsumerRequestTime(String consumerRequestTime) { + this.consumerRequestTime = consumerRequestTime; + } + + +} diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java new file mode 100644 index 0000000..6fc0838 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -0,0 +1,365 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.annotation.Qualifier; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.dmf.mr.CambriaApiException; +import com.att.dmf.mr.backends.Consumer; +import com.att.dmf.mr.backends.ConsumerFactory; +import com.att.dmf.mr.backends.MetricsSet; +import com.att.dmf.mr.backends.kafka.Kafka011Consumer; +import com.att.dmf.mr.backends.kafka.Kafka011ConsumerUtil; +import com.att.dmf.mr.backends.kafka.KafkaConsumerCache; +import com.att.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException; +import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; +import com.att.dmf.mr.backends.kafka.LiveLockAvoidance; +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.utils.ConfigurationReader; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; + +/** + * @author nilanjana.maity + * + */ +public class DMaaPKafkaConsumerFactory implements ConsumerFactory { + + // private static final Logger log = LoggerFactory + // .getLogger(DMaaPKafkaConsumerFactory.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); + // @Autowired + // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new + // KafkaLiveLockAvoider(); + + /** + * constructor initialization + * + * @param settings + * @param metrics + * @param curator + * @throws missingReqdSetting + * @throws KafkaConsumerCacheException + * @throws UnknownHostException + */ + + public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics, + @Qualifier("curator") CuratorFramework curator, + @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider) + throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException { + + String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + CambriaConstants.kSetting_ApiNodeIdentifier); + if (apiNodeId == null) { + + apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port; + } + + log.info("This Cambria API Node identifies itself as [" + apiNodeId + "]."); + final String mode = CambriaConstants.DMAAP; + + fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "kafka.metadata.broker.list"); + if (null == fkafkaBrokers) { + + fkafkaBrokers = "localhost:9092"; + } + + boolean kSetting_EnableCache = kDefault_IsCacheEnabled; + String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "cambria.consumer.cache.enabled"); + if (null != strkSetting_EnableCache) + kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache); + + final boolean isCacheEnabled = kSetting_EnableCache; + + // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, + // metrics) : null; + fCache = null; + if (isCacheEnabled) { + fCache = KafkaConsumerCache.getInstance(); + + } + if (fCache != null) { + fCache.setfMetrics(metrics); + fCache.setfApiId(apiNodeId); + fCache.startCache(mode, curator); + if(kafkaLiveLockAvoider!=null){ + kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId)); + fkafkaLiveLockAvoider = kafkaLiveLockAvoider; + } + } + } + + /* + * getConsumerFor + * + * @see + * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String, + * java.lang.String, java.lang.String, int, java.lang.String) This method is + * used by EventServiceImpl.getEvents() method to get a Kakfa consumer + * either from kafkaconsumer cache or create a new connection This also get + * the list of other consumer objects for the same consumer group and set to + * KafkaConsumer object. This list may be used during poll-rebalancing + * issue. + */ + @Override + public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs, + String remotehost) throws UnavailableException, CambriaApiException { + Kafka011Consumer kc; + + // To synchronize based on the consumer group. + + Object syncObject = synchash.get(topic + consumerGroupName); + if (null == syncObject) { + syncObject = new Object(); + synchash.put(topic + consumerGroupName, syncObject); + } + + synchronized (syncObject) { + try { + kc = (fCache != null) ? fCache.getConsumerFor(topic, consumerGroupName, consumerId) : null; // consumerId + + } catch (KafkaConsumerCacheException e) { + log.info("######@@@@### Error occured in Kafka Caching" + e + " " + topic + "::" + consumerGroupName + + "::" + consumerId); + log.error("####@@@@## Error occured in Kafka Caching" + e + " " + topic + "::" + consumerGroupName + + "::" + consumerId); + throw new UnavailableException(e); + } + + // Ideally if cache exists below flow should be skipped. If cache + // didnt + // exist, then create this first time on this node. + if (kc == null) { + + log.info("^Kafka consumer cache value " + topic + "::" + consumerGroupName + "::" + consumerId + " =>" + + kc); + + final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(), + "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId); + boolean locked = false; + + try { + + locked = ipLock.acquire(30, TimeUnit.SECONDS); + if (!locked) { + + log.info("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + + ", " + consumerGroupName + ", " + consumerId + ") from " + remotehost); + throw new UnavailableException( + "Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + + ", " + consumerGroupName + ", " + consumerId + ") " + remotehost); + } + + // ConfigurationReader.getCurator().checkExists().forPath("S"). + + log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId + + "], on topic [" + topic + "]."); + + fCache.signalOwnership(topic, consumerGroupName, consumerId); + + final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId); + long fCreateTimeMs = System.currentTimeMillis(); + KafkaConsumer<String, String> cc = new KafkaConsumer<>(props); + kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj() + // ); + log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); + + if (fCache != null) { + fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc); // + } + + } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) { + log.info( + "Kafka consumer couldn't connect to ZK. " + x + " " + consumerGroupName + "/" + consumerId); + throw new UnavailableException("Couldn't connect to ZK."); + } catch (KafkaConsumerCacheException e) { + log.info("Failed to cache consumer (this may have performance implications): " + e.getMessage() + + " " + consumerGroupName + "/" + consumerId); + } catch (UnavailableException u) { + log.info("Failed and in UnavailableException block " + u.getMessage() + " " + consumerGroupName + + "/" + consumerId); + throw new UnavailableException("Error while acquiring consumer factory lock " + u.getMessage(), u); + } catch (Exception e) { + log.info("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/" + + consumerId); + log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/" + + consumerId); + + } finally { + if (locked) { + try { + ipLock.release(); + } catch (Exception e) { + throw new UnavailableException("Error while releasing consumer factory lock" + e, e); + } + } + } + } + } + return kc; + } + + @Override + public synchronized void destroyConsumer(String topic, String consumerGroup, String clientId) { + if (fCache != null) { + fCache.dropConsumer(topic, consumerGroup, clientId); + } + } + + @Override + public synchronized Collection<? extends Consumer> getConsumers() { + return fCache.getConsumers(); + } + + @Override + public synchronized void dropCache() { + fCache.dropAllConsumers(); + } + + + private KafkaConsumerCache fCache; + private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider; + private String fkafkaBrokers; + + + + private static String makeLongKey(String key, String prefix) { + return prefix + "." + key; + } + + private void transferSettingIfProvided(Properties target, String key, String prefix) { + String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix)); + + // if (fSettings.hasValueFor(makeLongKey(key, prefix))) { + if (null != keyVal) { + // final String val = fSettings + // .getString(makeLongKey(key, prefix), ""); + log.info("Setting [" + key + "] to " + keyVal + "."); + target.put(key, keyVal); + } + } + + /** + * Name CreateConsumerconfig + * @param topic + * @param groupId + * @param consumerId + * @return Properties + * + * This method is to create Properties required to create kafka connection + * Group name is replaced with different format groupid--topic to address same + * groupids for multiple topics. Same groupid with multiple topics + * may start frequent consumer rebalancing on all the topics . Replacing them makes it unique + */ + private Properties createConsumerConfig(String topic ,String groupId, String consumerId) { + final Properties props = new Properties(); + //fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic + //Fix for CPFMF-644 : + final String fakeGroupName = groupId + "--" + topic; + props.put("group.id", fakeGroupName); + props.put("enable.auto.commit", "false"); // 0.11 + props.put("bootstrap.servers", fkafkaBrokers); + /*props.put("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + props.put("security.protocol", "SASL_PLAINTEXT"); + props.put("sasl.mechanism", "PLAIN");*/ + props.put("client.id", consumerId); + + // additional settings: start with our defaults, then pull in configured + // overrides + populateKafkaInternalDefaultsMap(); + for (String key : KafkaConsumerKeys) { + transferSettingIfProvided(props, key, "kafka"); + } + + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + return props; + } + + + private static final String KafkaConsumerKeys[] = { "bootstrap.servers", "heartbeat.interval.ms", + "auto.offset.reset", "exclude.internal.topics", "session.timeout.ms", "fetch.max.bytes", + "auto.commit.interval.ms", "connections.max.idle.ms", "fetch.min.bytes", "isolation.level", + "fetch.max.bytes", "request.timeout.ms", "fetch.max.wait.bytes", "reconnect.backoff.max.ms", + "max.partition.fetch.bytes", "reconnect.backoff.max.ms", "reconnect.backoff.ms", "retry.backoff.ms", + "max.poll.interval.ms", "max.poll.records", "receive.buffer.bytes", "metadata.max.age.ms" }; + + /** + * putting values in hashmap like consumer timeout, zookeeper time out, etc + * + * @param setting + */ + private static void populateKafkaInternalDefaultsMap() { } + + /* + * The starterIncremnt value is just to emulate calling certain consumers, + * in this test app all the consumers are local + * + */ + private LiveLockAvoidance makeAvoidanceCallback(final String appId) { + + return new LiveLockAvoidance() { + + @Override + public String getAppId() { + return appId; + } + + @Override + public void handleRebalanceUnlock(String groupName) { + log.info("FORCE A POLL NOW FOR appId: [{}] group: [{}]", getAppId(), groupName); + Kafka011ConsumerUtil.forcePollOnConsumer(groupName + "::"); + } + + }; + + } + + @SuppressWarnings("rawtypes") + @Override + public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs, + String remotehost) throws UnavailableException, CambriaApiException { + // TODO Auto-generated method stub + return null; + } + + private HashMap<String, Object> synchash = new HashMap<String, Object>(); + +}
\ No newline at end of file diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java new file mode 100644 index 0000000..643eae9 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -0,0 +1,497 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ExecutionException; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.exception.ZkNoNodeException; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.KafkaFuture; +import org.json.JSONObject; +import org.json.JSONArray; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.dmf.mr.CambriaApiException; +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.metabroker.Broker; +import com.att.dmf.mr.metabroker.Broker1; +import com.att.dmf.mr.metabroker.Topic; +import com.att.dmf.mr.utils.ConfigurationReader; +//import org.apache.log4-j.Logger; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil; +import com.att.nsa.configs.ConfigDb; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.configs.ConfigPath; +import com.att.nsa.drumlin.service.standards.HttpStatusCodes; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.security.NsaAcl; +import com.att.nsa.security.NsaAclUtils; +import com.att.nsa.security.NsaApiKey; + + +/** + * class performing all topic operations + * + * @author anowarul.islam + * + */ +//@Component +public class DMaaPKafkaMetaBroker implements Broker1 { + + public DMaaPKafkaMetaBroker() { + fZk = null; + fCambriaConfig = null; + fBaseTopicData = null; + final Properties props = new Properties (); + String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "kafka.metadata.broker.list"); + if (null == fkafkaBrokers) { + + fkafkaBrokers = "localhost:9092"; + } + + + + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); + // fKafkaAdminClient = null; + } + + //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); + private final AdminClient fKafkaAdminClient; + + + + /** + * DMaaPKafkaMetaBroker constructor initializing + * + * @param settings + * @param zk + * @param configDb + */ + public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings, + @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { + //fSettings = settings; + fZk = zk; + fCambriaConfig = configDb; + fBaseTopicData = configDb.parse("/topics"); + final Properties props = new Properties (); + String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "kafka.metadata.broker.list"); + if (null == fkafkaBrokers) { + + fkafkaBrokers = "localhost:9092"; + } + + + + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); + // fKafkaAdminClient = null; + + + } + + public DMaaPKafkaMetaBroker( rrNvReadable settings, + ZkClient zk, ConfigDb configDb,AdminClient client) { + //fSettings = settings; + fZk = zk; + fCambriaConfig = configDb; + fBaseTopicData = configDb.parse("/topics"); + fKafkaAdminClient= client; + // fKafkaAdminClient = null; + + + } + + @Override + public List<Topic> getAllTopics() throws ConfigDbException { + log.info("Retrieving list of all the topics."); + final LinkedList<Topic> result = new LinkedList<Topic>(); + try { + log.info("Retrieving all topics from root: " + zkTopicsRoot); + final List<String> topics = fZk.getChildren(zkTopicsRoot); + for (String topic : topics) { + result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData)); + } + JSONObject dataObj = new JSONObject(); + dataObj.put("topics", new JSONObject()); + + for (String topic : topics) { + dataObj.getJSONObject("topics").put(topic, new JSONObject()); + } + } catch (ZkNoNodeException excp) { + // very fresh kafka doesn't have any topics or a topics node + log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp); + } + return result; + } + + @Override + public Topic getTopic(String topic) throws ConfigDbException { + if (fZk.exists(zkTopicsRoot + "/" + topic)) { + return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic); + } + // else: no such topic in kafka + return null; + } + + /** + * static method get KafkaTopic object + * + * @param db + * @param base + * @param topic + * @return + * @throws ConfigDbException + */ + public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException { + return new KafkaTopic(topic, db, base); + } + + /** + * creating topic + */ + @Override + public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas, + boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { + log.info("Creating topic: " + topic); + try { + log.info("Check if topic [" + topic + "] exist."); + // first check for existence "our way" + final Topic t = getTopic(topic); + if (t != null) { + log.info("Could not create topic [" + topic + "]. Topic Already exists."); + throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists."); + } + } catch (ConfigDbException e1) { + log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1); + throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, + "Couldn't check topic data in config db."); + } + + // we only allow 3 replicas. (If we don't test this, we get weird + // results from the cluster, + // so explicit test and fail.) + if (replicas < 1 || replicas > 3) { + log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3."); + throw new CambriaApiException(HttpStatusCodes.k400_badRequest, + "The replica count must be between 1 and 3."); + } + if (partitions < 1) { + log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1."); + throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1."); + } + + // create via kafka + + try + { + final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () ); + final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) ); + final KafkaFuture<Void> ctrResult = ctr.all (); + ctrResult.get (); + // underlying Kafka topic created. now setup our API info + return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled ); + } + catch ( InterruptedException e ) + { + //timer.fail ( "Timeout" ); + log.warn ( "Execution of describeTopics timed out." ); + throw new ConfigDbException ( e ); + } + catch ( ExecutionException e ) + { + //timer.fail ( "ExecutionError" ); + log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () ); + throw new ConfigDbException ( e.getCause () ); + } + + } + + @Override + public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException { + log.info("Deleting topic: " + topic); + ZkClient zkClient = null; + try { + log.info("Loading zookeeper client for topic deletion."); + // topic creation. (Otherwise, the topic is only partially created + // in ZK.) + /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000, + ZKStringSerializer$.MODULE$); + String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers); + if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers; + ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false); + */ + + fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); + log.info("Zookeeper client loaded successfully. Deleting topic."); + //AdminUtils.deleteTopic(zkutils, topic); + } catch (Exception e) { + log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); + throw new ConfigDbException(e); + } finally { + log.info("Closing zookeeper connection."); + if (zkClient != null) + zkClient.close(); + } + + // throw new UnsupportedOperationException ( "We can't programmatically + // delete Kafka topics yet." ); + } + + //private final rrNvReadable fSettings; + private final ZkClient fZk; + private final ConfigDb fCambriaConfig; + private final ConfigPath fBaseTopicData; + + private static final String zkTopicsRoot = "/brokers/topics"; + private static final JSONObject kEmptyAcl = new JSONObject(); + + /** + * method Providing KafkaTopic Object associated with owner and + * transactionenabled or not + * + * @param name + * @param desc + * @param owner + * @param transactionEnabled + * @return + * @throws ConfigDbException + */ + public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled) + throws ConfigDbException { + return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled); + } + + /** + * static method giving kafka topic object + * + * @param db + * @param basePath + * @param name + * @param desc + * @param owner + * @param transactionEnabled + * @return + * @throws ConfigDbException + */ + public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner, + boolean transactionEnabled) throws ConfigDbException { + final JSONObject o = new JSONObject(); + o.put("owner", owner); + o.put("description", desc); + o.put("txenabled", transactionEnabled); + db.store(basePath.getChild(name), o.toString()); + return new KafkaTopic(name, db, basePath); + } + + /** + * class performing all user opearation like user is eligible to read, + * write. permitting a user to write and read, + * + * @author anowarul.islam + * + */ + public static class KafkaTopic implements Topic { + /** + * constructor initializes + * + * @param name + * @param configdb + * @param baseTopic + * @throws ConfigDbException + */ + public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException { + fName = name; + fConfigDb = configdb; + fBaseTopicData = baseTopic; + + String data = fConfigDb.load(fBaseTopicData.getChild(fName)); + if (data == null) { + data = "{}"; + } + + final JSONObject o = new JSONObject(data); + fOwner = o.optString("owner", ""); + fDesc = o.optString("description", ""); + fTransactionEnabled = o.optBoolean("txenabled", false);// default + // value is + // false + // if this topic has an owner, it needs both read/write ACLs. If there's no + // owner (or it's empty), null is okay -- this is for existing or implicitly + // created topics. + JSONObject readers = o.optJSONObject ( "readers" ); + if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl; + fReaders = fromJson ( readers ); + + JSONObject writers = o.optJSONObject ( "writers" ); + if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl; + fWriters = fromJson ( writers ); + } + + private NsaAcl fromJson(JSONObject o) { + NsaAcl acl = new NsaAcl(); + if (o != null) { + JSONArray a = o.optJSONArray("allowed"); + if (a != null) { + for (int i = 0; i < a.length(); ++i) { + String user = a.getString(i); + acl.add(user); + } + } + } + return acl; + } + + @Override + public String getName() { + return fName; + } + + @Override + public String getOwner() { + return fOwner; + } + + @Override + public String getDescription() { + return fDesc; + } + + @Override + public NsaAcl getReaderAcl() { + return fReaders; + } + + @Override + public NsaAcl getWriterAcl() { + return fWriters; + } + + @Override + public void checkUserRead(NsaApiKey user) throws AccessDeniedException { + NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user ); + } + + @Override + public void checkUserWrite(NsaApiKey user) throws AccessDeniedException { + NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user ); + } + + @Override + public void permitWritesFromUser(String pubId, NsaApiKey asUser) + throws ConfigDbException, AccessDeniedException { + updateAcl(asUser, false, true, pubId); + } + + @Override + public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException { + updateAcl(asUser, false, false, pubId); + } + + @Override + public void permitReadsByUser(String consumerId, NsaApiKey asUser) + throws ConfigDbException, AccessDeniedException { + updateAcl(asUser, true, true, consumerId); + } + + @Override + public void denyReadsByUser(String consumerId, NsaApiKey asUser) + throws ConfigDbException, AccessDeniedException { + updateAcl(asUser, true, false, consumerId); + } + + private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key) + throws ConfigDbException, AccessDeniedException{ + try + { + final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add ); + + // we have to assume we have current data, or load it again. for the expected use + // case, assuming we can overwrite the data is fine. + final JSONObject o = new JSONObject (); + o.put ( "owner", fOwner ); + o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) ); + o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) ); + fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () ); + + log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); + + } + catch ( ConfigDbException x ) + { + throw x; + } + catch ( AccessDeniedException x ) + { + throw x; + } + + } + + private JSONObject safeSerialize(NsaAcl acl) { + return acl == null ? null : acl.serialize(); + } + + private final String fName; + private final ConfigDb fConfigDb; + private final ConfigPath fBaseTopicData; + private final String fOwner; + private final String fDesc; + private final NsaAcl fReaders; + private final NsaAcl fWriters; + private boolean fTransactionEnabled; + + public boolean isTransactionEnabled() { + return fTransactionEnabled; + } + + @Override + public Set<String> getOwners() { + final TreeSet<String> owners = new TreeSet<String> (); + owners.add ( fOwner ); + return owners; + } + } + +} diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java new file mode 100644 index 0000000..9942837 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java @@ -0,0 +1,231 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.att.dmf.mr.CambriaApiVersionInfo; +import com.att.dmf.mr.backends.MetricsSet; +import com.att.mr.apiServer.metrics.cambria.DMaaPMetricsSender; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.metrics.impl.CdmConstant; +import com.att.nsa.metrics.impl.CdmCounter; +import com.att.nsa.metrics.impl.CdmMetricsRegistryImpl; +import com.att.nsa.metrics.impl.CdmMovingAverage; +import com.att.nsa.metrics.impl.CdmRateTicker; +import com.att.nsa.metrics.impl.CdmSimpleMetric; +import com.att.nsa.metrics.impl.CdmStringConstant; +import com.att.nsa.metrics.impl.CdmTimeSince; + +/*@Component("dMaaPMetricsSet")*/ +/** + * Metrics related information + * + * @author anowarul.islam + * + */ +public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet { + + private final CdmStringConstant fVersion; + private final CdmConstant fStartTime; + private final CdmTimeSince fUpTime; + + private final CdmCounter fRecvTotal; + private final CdmRateTicker fRecvEpsInstant; + private final CdmRateTicker fRecvEpsShort; + private final CdmRateTicker fRecvEpsLong; + + private final CdmCounter fSendTotal; + private final CdmRateTicker fSendEpsInstant; + private final CdmRateTicker fSendEpsShort; + private final CdmRateTicker fSendEpsLong; + + private final CdmCounter fKafkaConsumerCacheMiss; + private final CdmCounter fKafkaConsumerCacheHit; + + private final CdmCounter fKafkaConsumerClaimed; + private final CdmCounter fKafkaConsumerTimeout; + + private final CdmSimpleMetric fFanOutRatio; + + private final HashMap<String, CdmRateTicker> fPathUseRates; + private final HashMap<String, CdmMovingAverage> fPathAvgs; + + private rrNvReadable fSettings; + + private final ScheduledExecutorService fScheduler; + + /** + * Constructor initialization + * + * @param cs + */ + //public DMaaPMetricsSet() { + public DMaaPMetricsSet(rrNvReadable cs) { + //fSettings = cs; + fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion()); + super.putItem("version", fVersion); + + final long startTime = System.currentTimeMillis(); + final Date d = new Date(startTime); + final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d); + fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text); + super.putItem("startTime", fStartTime); + + fUpTime = new CdmTimeSince("seconds since start"); + super.putItem("upTime", fUpTime); + + fRecvTotal = new CdmCounter("Total events received since start"); + super.putItem("recvTotalEvents", fRecvTotal); + + fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); + super.putItem("recvEpsInstant", fRecvEpsInstant); + + fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES); + super.putItem("recvEpsShort", fRecvEpsShort); + + fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS); + super.putItem("recvEpsLong", fRecvEpsLong); + + fSendTotal = new CdmCounter("Total events sent since start"); + super.putItem("sendTotalEvents", fSendTotal); + + fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); + super.putItem("sendEpsInstant", fSendEpsInstant); + + fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES); + super.putItem("sendEpsShort", fSendEpsShort); + + fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS); + super.putItem("sendEpsLong", fSendEpsLong); + + fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses"); + super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss); + + fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits"); + super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit); + + fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed"); + super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed); + + fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout"); + super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout); + + // FIXME: CdmLevel is not exactly a great choice + fFanOutRatio = new CdmSimpleMetric() { + @Override + public String getRawValueString() { + return getRawValue().toString(); + } + + @Override + public Number getRawValue() { + final double s = fSendTotal.getValue(); + final double r = fRecvTotal.getValue(); + return r == 0.0 ? 0.0 : s / r; + } + + @Override + public String summarize() { + return getRawValueString() + " sends per recv"; + } + + }; + super.putItem("fanOut", fFanOutRatio); + + // these are added to the metrics catalog as they're discovered + fPathUseRates = new HashMap<String, CdmRateTicker>(); + fPathAvgs = new HashMap<String, CdmMovingAverage>(); + + fScheduler = Executors.newScheduledThreadPool(1); + } + + @Override + public void setupCambriaSender() { + DMaaPMetricsSender.sendPeriodically(fScheduler, this, "cambria.apinode.metrics.dmaap"); + } + + @Override + public void onRouteComplete(String name, long durationMs) { + CdmRateTicker ticker = fPathUseRates.get(name); + if (ticker == null) { + ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS); + fPathUseRates.put(name, ticker); + super.putItem("pathUse_" + name, ticker); + } + ticker.tick(); + + CdmMovingAverage durs = fPathAvgs.get(name); + if (durs == null) { + durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES); + fPathAvgs.put(name, durs); + super.putItem("pathDurationMs_" + name, durs); + } + durs.tick(durationMs); + } + + @Override + public void publishTick(int amount) { + if (amount > 0) { + fRecvTotal.bumpBy(amount); + fRecvEpsInstant.tick(amount); + fRecvEpsShort.tick(amount); + fRecvEpsLong.tick(amount); + } + } + + @Override + public void consumeTick(int amount) { + if (amount > 0) { + fSendTotal.bumpBy(amount); + fSendEpsInstant.tick(amount); + fSendEpsShort.tick(amount); + fSendEpsLong.tick(amount); + } + } + + @Override + public void onKafkaConsumerCacheMiss() { + fKafkaConsumerCacheMiss.bump(); + } + + @Override + public void onKafkaConsumerCacheHit() { + fKafkaConsumerCacheHit.bump(); + } + + @Override + public void onKafkaConsumerClaimed() { + fKafkaConsumerClaimed.bump(); + } + + @Override + public void onKafkaConsumerTimeout() { + fKafkaConsumerTimeout.bump(); + } + +} diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java new file mode 100644 index 0000000..e29403f --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java @@ -0,0 +1,140 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import java.security.Key; + +//import org.apache.log4-j.Logger; +import org.springframework.beans.factory.annotation.Autowired; + +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.configs.ConfigDb; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.configs.confimpl.EncryptingLayer; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.db.BaseNsaApiDbImpl; +import com.att.nsa.security.db.EncryptingApiDbImpl; +import com.att.nsa.security.db.NsaApiDb; +import com.att.nsa.security.db.simple.NsaSimpleApiKey; +import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory; +import com.att.nsa.util.rrConvertor; + +/** + * + * @author anowarul.islam + * + */ +public class DMaaPNsaApiDb { + + //private rrNvReadable settings; + private DMaaPZkConfigDb cdb; + + //private static final Logger log = Logger + // .getLogger(DMaaPNsaApiDb.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class); + +/** + * + * Constructor initialized + * @param settings + * @param cdb + */ + @Autowired + public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) { + //this.setSettings(settings); + this.setCdb(cdb); + } + /** + * + * @param settings + * @param cdb + * @return + * @throws ConfigDbException + * @throws missingReqdSetting + */ + public static NsaApiDb<NsaSimpleApiKey> buildApiKeyDb( + rrNvReadable settings, ConfigDb cdb) throws ConfigDbException, + missingReqdSetting { + // Cambria uses an encrypted api key db + + //final String keyBase64 = settings.getString("cambria.secureConfig.key", null); + final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key"); + + + // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null); + final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv"); + // if neither value was provided, don't encrypt api key db + if (keyBase64 == null && initVectorBase64 == null) { + log.info("This server is configured to use an unencrypted API key database. See the settings documentation."); + return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb, + new NsaSimpleApiKeyFactory()); + } else if (keyBase64 == null) { + // neither or both, otherwise something's goofed + throw new missingReqdSetting("cambria.secureConfig.key"); + } else if (initVectorBase64 == null) { + // neither or both, otherwise something's goofed + throw new missingReqdSetting("cambria.secureConfig.iv"); + } else { + log.info("This server is configured to use an encrypted API key database."); + final Key key = EncryptingLayer.readSecretKey(keyBase64); + final byte[] iv = rrConvertor.base64Decode(initVectorBase64); + return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb, + new NsaSimpleApiKeyFactory(), key, iv); + } + } + + /** + * @return + * returns settings + */ +/* public rrNvReadable getSettings() { + return settings; + }*/ + + /** + * @param settings + * set settings + */ + /*public void setSettings(rrNvReadable settings) { + this.settings = settings; + }*/ + + /** + * @return + * returns cbd + */ + public DMaaPZkConfigDb getCdb() { + return cdb; + } + /** + * @param cdb + * set cdb + */ + public void setCdb(DMaaPZkConfigDb cdb) { + this.cdb = cdb; + } + + +} diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java new file mode 100644 index 0000000..78a7426 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java @@ -0,0 +1,45 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import org.I0Itec.zkclient.ZkClient; +import org.springframework.beans.factory.annotation.Qualifier; + +import com.att.dmf.mr.utils.ConfigurationReader; +import com.att.nsa.drumlin.till.nv.rrNvReadable; + +/** + * Created for Zookeeper client which will read configuration and settings parameter + * @author nilanjana.maity + * + */ +public class DMaaPZkClient extends ZkClient { + + /** + * This constructor will get the settings value from rrNvReadable + * and ConfigurationReader's zookeeper connection + * @param settings + */ + public DMaaPZkClient(@Qualifier("propertyReader") rrNvReadable settings) { + super(ConfigurationReader.getMainZookeeperConnectionString()); + } +} diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java new file mode 100644 index 0000000..d543721 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java @@ -0,0 +1,51 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import org.springframework.beans.factory.annotation.Qualifier; + +import com.att.dmf.mr.utils.ConfigurationReader; +import com.att.nsa.configs.confimpl.ZkConfigDb; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +//import com.att.nsa.configs.confimpl.ZkConfigDb; +/** + * Provide the zookeeper config db connection + * @author nilanjana.maity + * + */ +public class DMaaPZkConfigDb extends ZkConfigDb { + /** + * This Constructor will provide the configuration details from the property reader + * and DMaaPZkClient + * @param zk + * @param settings + */ + public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk, + @Qualifier("propertyReader") rrNvReadable settings) { + + //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)); + super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot()); + + } + + +} diff --git a/src/main/java/com/att/dmf/mr/beans/LogDetails.java b/src/main/java/com/att/dmf/mr/beans/LogDetails.java new file mode 100644 index 0000000..b7fb325 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/LogDetails.java @@ -0,0 +1,214 @@ +/** + * + */ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import java.util.Date; + +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.utils.Utils; + +/** + * @author muzainulhaque.qazi + * + */ + +public class LogDetails { + + private String publisherId; + private String topicId; + private String subscriberGroupId; + private String subscriberId; + private String publisherIp; + private String messageBatchId; + private String messageSequence; + private String messageTimestamp; + private String consumeTimestamp; + private String transactionIdTs; + private String serverIp; + + private long messageLengthInBytes; + private long totalMessageCount; + + private boolean transactionEnabled; + /** + * This is for transaction enabled logging details + * + */ + public LogDetails() { + super(); + } + + public String getTransactionId() { + StringBuilder transactionId = new StringBuilder(); + transactionId.append(transactionIdTs); + transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR); + transactionId.append(publisherIp); + transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR); + transactionId.append(messageBatchId); + transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR); + transactionId.append(messageSequence); + + return transactionId.toString(); + } + + public String getPublisherId() { + return publisherId; + } + + public void setPublisherId(String publisherId) { + this.publisherId = publisherId; + } + + public String getTopicId() { + return topicId; + } + + public void setTopicId(String topicId) { + this.topicId = topicId; + } + + public String getSubscriberGroupId() { + return subscriberGroupId; + } + + public void setSubscriberGroupId(String subscriberGroupId) { + this.subscriberGroupId = subscriberGroupId; + } + + public String getSubscriberId() { + return subscriberId; + } + + public void setSubscriberId(String subscriberId) { + this.subscriberId = subscriberId; + } + + public String getPublisherIp() { + return publisherIp; + } + + public void setPublisherIp(String publisherIp) { + this.publisherIp = publisherIp; + } + + public String getMessageBatchId() { + return messageBatchId; + } + + public void setMessageBatchId(Long messageBatchId) { + this.messageBatchId = Utils.getFromattedBatchSequenceId(messageBatchId); + } + + public String getMessageSequence() { + return messageSequence; + } + + public void setMessageSequence(String messageSequence) { + this.messageSequence = messageSequence; + } + + public String getMessageTimestamp() { + return messageTimestamp; + } + + public void setMessageTimestamp(String messageTimestamp) { + this.messageTimestamp = messageTimestamp; + } + + public String getPublishTimestamp() { + return Utils.getFormattedDate(new Date()); + } + + public String getConsumeTimestamp() { + return consumeTimestamp; + } + + public void setConsumeTimestamp(String consumeTimestamp) { + this.consumeTimestamp = consumeTimestamp; + } + + public long getMessageLengthInBytes() { + return messageLengthInBytes; + } + + public void setMessageLengthInBytes(long messageLengthInBytes) { + this.messageLengthInBytes = messageLengthInBytes; + } + + public long getTotalMessageCount() { + return totalMessageCount; + } + + public void setTotalMessageCount(long totalMessageCount) { + this.totalMessageCount = totalMessageCount; + } + + public boolean isTransactionEnabled() { + return transactionEnabled; + } + + public void setTransactionEnabled(boolean transactionEnabled) { + this.transactionEnabled = transactionEnabled; + } + + public String getTransactionIdTs() { + return transactionIdTs; + } + + public void setTransactionIdTs(String transactionIdTs) { + this.transactionIdTs = transactionIdTs; + } + + public String getPublisherLogDetails() { + + StringBuilder buffer = new StringBuilder(); + buffer.append("[publisherId=" + publisherId); + buffer.append(", topicId=" + topicId); + buffer.append(", messageTimestamp=" + messageTimestamp); + buffer.append(", publisherIp=" + publisherIp); + buffer.append(", messageBatchId=" + messageBatchId); + buffer.append(", messageSequence=" + messageSequence ); + buffer.append(", messageLengthInBytes=" + messageLengthInBytes); + buffer.append(", transactionEnabled=" + transactionEnabled); + buffer.append(", transactionId=" + getTransactionId()); + buffer.append(", publishTimestamp=" + getPublishTimestamp()); + buffer.append(", serverIp=" + getServerIp()+"]"); + return buffer.toString(); + + } + + public String getServerIp() { + return serverIp; + } + + public void setServerIp(String serverIp) { + this.serverIp = serverIp; + } + + public void setMessageBatchId(String messageBatchId) { + this.messageBatchId = messageBatchId; + } + +} diff --git a/src/main/java/com/att/dmf/mr/beans/TopicBean.java b/src/main/java/com/att/dmf/mr/beans/TopicBean.java new file mode 100644 index 0000000..a397921 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/beans/TopicBean.java @@ -0,0 +1,155 @@ +/** + * + */ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.beans; + +import java.io.Serializable; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * @author muzainulhaque.qazi + * + */ +@XmlRootElement +public class TopicBean implements Serializable { + + private static final long serialVersionUID = -8620390377775457949L; + private String topicName; + private String topicDescription; + + private int partitionCount; + private int replicationCount; + + private boolean transactionEnabled; + + /** + * constructor + */ + public TopicBean() { + super(); + } + + /** + * constructor initialization with topic details name, description, + * partition, replication, transaction + * + * @param topicName + * @param description + * @param partitionCount + * @param replicationCount + * @param transactionEnabled + */ + public TopicBean(String topicName, String topicDescription, int partitionCount, int replicationCount, + boolean transactionEnabled) { + super(); + this.topicName = topicName; + this.topicDescription = topicDescription; + this.partitionCount = partitionCount; + this.replicationCount = replicationCount; + this.transactionEnabled = transactionEnabled; + } + + /** + * @return + * returns topic name which is of String type + */ + public String getTopicName() { + return topicName; + } + + /** + * @param topicName + * set topic name + */ + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + + /** + * @return + * returns partition count which is of int type + */ + public int getPartitionCount() { + return partitionCount; + } + + /** + * @param partitionCount + * set partition Count + */ + public void setPartitionCount(int partitionCount) { + this.partitionCount = partitionCount; + } + + /** + * @return + * returns replication count which is of int type + */ + public int getReplicationCount() { + return replicationCount; + } + + /** + * @param + * set replication count which is of int type + */ + public void setReplicationCount(int replicationCount) { + this.replicationCount = replicationCount; + } + + /** + * @return + * returns boolean value which indicates whether transaction is Enabled + */ + public boolean isTransactionEnabled() { + return transactionEnabled; + } + + /** + * @param + * sets boolean value which indicates whether transaction is Enabled + */ + public void setTransactionEnabled(boolean transactionEnabled) { + this.transactionEnabled = transactionEnabled; + } + + /** + * + * @return returns description which is of String type + */ + public String getTopicDescription() { + return topicDescription; + } + /** + * + * @param topicDescription + * set description which is of String type + */ + public void setTopicDescription(String topicDescription) { + this.topicDescription = topicDescription; + } + +} |