diff options
Diffstat (limited to 'src/main/java/com/att/nsa/cambria/beans')
11 files changed, 2037 insertions, 0 deletions
diff --git a/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java b/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java new file mode 100644 index 0000000..df4a2ed --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java @@ -0,0 +1,88 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.beans; + +import java.io.Serializable; + +import javax.xml.bind.annotation.XmlRootElement; + +import com.att.nsa.drumlin.till.data.uniqueStringGenerator; +/** + * + * @author author + * + */ +@XmlRootElement +public class ApiKeyBean implements Serializable { + + private static final long serialVersionUID = -8219849086890567740L; + + private static final String KEY_CHARS = "ABCDEFGHJIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + + private String email; + private String description; + /** + * constructor + */ + public ApiKeyBean() { + super(); + } +/** + * + * @param email + * @param description + */ + public ApiKeyBean(String email, String description) { + super(); + this.email = email; + this.description = description; + } + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getKey() { + return generateKey(16); + } + + public String getSharedSecret() { + return generateKey(24); + } + + private static String generateKey ( int length ) { + return uniqueStringGenerator.createKeyUsingAlphabet ( KEY_CHARS, length ); + } + +} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java new file mode 100644 index 0000000..1b609b0 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java @@ -0,0 +1,227 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.beans; + +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import com.att.nsa.drumlin.service.standards.HttpStatusCodes; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.metrics.impl.CdmRateTicker; + +/** + * class provide rate information + * + * @author author + * + */ +@Component +public class DMaaPCambriaLimiter { + /** + * constructor initializes + * + * @param settings + * @throws missingReqdSetting + * @throws invalidSettingValue + */ + @Autowired + public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) + throws missingReqdSetting, invalidSettingValue { + fRateInfo = new HashMap<String, RateInfo>(); + fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, + CambriaConstants.kDefault_MaxEmptyPollsPerMinute); + fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength, + CambriaConstants.kDefault_RateLimitWindowLength); + fSleepMs = settings.getLong(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, + CambriaConstants.kDefault_SleepMsOnRateLimit); + } + + /** + * static method provide the sleep time + * + * @param ratePerMinute + * @return + */ + public static long getSleepMsForRate(double ratePerMinute) { + if (ratePerMinute <= 0.0) + return 0; + return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); + } + + /** + * Construct a rate limiter. + * + * @param maxEmptyPollsPerMinute + * Pass <= 0 to deactivate rate limiting. + * @param windowLengthMins + */ + public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins) { + this(maxEmptyPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute)); + } + + /** + * Construct a rate limiter + * + * @param maxEmptyPollsPerMinute + * Pass <= 0 to deactivate rate limiting. + * @param sleepMs + * @param windowLengthMins + */ + public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins, long sleepMs) { + fRateInfo = new HashMap<String, RateInfo>(); + fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); + fWindowLengthMins = windowLengthMins; + fSleepMs = Math.max(0, sleepMs); + } + + /** + * Tell the rate limiter about a call to a topic/group/id. If the rate is + * too high, this call delays its return and throws an exception. + * + * @param topic + * @param consumerGroup + * @param clientId + * @throws CambriaApiException + */ + public void onCall(String topic, String consumerGroup, String clientId) throws CambriaApiException { + // do nothing if rate is configured 0 or less + if (fMaxEmptyPollsPerMinute <= 0) { + return; + } + + // setup rate info for this tuple + final RateInfo ri = getRateInfo(topic, consumerGroup, clientId); + + final double rate = ri.onCall(); + log.info(ri.getLabel() + ": " + rate + " empty replies/minute."); + + if (rate > fMaxEmptyPollsPerMinute) { + try { + log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute + + "."); + if (fSleepMs > 0) { + log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs + + " ms sleep, then responding in error."); + Thread.sleep(fSleepMs); + } else { + log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); + } + } catch (InterruptedException e) { + // ignore + } + ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, + DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), + "This client is making too many requests. Please use a long poll " + + "setting to decrease the number of requests that result in empty responses. "); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + /** + * + * @param topic + * @param consumerGroup + * @param clientId + * @param sentCount + */ + public void onSend(String topic, String consumerGroup, String clientId, long sentCount) { + // check for good replies + if (sentCount > 0) { + // that was a good send, reset the metric + getRateInfo(topic, consumerGroup, clientId).reset(); + } + } + + private static class RateInfo { + /** + * constructor initialzes + * + * @param label + * @param windowLengthMinutes + */ + public RateInfo(String label, int windowLengthMinutes) { + fLabel = label; + fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES, + windowLengthMinutes, TimeUnit.MINUTES); + } + + public String getLabel() { + return fLabel; + } + + /** + * CdmRateTicker is reset + */ + public void reset() { + fCallRateSinceLastMsgSend.reset(); + } + + /** + * + * @return + */ + public double onCall() { + fCallRateSinceLastMsgSend.tick(); + return fCallRateSinceLastMsgSend.getRate(); + } + + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; + } + + private final HashMap<String, RateInfo> fRateInfo; + private final double fMaxEmptyPollsPerMinute; + private final int fWindowLengthMins; + private final long fSleepMs; + //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) { + final String key = makeKey(topic, consumerGroup, clientId); + RateInfo ri = fRateInfo.get(key); + if (ri == null) { + ri = new RateInfo(key, fWindowLengthMins); + fRateInfo.put(key, ri); + } + return ri; + } + + private String makeKey(String topic, String group, String id) { + return topic + "::" + group + "::" + id; + } +} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java new file mode 100644 index 0000000..79a8e1f --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java @@ -0,0 +1,104 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.beans; + +import java.text.SimpleDateFormat; +import java.util.Date; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.HttpSession; + +import com.att.nsa.cambria.utils.ConfigurationReader; + +/** + * DMaaPContext provide and maintain all the configurations , Http request/response + * Session and consumer Request Time + * @author author + * + */ +public class DMaaPContext { + + private ConfigurationReader configReader; + private HttpServletRequest request; + private HttpServletResponse response; + private HttpSession session; + private String consumerRequestTime; + static int i=0; + + public synchronized static long getBatchID() { + try{ + final long metricsSendTime = System.currentTimeMillis(); + final Date d = new Date(metricsSendTime); + final String text = new SimpleDateFormat("ddMMyyyyHHmmss").format(d); + long dt= Long.valueOf(text)+i; + i++; + return dt; + } + catch(NumberFormatException ex){ + return 0; + } + } + + public HttpServletRequest getRequest() { + return request; + } + + public void setRequest(HttpServletRequest request) { + this.request = request; + } + + public HttpServletResponse getResponse() { + return response; + } + + public void setResponse(HttpServletResponse response) { + this.response = response; + } + + public HttpSession getSession() { + this.session = request.getSession(); + return session; + } + + public void setSession(HttpSession session) { + this.session = session; + } + + public ConfigurationReader getConfigReader() { + return configReader; + } + + public void setConfigReader(ConfigurationReader configReader) { + this.configReader = configReader; + } + + public String getConsumerRequestTime() { + return consumerRequestTime; + } + + public void setConsumerRequestTime(String consumerRequestTime) { + this.consumerRequestTime = consumerRequestTime; + } + + +} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java new file mode 100644 index 0000000..28d48fa --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java @@ -0,0 +1,319 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.beans; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.springframework.beans.factory.annotation.Qualifier; + +import com.att.nsa.cambria.backends.Consumer; +import com.att.nsa.cambria.backends.ConsumerFactory; +import com.att.nsa.cambria.backends.MetricsSet; +import com.att.nsa.cambria.backends.kafka.KafkaConsumer; +import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache; +import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import kafka.consumer.ConsumerConfig; +import kafka.javaapi.consumer.ConsumerConnector; + +/** + * @author author + * + */ +public class DMaaPKafkaConsumerFactory implements ConsumerFactory { + + //private static final Logger log = LoggerFactory .getLogger(DMaaPKafkaConsumerFactory.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); + /** + * constructor initialization + * + * @param settings + * @param metrics + * @param curator + * @throws missingReqdSetting + * @throws KafkaConsumerCacheException + * @throws UnknownHostException + */ + public DMaaPKafkaConsumerFactory( + @Qualifier("propertyReader") rrNvReadable settings, + @Qualifier("dMaaPMetricsSet") MetricsSet metrics, + @Qualifier("curator") CuratorFramework curator) + throws missingReqdSetting, KafkaConsumerCacheException, + UnknownHostException { + /*final String apiNodeId = settings.getString( + CambriaConstants.kSetting_ApiNodeIdentifier, + InetAddress.getLocalHost().getCanonicalHostName() + + ":" + + settings.getInt(CambriaConstants.kSetting_Port, + CambriaConstants.kDefault_Port));*/ + String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + CambriaConstants.kSetting_ApiNodeIdentifier); + if (apiNodeId == null){ + + apiNodeId=InetAddress.getLocalHost().getCanonicalHostName() + + ":" + + settings.getInt(CambriaConstants.kSetting_Port, + CambriaConstants.kDefault_Port); + } + + log.info("This Cambria API Node identifies itself as [" + apiNodeId + + "]."); + final String mode = CambriaConstants.DMAAP; + /*fSettings = settings; + fZooKeeper = fSettings.getString(kSettings_KafkaZookeeper, settings + .getString(CambriaConstants.kSetting_ZkConfigDbServers, + CambriaConstants.kDefault_ZkConfigDbServers));*/ + + String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper); + if(null==strkSettings_KafkaZookeeper){ + strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers); + if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers; + + } + fZooKeeper= strkSettings_KafkaZookeeper; + + //final boolean isCacheEnabled = fSettings.getBoolean( + // kSetting_EnableCache, kDefault_IsCacheEnabled); + boolean kSetting_EnableCache= kDefault_IsCacheEnabled; + String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_EnableCache+""); + if(null!=strkSetting_EnableCache)kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache); + + final boolean isCacheEnabled = kSetting_EnableCache; + + + fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, + metrics) : null; + if (fCache != null) { + fCache.startCache(mode, curator); + } + } + + @Override + public Consumer getConsumerFor(String topic, String consumerGroupName, + String consumerId, int timeoutMs) throws UnavailableException { + KafkaConsumer kc; + + try { + kc = (fCache != null) ? fCache.getConsumerFor(topic, + consumerGroupName, consumerId) : null; + } catch (KafkaConsumerCacheException e) { + throw new UnavailableException(e); + } + + if (kc == null) { + + final InterProcessMutex ipLock = new InterProcessMutex( ConfigurationReader.getCurator(), "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId); +// final InterProcessMutex fLock = new InterProcessMutex( +// ConfigurationReader.getCurator(), "/consumerFactory/" +// + topic + "/" + consumerGroupName + "/" +// + consumerId); + boolean locked = false; + try { + + locked = ipLock.acquire(30, TimeUnit.SECONDS); + if (!locked) { + // FIXME: this seems to cause trouble in some cases. This exception + // gets thrown routinely. Possibly a consumer trying multiple servers + // at once, producing a never-ending cycle of overlapping locks? + // The problem is that it throws and winds up sending a 503 to the + // client, which would be incorrect if the client is causing trouble + // by switching back and forth. + + throw new UnavailableException("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + ", " + consumerGroupName + ", " + consumerId + ")"); + } + +// if (!fLock.acquire(30, TimeUnit.SECONDS)) { +// throw new UnavailableException( +// "Could not acquire lock in order to create (topic, group, consumer) = " +// + "(" + topic + ", " + consumerGroupName +// + ", " + consumerId + ")"); +// } + + fCache.signalOwnership(topic, consumerGroupName, consumerId); + + log.info("Creating Kafka consumer for group [" + + consumerGroupName + "], consumer [" + consumerId + + "], on topic [" + topic + "]."); + + final String fakeGroupName = consumerGroupName + "--" + topic; + + final ConsumerConfig ccc = createConsumerConfig(fakeGroupName, + consumerId); + final ConsumerConnector cc = kafka.consumer.Consumer + .createJavaConsumerConnector(ccc); + kc = new KafkaConsumer(topic, consumerGroupName, consumerId, cc); + + if (fCache != null) { + fCache.putConsumerFor(topic, consumerGroupName, consumerId, + kc); + } + } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) { + log.warn("Kafka consumer couldn't connect to ZK."); + throw new UnavailableException("Couldn't connect to ZK."); + } catch (KafkaConsumerCacheException e) { + log.warn("Failed to cache consumer (this may have performance implications): " + + e.getMessage()); + } catch (Exception e) { + throw new UnavailableException( + "Error while acquiring consumer factory lock", e); + } finally { + if ( locked ) + { + try { + ipLock.release(); + } catch (Exception e) { + throw new UnavailableException("Error while releasing consumer factory lock", e); + } + } + } + } + + return kc; + } + + @Override + public synchronized void destroyConsumer(String topic, + String consumerGroup, String clientId) { + if (fCache != null) { + fCache.dropConsumer(topic, consumerGroup, clientId); + } + } + + @Override + public synchronized Collection<? extends Consumer> getConsumers() { + return fCache.getConsumers(); + } + + @Override + public synchronized void dropCache() { + fCache.dropAllConsumers(); + } + + private ConsumerConfig createConsumerConfig(String groupId, + String consumerId) { + final Properties props = new Properties(); + props.put("zookeeper.connect", fZooKeeper); + props.put("group.id", groupId); + props.put("consumer.id", consumerId); + //props.put("auto.commit.enable", "false"); + // additional settings: start with our defaults, then pull in configured + // overrides + props.putAll(KafkaInternalDefaults); + for (String key : KafkaConsumerKeys) { + transferSettingIfProvided(props, key, "kafka"); + } + + return new ConsumerConfig(props); + } + + //private final rrNvReadable fSettings; + private final KafkaConsumerCache fCache; + + private String fZooKeeper; + + private static final String kSettings_KafkaZookeeper = "kafka.client.zookeeper"; + + private static final HashMap<String, String> KafkaInternalDefaults = new HashMap<String, String>(); + + /** + * putting values in hashmap like consumer timeout, zookeeper time out, etc + * + * @param setting + */ + public static void populateKafkaInternalDefaultsMap() { + //@Qualifier("propertyReader") rrNvReadable setting) { + try { + + HashMap<String, String> map1= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop); + + KafkaInternalDefaults.put("consumer.timeout.ms", + // AJSCPropertiesMap.get(CambriaConstants.msgRtr_prop, "consumer.timeout.ms")); + map1.get( "consumer.timeout.ms")); + + KafkaInternalDefaults.put("zookeeper.connection.timeout.ms", + //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.connection.timeout.ms")); + map1.get("zookeeper.connection.timeout.ms")); + KafkaInternalDefaults.put("zookeeper.session.timeout.ms", + //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.session.timeout.ms")); + map1.get("zookeeper.session.timeout.ms")); + KafkaInternalDefaults.put("zookeeper.sync.time.ms", + // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.sync.time.ms")); + map1.get( "zookeeper.sync.time.ms")); + KafkaInternalDefaults.put("auto.commit.interval.ms", + //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.interval.ms")); + map1.get( "auto.commit.interval.ms")); + KafkaInternalDefaults.put("fetch.message.max.bytes", + //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "fetch.message.max.bytes")); + map1.get("fetch.message.max.bytes")); + KafkaInternalDefaults.put("auto.commit.enable", + // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.enable")); + map1.get("auto.commit.enable")); + } catch (Exception e) { + log.error("Failed to load Kafka Internal Properties.", e); + } + } + + private static final String KafkaConsumerKeys[] = { "socket.timeout.ms", + "socket.receive.buffer.bytes", "fetch.message.max.bytes", + "auto.commit.interval.ms", "queued.max.message.chunks", + "rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.bytes", + "rebalance.backoff.ms", "refresh.leader.backoff.ms", + "auto.offset.reset", "consumer.timeout.ms", + "zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms", + "zookeeper.sync.time.ms" }; + + private static String makeLongKey(String key, String prefix) { + return prefix + "." + key; + } + + private void transferSettingIfProvided(Properties target, String key, + String prefix) { + String keyVal= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,makeLongKey(key, prefix)); + + // if (fSettings.hasValueFor(makeLongKey(key, prefix))) { + if (null!=keyVal) { + // final String val = fSettings + // .getString(makeLongKey(key, prefix), ""); + log.info("Setting [" + key + "] to " + keyVal + "."); + target.put(key, keyVal); + } + } + + } + + diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java new file mode 100644 index 0000000..9d53ef2 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java @@ -0,0 +1,462 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.beans; + +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.exception.ZkNoNodeException; +//import org.apache.log4-j.Logger; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.springframework.beans.factory.annotation.Qualifier; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.metabroker.Broker; +import com.att.nsa.cambria.metabroker.Topic; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.configs.ConfigDb; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.configs.ConfigPath; +import com.att.nsa.drumlin.service.standards.HttpStatusCodes; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.security.NsaAcl; +import com.att.nsa.security.NsaAclUtils; +import com.att.nsa.security.NsaApiKey; + +import kafka.admin.AdminUtils; +import kafka.utils.ZKStringSerializer$; + +/** + * class performing all topic operations + * + * @author author + * + */ + +public class DMaaPKafkaMetaBroker implements Broker { + + //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); + + + /** + * DMaaPKafkaMetaBroker constructor initializing + * + * @param settings + * @param zk + * @param configDb + */ + public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings, + @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { + //fSettings = settings; + fZk = zk; + fCambriaConfig = configDb; + fBaseTopicData = configDb.parse("/topics"); + } + + @Override + public List<Topic> getAllTopics() throws ConfigDbException { + log.info("Retrieving list of all the topics."); + final LinkedList<Topic> result = new LinkedList<Topic>(); + try { + log.info("Retrieving all topics from root: " + zkTopicsRoot); + final List<String> topics = fZk.getChildren(zkTopicsRoot); + for (String topic : topics) { + result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData)); + } + + JSONObject dataObj = new JSONObject(); + dataObj.put("topics", new JSONObject()); + + for (String topic : topics) { + dataObj.getJSONObject("topics").put(topic, new JSONObject()); + } + } catch (ZkNoNodeException excp) { + // very fresh kafka doesn't have any topics or a topics node + log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp); + } + return result; + } + + @Override + public Topic getTopic(String topic) throws ConfigDbException { + if (fZk.exists(zkTopicsRoot + "/" + topic)) { + return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic); + } + // else: no such topic in kafka + return null; + } + + /** + * static method get KafkaTopic object + * + * @param db + * @param base + * @param topic + * @return + * @throws ConfigDbException + */ + public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException { + return new KafkaTopic(topic, db, base); + } + + /** + * creating topic + */ + @Override + public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas, + boolean transactionEnabled) throws TopicExistsException, CambriaApiException { + log.info("Creating topic: " + topic); + try { + log.info("Check if topic [" + topic + "] exist."); + // first check for existence "our way" + final Topic t = getTopic(topic); + if (t != null) { + log.info("Could not create topic [" + topic + "]. Topic Already exists."); + throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists."); + } + } catch (ConfigDbException e1) { + log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1); + throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, + "Couldn't check topic data in config db."); + } + + // we only allow 3 replicas. (If we don't test this, we get weird + // results from the cluster, + // so explicit test and fail.) + if (replicas < 1 || replicas > 3) { + log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3."); + throw new CambriaApiException(HttpStatusCodes.k400_badRequest, + "The replica count must be between 1 and 3."); + } + if (partitions < 1) { + log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1."); + throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1."); + } + + // create via kafka + try { + ZkClient zkClient = null; + try { + log.info("Loading zookeeper client for creating topic."); + // FIXME: use of this scala module$ thing is a goofy hack to + // make Kafka aware of the + // topic creation. (Otherwise, the topic is only partially + // created in ZK.) + zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000, + ZKStringSerializer$.MODULE$); + + log.info("Zookeeper client loaded successfully. Creating topic."); + AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties()); + } catch (kafka.common.TopicExistsException e) { + log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e); + throw new TopicExistsException(topic); + } catch (ZkNoNodeException e) { + log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not setup.", e); + // Kafka throws this when the server isn't running (and perhaps + // hasn't ever run) + throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, + "The Kafka cluster is not setup."); + } catch (kafka.admin.AdminOperationException e) { + // Kafka throws this when the server isn't running (and perhaps + // hasn't ever run) + log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(), + e); + throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, + "The Kafka cluster can't handle your request. Talk to the administrators."); + } finally { + log.info("Closing zookeeper connection."); + if (zkClient != null) + zkClient.close(); + } + + log.info("Creating topic entry for topic: " + topic); + // underlying Kafka topic created. now setup our API info + return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); + } catch (ConfigDbException excp) { + log.error("Failed to create topic data. Talk to the administrators: " + excp.getMessage(), excp); + throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, + "Failed to create topic data. Talk to the administrators."); + } + } + + @Override + public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException { + log.info("Deleting topic: " + topic); + ZkClient zkClient = null; + try { + log.info("Loading zookeeper client for topic deletion."); + // FIXME: use of this scala module$ thing is a goofy hack to make + // Kafka aware of the + // topic creation. (Otherwise, the topic is only partially created + // in ZK.) + zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000, + ZKStringSerializer$.MODULE$); + + log.info("Zookeeper client loaded successfully. Deleting topic."); + AdminUtils.deleteTopic(zkClient, topic); + } catch (kafka.common.TopicExistsException e) { + log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); + throw new TopicExistsException(topic); + } catch (ZkNoNodeException e) { + log.error("Failed to delete topic [" + topic + "]. The Kafka cluster is not setup." + e.getMessage(), e); + // Kafka throws this when the server isn't running (and perhaps + // hasn't ever run) + throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, "The Kafka cluster is not setup."); + } catch (kafka.admin.AdminOperationException e) { + // Kafka throws this when the server isn't running (and perhaps + // hasn't ever run) + log.error("The Kafka cluster can't handle your request. Talk to the administrators." + e.getMessage(), e); + throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, + "The Kafka cluster can't handle your request. Talk to the administrators."); + } finally { + log.info("Closing zookeeper connection."); + if (zkClient != null) + zkClient.close(); + } + + // throw new UnsupportedOperationException ( "We can't programmatically + // delete Kafka topics yet." ); + } + + //private final rrNvReadable fSettings; + private final ZkClient fZk; + private final ConfigDb fCambriaConfig; + private final ConfigPath fBaseTopicData; + + private static final String zkTopicsRoot = "/brokers/topics"; + private static final JSONObject kEmptyAcl = new JSONObject(); + + /** + * method Providing KafkaTopic Object associated with owner and + * transactionenabled or not + * + * @param name + * @param desc + * @param owner + * @param transactionEnabled + * @return + * @throws ConfigDbException + */ + public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled) + throws ConfigDbException { + return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled); + } + + /** + * static method giving kafka topic object + * + * @param db + * @param basePath + * @param name + * @param desc + * @param owner + * @param transactionEnabled + * @return + * @throws ConfigDbException + */ + public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner, + boolean transactionEnabled) throws ConfigDbException { + final JSONObject o = new JSONObject(); + o.put("owner", owner); + o.put("description", desc); + o.put("txenabled", transactionEnabled); + db.store(basePath.getChild(name), o.toString()); + return new KafkaTopic(name, db, basePath); + } + + /** + * class performing all user opearation like user is eligible to read, + * write. permitting a user to write and read, + * + * @author author + * + */ + public static class KafkaTopic implements Topic { + /** + * constructor initializes + * + * @param name + * @param configdb + * @param baseTopic + * @throws ConfigDbException + */ + public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException { + fName = name; + fConfigDb = configdb; + fBaseTopicData = baseTopic; + + String data = fConfigDb.load(fBaseTopicData.getChild(fName)); + if (data == null) { + data = "{}"; + } + + final JSONObject o = new JSONObject(data); + fOwner = o.optString("owner", ""); + fDesc = o.optString("description", ""); + fTransactionEnabled = o.optBoolean("txenabled", false);// default + // value is + // false + // if this topic has an owner, it needs both read/write ACLs. If there's no + // owner (or it's empty), null is okay -- this is for existing or implicitly + // created topics. + JSONObject readers = o.optJSONObject ( "readers" ); + if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl; + fReaders = fromJson ( readers ); + + JSONObject writers = o.optJSONObject ( "writers" ); + if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl; + fWriters = fromJson ( writers ); + } + private NsaAcl fromJson(JSONObject o) { + NsaAcl acl = new NsaAcl(); + if (o != null) { + JSONArray a = o.optJSONArray("allowed"); + if (a != null) { + for (int i = 0; i < a.length(); ++i) { + String user = a.getString(i); + acl.add(user); + } + } + } + return acl; + } + @Override + public String getName() { + return fName; + } + + @Override + public String getOwner() { + return fOwner; + } + + @Override + public String getDescription() { + return fDesc; + } + + @Override + public NsaAcl getReaderAcl() { + return fReaders; + } + + @Override + public NsaAcl getWriterAcl() { + return fWriters; + } + + @Override + public void checkUserRead(NsaApiKey user) throws AccessDeniedException { + NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user ); + } + + @Override + public void checkUserWrite(NsaApiKey user) throws AccessDeniedException { + NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user ); + } + + @Override + public void permitWritesFromUser(String pubId, NsaApiKey asUser) + throws ConfigDbException, AccessDeniedException { + updateAcl(asUser, false, true, pubId); + } + + @Override + public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException { + updateAcl(asUser, false, false, pubId); + } + + @Override + public void permitReadsByUser(String consumerId, NsaApiKey asUser) + throws ConfigDbException, AccessDeniedException { + updateAcl(asUser, true, true, consumerId); + } + + @Override + public void denyReadsByUser(String consumerId, NsaApiKey asUser) + throws ConfigDbException, AccessDeniedException { + updateAcl(asUser, true, false, consumerId); + } + + private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key) + throws ConfigDbException, AccessDeniedException{ + try + { + final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add ); + + // we have to assume we have current data, or load it again. for the expected use + // case, assuming we can overwrite the data is fine. + final JSONObject o = new JSONObject (); + o.put ( "owner", fOwner ); + o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) ); + o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) ); + fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () ); + + log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); + + } + catch ( ConfigDbException x ) + { + throw x; + } + catch ( AccessDeniedException x ) + { + throw x; + } + + } + + private JSONObject safeSerialize(NsaAcl acl) { + return acl == null ? null : acl.serialize(); + } + + private final String fName; + private final ConfigDb fConfigDb; + private final ConfigPath fBaseTopicData; + private final String fOwner; + private final String fDesc; + private final NsaAcl fReaders; + private final NsaAcl fWriters; + private boolean fTransactionEnabled; + + public boolean isTransactionEnabled() { + return fTransactionEnabled; + } + + @Override + public Set<String> getOwners() { + final TreeSet<String> owners = new TreeSet<String> (); + owners.add ( fOwner ); + return owners; + } + } + +} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java new file mode 100644 index 0000000..3c3aa6d --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java @@ -0,0 +1,232 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.beans; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.att.nsa.apiServer.metrics.cambria.DMaaPMetricsSender; +import com.att.nsa.cambria.CambriaApiVersionInfo; +import com.att.nsa.cambria.backends.MetricsSet; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.metrics.impl.CdmConstant; +import com.att.nsa.metrics.impl.CdmCounter; +import com.att.nsa.metrics.impl.CdmMetricsRegistryImpl; +import com.att.nsa.metrics.impl.CdmMovingAverage; +import com.att.nsa.metrics.impl.CdmRateTicker; +import com.att.nsa.metrics.impl.CdmSimpleMetric; +import com.att.nsa.metrics.impl.CdmStringConstant; +import com.att.nsa.metrics.impl.CdmTimeSince; + +/*@Component("dMaaPMetricsSet")*/ +/** + * Metrics related information + * + * @author author + * + */ +public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet { + + private final CdmStringConstant fVersion; + private final CdmConstant fStartTime; + private final CdmTimeSince fUpTime; + + private final CdmCounter fRecvTotal; + private final CdmRateTicker fRecvEpsInstant; + private final CdmRateTicker fRecvEpsShort; + private final CdmRateTicker fRecvEpsLong; + + private final CdmCounter fSendTotal; + private final CdmRateTicker fSendEpsInstant; + private final CdmRateTicker fSendEpsShort; + private final CdmRateTicker fSendEpsLong; + + private final CdmCounter fKafkaConsumerCacheMiss; + private final CdmCounter fKafkaConsumerCacheHit; + + private final CdmCounter fKafkaConsumerClaimed; + private final CdmCounter fKafkaConsumerTimeout; + + private final CdmSimpleMetric fFanOutRatio; + + private final HashMap<String, CdmRateTicker> fPathUseRates; + private final HashMap<String, CdmMovingAverage> fPathAvgs; + + private rrNvReadable fSettings; + + private final ScheduledExecutorService fScheduler; + + /** + * Constructor initialization + * + * @param cs + */ + //public DMaaPMetricsSet() { + public DMaaPMetricsSet(rrNvReadable cs) { + //fSettings = cs; + + fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion()); + super.putItem("version", fVersion); + + final long startTime = System.currentTimeMillis(); + final Date d = new Date(startTime); + final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d); + fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text); + super.putItem("startTime", fStartTime); + + fUpTime = new CdmTimeSince("seconds since start"); + super.putItem("upTime", fUpTime); + + fRecvTotal = new CdmCounter("Total events received since start"); + super.putItem("recvTotalEvents", fRecvTotal); + + fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); + super.putItem("recvEpsInstant", fRecvEpsInstant); + + fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES); + super.putItem("recvEpsShort", fRecvEpsShort); + + fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS); + super.putItem("recvEpsLong", fRecvEpsLong); + + fSendTotal = new CdmCounter("Total events sent since start"); + super.putItem("sendTotalEvents", fSendTotal); + + fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); + super.putItem("sendEpsInstant", fSendEpsInstant); + + fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES); + super.putItem("sendEpsShort", fSendEpsShort); + + fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS); + super.putItem("sendEpsLong", fSendEpsLong); + + fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses"); + super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss); + + fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits"); + super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit); + + fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed"); + super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed); + + fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout"); + super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout); + + // FIXME: CdmLevel is not exactly a great choice + fFanOutRatio = new CdmSimpleMetric() { + @Override + public String getRawValueString() { + return getRawValue().toString(); + } + + @Override + public Number getRawValue() { + final double s = fSendTotal.getValue(); + final double r = fRecvTotal.getValue(); + return r == 0.0 ? 0.0 : s / r; + } + + @Override + public String summarize() { + return getRawValueString() + " sends per recv"; + } + + }; + super.putItem("fanOut", fFanOutRatio); + + // these are added to the metrics catalog as they're discovered + fPathUseRates = new HashMap<String, CdmRateTicker>(); + fPathAvgs = new HashMap<String, CdmMovingAverage>(); + + fScheduler = Executors.newScheduledThreadPool(1); + } + + @Override + public void setupCambriaSender() { + DMaaPMetricsSender.sendPeriodically(fScheduler, this, "cambria.apinode.metrics.dmaap"); + } + + @Override + public void onRouteComplete(String name, long durationMs) { + CdmRateTicker ticker = fPathUseRates.get(name); + if (ticker == null) { + ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS); + fPathUseRates.put(name, ticker); + super.putItem("pathUse_" + name, ticker); + } + ticker.tick(); + + CdmMovingAverage durs = fPathAvgs.get(name); + if (durs == null) { + durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES); + fPathAvgs.put(name, durs); + super.putItem("pathDurationMs_" + name, durs); + } + durs.tick(durationMs); + } + + @Override + public void publishTick(int amount) { + if (amount > 0) { + fRecvTotal.bumpBy(amount); + fRecvEpsInstant.tick(amount); + fRecvEpsShort.tick(amount); + fRecvEpsLong.tick(amount); + } + } + + @Override + public void consumeTick(int amount) { + if (amount > 0) { + fSendTotal.bumpBy(amount); + fSendEpsInstant.tick(amount); + fSendEpsShort.tick(amount); + fSendEpsLong.tick(amount); + } + } + + @Override + public void onKafkaConsumerCacheMiss() { + fKafkaConsumerCacheMiss.bump(); + } + + @Override + public void onKafkaConsumerCacheHit() { + fKafkaConsumerCacheHit.bump(); + } + + @Override + public void onKafkaConsumerClaimed() { + fKafkaConsumerClaimed.bump(); + } + + @Override + public void onKafkaConsumerTimeout() { + fKafkaConsumerTimeout.bump(); + } + +} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java new file mode 100644 index 0000000..ce257d4 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java @@ -0,0 +1,139 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.beans; + +import java.security.Key; + +//import org.apache.log4-j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.configs.ConfigDb; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.configs.confimpl.EncryptingLayer; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.db.BaseNsaApiDbImpl; +import com.att.nsa.security.db.EncryptingApiDbImpl; +import com.att.nsa.security.db.NsaApiDb; +import com.att.nsa.security.db.simple.NsaSimpleApiKey; +import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory; +import com.att.nsa.util.rrConvertor; + +/** + * + * @author author + * + */ +public class DMaaPNsaApiDb { + + //private rrNvReadable settings; + private DMaaPZkConfigDb cdb; + + //private static final Logger log = Logger + // .getLogger(DMaaPNsaApiDb.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class); + +/** + * + * Constructor initialized + * @param settings + * @param cdb + */ + @Autowired + public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) { + //this.setSettings(settings); + this.setCdb(cdb); + } + /** + * + * @param settings + * @param cdb + * @return + * @throws ConfigDbException + * @throws missingReqdSetting + */ + public static NsaApiDb<NsaSimpleApiKey> buildApiKeyDb( + rrNvReadable settings, ConfigDb cdb) throws ConfigDbException, + missingReqdSetting { + // Cambria uses an encrypted api key db + + //final String keyBase64 = settings.getString("cambria.secureConfig.key", null); + final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key"); + + + // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null); + final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv"); + // if neither value was provided, don't encrypt api key db + if (keyBase64 == null && initVectorBase64 == null) { + log.info("This server is configured to use an unencrypted API key database. See the settings documentation."); + return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb, + new NsaSimpleApiKeyFactory()); + } else if (keyBase64 == null) { + // neither or both, otherwise something's goofed + throw new missingReqdSetting("cambria.secureConfig.key"); + } else if (initVectorBase64 == null) { + // neither or both, otherwise something's goofed + throw new missingReqdSetting("cambria.secureConfig.iv"); + } else { + log.info("This server is configured to use an encrypted API key database."); + final Key key = EncryptingLayer.readSecretKey(keyBase64); + final byte[] iv = rrConvertor.base64Decode(initVectorBase64); + return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb, + new NsaSimpleApiKeyFactory(), key, iv); + } + } + + /** + * @return + * returns settings + */ +/* public rrNvReadable getSettings() { + return settings; + }*/ + + /** + * @param settings + * set settings + */ + /*public void setSettings(rrNvReadable settings) { + this.settings = settings; + }*/ + + /** + * @return + * returns cbd + */ + public DMaaPZkConfigDb getCdb() { + return cdb; + } + /** + * @param cdb + * set cdb + */ + public void setCdb(DMaaPZkConfigDb cdb) { + this.cdb = cdb; + } + + +} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java new file mode 100644 index 0000000..590ecd6 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java @@ -0,0 +1,45 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.beans; + +import org.I0Itec.zkclient.ZkClient; +import org.springframework.beans.factory.annotation.Qualifier; + +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.drumlin.till.nv.rrNvReadable; + +/** + * Created for Zookeeper client which will read configuration and settings parameter + * @author author + * + */ +public class DMaaPZkClient extends ZkClient { + + /** + * This constructor will get the settings value from rrNvReadable + * and ConfigurationReader's zookeeper connection + * @param settings + */ + public DMaaPZkClient(@Qualifier("propertyReader") rrNvReadable settings) { + super(ConfigurationReader.getMainZookeeperConnectionString()); + } +} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java new file mode 100644 index 0000000..8fe96e9 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java @@ -0,0 +1,52 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.beans; + +import org.springframework.beans.factory.annotation.Qualifier; + +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.configs.confimpl.ZkConfigDb; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +//import com.att.nsa.configs.confimpl.ZkConfigDb; +/** + * Provide the zookeeper config db connection + * @author author + * + */ +public class DMaaPZkConfigDb extends ZkConfigDb { + /** + * This Constructor will provide the configuration details from the property reader + * and DMaaPZkClient + * @param zk + * @param settings + */ + public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk, + @Qualifier("propertyReader") rrNvReadable settings) { + + //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)); + super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot()); + + } + + +} diff --git a/src/main/java/com/att/nsa/cambria/beans/LogDetails.java b/src/main/java/com/att/nsa/cambria/beans/LogDetails.java new file mode 100644 index 0000000..5a195e9 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/LogDetails.java @@ -0,0 +1,214 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +/** + * + */ +package com.att.nsa.cambria.beans; + +import java.util.Date; + +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.utils.Utils; + +/** + * @author author + * + */ + +public class LogDetails { + + private String publisherId; + private String topicId; + private String subscriberGroupId; + private String subscriberId; + private String publisherIp; + private String messageBatchId; + private String messageSequence; + private String messageTimestamp; + private String consumeTimestamp; + private String transactionIdTs; + private String serverIp; + + private long messageLengthInBytes; + private long totalMessageCount; + + private boolean transactionEnabled; + /** + * This is for transaction enabled logging details + * + */ + public LogDetails() { + super(); + } + + public String getTransactionId() { + StringBuilder transactionId = new StringBuilder(); + transactionId.append(transactionIdTs); + transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR); + transactionId.append(publisherIp); + transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR); + transactionId.append(messageBatchId); + transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR); + transactionId.append(messageSequence); + + return transactionId.toString(); + } + + public String getPublisherId() { + return publisherId; + } + + public void setPublisherId(String publisherId) { + this.publisherId = publisherId; + } + + public String getTopicId() { + return topicId; + } + + public void setTopicId(String topicId) { + this.topicId = topicId; + } + + public String getSubscriberGroupId() { + return subscriberGroupId; + } + + public void setSubscriberGroupId(String subscriberGroupId) { + this.subscriberGroupId = subscriberGroupId; + } + + public String getSubscriberId() { + return subscriberId; + } + + public void setSubscriberId(String subscriberId) { + this.subscriberId = subscriberId; + } + + public String getPublisherIp() { + return publisherIp; + } + + public void setPublisherIp(String publisherIp) { + this.publisherIp = publisherIp; + } + + public String getMessageBatchId() { + return messageBatchId; + } + + public void setMessageBatchId(Long messageBatchId) { + this.messageBatchId = Utils.getFromattedBatchSequenceId(messageBatchId); + } + + public String getMessageSequence() { + return messageSequence; + } + + public void setMessageSequence(String messageSequence) { + this.messageSequence = messageSequence; + } + + public String getMessageTimestamp() { + return messageTimestamp; + } + + public void setMessageTimestamp(String messageTimestamp) { + this.messageTimestamp = messageTimestamp; + } + + public String getPublishTimestamp() { + return Utils.getFormattedDate(new Date()); + } + + public String getConsumeTimestamp() { + return consumeTimestamp; + } + + public void setConsumeTimestamp(String consumeTimestamp) { + this.consumeTimestamp = consumeTimestamp; + } + + public long getMessageLengthInBytes() { + return messageLengthInBytes; + } + + public void setMessageLengthInBytes(long messageLengthInBytes) { + this.messageLengthInBytes = messageLengthInBytes; + } + + public long getTotalMessageCount() { + return totalMessageCount; + } + + public void setTotalMessageCount(long totalMessageCount) { + this.totalMessageCount = totalMessageCount; + } + + public boolean isTransactionEnabled() { + return transactionEnabled; + } + + public void setTransactionEnabled(boolean transactionEnabled) { + this.transactionEnabled = transactionEnabled; + } + + public String getTransactionIdTs() { + return transactionIdTs; + } + + public void setTransactionIdTs(String transactionIdTs) { + this.transactionIdTs = transactionIdTs; + } + + public String getPublisherLogDetails() { + + StringBuilder buffer = new StringBuilder(); + buffer.append("[publisherId=" + publisherId); + buffer.append(", topicId=" + topicId); + buffer.append(", messageTimestamp=" + messageTimestamp); + buffer.append(", publisherIp=" + publisherIp); + buffer.append(", messageBatchId=" + messageBatchId); + buffer.append(", messageSequence=" + messageSequence ); + buffer.append(", messageLengthInBytes=" + messageLengthInBytes); + buffer.append(", transactionEnabled=" + transactionEnabled); + buffer.append(", transactionId=" + getTransactionId()); + buffer.append(", publishTimestamp=" + getPublishTimestamp()); + buffer.append(", serverIp=" + getServerIp()+"]"); + return buffer.toString(); + + } + + public String getServerIp() { + return serverIp; + } + + public void setServerIp(String serverIp) { + this.serverIp = serverIp; + } + + public void setMessageBatchId(String messageBatchId) { + this.messageBatchId = messageBatchId; + } + +} diff --git a/src/main/java/com/att/nsa/cambria/beans/TopicBean.java b/src/main/java/com/att/nsa/cambria/beans/TopicBean.java new file mode 100644 index 0000000..3303c07 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/beans/TopicBean.java @@ -0,0 +1,155 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +/** + * + */ +package com.att.nsa.cambria.beans; + +import java.io.Serializable; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * @author author + * + */ +@XmlRootElement +public class TopicBean implements Serializable { + + private static final long serialVersionUID = -8620390377775457949L; + private String topicName; + private String topicDescription; + + private int partitionCount = 1; //default values + private int replicationCount = 1; //default value + + private boolean transactionEnabled; + + /** + * constructor + */ + public TopicBean() { + super(); + } + + /** + * constructor initialization with topic details name, description, + * partition, replication, transaction + * + * @param topicName + * @param description + * @param partitionCount + * @param replicationCount + * @param transactionEnabled + */ + public TopicBean(String topicName, String topicDescription, int partitionCount, int replicationCount, + boolean transactionEnabled) { + super(); + this.topicName = topicName; + this.topicDescription = topicDescription; + this.partitionCount = partitionCount; + this.replicationCount = replicationCount; + this.transactionEnabled = transactionEnabled; + } + + /** + * @return + * returns topic name which is of String type + */ + public String getTopicName() { + return topicName; + } + + /** + * @param topicName + * set topic name + */ + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + + /** + * @return + * returns partition count which is of int type + */ + public int getPartitionCount() { + return partitionCount; + } + + /** + * @param partitionCount + * set partition Count + */ + public void setPartitionCount(int partitionCount) { + this.partitionCount = partitionCount; + } + + /** + * @return + * returns replication count which is of int type + */ + public int getReplicationCount() { + return replicationCount; + } + + /** + * @param + * set replication count which is of int type + */ + public void setReplicationCount(int replicationCount) { + this.replicationCount = replicationCount; + } + + /** + * @return + * returns boolean value which indicates whether transaction is Enabled + */ + public boolean isTransactionEnabled() { + return transactionEnabled; + } + + /** + * @param + * sets boolean value which indicates whether transaction is Enabled + */ + public void setTransactionEnabled(boolean transactionEnabled) { + this.transactionEnabled = transactionEnabled; + } + + /** + * + * @return returns description which is of String type + */ + public String getTopicDescription() { + return topicDescription; + } + /** + * + * @param topicDescription + * set description which is of String type + */ + public void setTopicDescription(String topicDescription) { + this.topicDescription = topicDescription; + } + +} |