From b32effcaf5684d5e2f338a4537b71a2375c534e5 Mon Sep 17 00:00:00 2001 From: sunil unnava Date: Tue, 14 Aug 2018 09:34:46 -0400 Subject: update the testcases after the kafka 11 changes Issue-ID: DMAAP-526 Change-Id: I477a8ee05fb3cdd76af726b6ca0d1a69aa9eef93 Signed-off-by: sunil unnava --- .../com/att/nsa/cambria/backends/Consumer.java | 96 ---- .../att/nsa/cambria/backends/ConsumerFactory.java | 110 ---- .../com/att/nsa/cambria/backends/MetricsSet.java | 71 --- .../com/att/nsa/cambria/backends/Publisher.java | 98 ---- .../nsa/cambria/backends/kafka/KafkaConsumer.java | 245 -------- .../cambria/backends/kafka/KafkaConsumerCache.java | 618 --------------------- .../nsa/cambria/backends/kafka/KafkaPublisher.java | 168 ------ .../backends/memory/MemoryConsumerFactory.java | 160 ------ .../cambria/backends/memory/MemoryMetaBroker.java | 199 ------- .../nsa/cambria/backends/memory/MemoryQueue.java | 207 ------- .../backends/memory/MemoryQueuePublisher.java | 90 --- .../cambria/backends/memory/MessageDropper.java | 61 -- .../nsa/cambria/backends/memory/MessageLogger.java | 101 ---- 13 files changed, 2224 deletions(-) delete mode 100644 src/main/java/com/att/nsa/cambria/backends/Consumer.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/ConsumerFactory.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/MetricsSet.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/Publisher.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumerCache.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/kafka/KafkaPublisher.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java delete mode 100644 src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java (limited to 'src/main/java/com/att/nsa/cambria/backends') diff --git a/src/main/java/com/att/nsa/cambria/backends/Consumer.java b/src/main/java/com/att/nsa/cambria/backends/Consumer.java deleted file mode 100644 index d4946ba..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/Consumer.java +++ /dev/null @@ -1,96 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends; - -/** - * A consumer interface. Consumers pull the next message from a given topic. - * @author author - */ -public interface Consumer -{ - /** - * A message interface provide the offset and message - * @author author - * - */ - public interface Message - { - /** - * returning the offset of that particular message - * @return long - */ - long getOffset (); - /** - * returning the message - * @return message - */ - String getMessage (); - } - - /** - * Get this consumer's name - * @return name - */ - String getName (); - - /** - * Get creation time in ms - * @return - */ - long getCreateTimeMs (); - - /** - * Get last access time in ms - * @return - */ - long getLastAccessMs (); - - /** - * Get the next message from this source. This method must not block. - * @return the next message, or null if none are waiting - */ - Message nextMessage (); - - /** - * Get the next message from this source. This method must not block. - * @param atOffset start with the next message at or after atOffset. -1 means next from last request - * @return the next message, or null if none are waiting - */ - - - /** - * Close/clean up this consumer - */ - void close(); - - /** - * Commit the offset of the last consumed message - * - */ - void commitOffsets(); - - /** - * Get the offset this consumer is currently at - * @return offset - */ - long getOffset(); -} diff --git a/src/main/java/com/att/nsa/cambria/backends/ConsumerFactory.java b/src/main/java/com/att/nsa/cambria/backends/ConsumerFactory.java deleted file mode 100644 index 1597c07..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/ConsumerFactory.java +++ /dev/null @@ -1,110 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends; - -import java.util.Collection; - -/** - * This is the factory class to instantiate the consumer - * - * @author author - * - */ - -public interface ConsumerFactory { - public static final String kSetting_EnableCache = "cambria.consumer.cache.enabled"; - public static boolean kDefault_IsCacheEnabled = true; - - /** - * User defined exception for Unavailable Exception - * - * @author author - * - */ - public class UnavailableException extends Exception { - /** - * Unavailable Exception with message - * - * @param msg - */ - public UnavailableException(String msg) { - super(msg); - } - - /** - * Unavailable Exception with the throwable object - * - * @param t - */ - public UnavailableException(Throwable t) { - super(t); - } - - /** - * Unavailable Exception with the message and cause - * - * @param msg - * @param cause - */ - public UnavailableException(String msg, Throwable cause) { - super(msg, cause); - } - - private static final long serialVersionUID = 1L; - } - - /** - * For admin use, drop all cached consumers. - */ - public void dropCache(); - - /** - * Get or create a consumer for the given set of info (topic, group, id) - * - * @param topic - * @param consumerGroupId - * @param clientId - * @param timeoutMs - * @return - * @throws UnavailableException - */ - public Consumer getConsumerFor(String topic, String consumerGroupId, - String clientId, int timeoutMs) throws UnavailableException; - - /** - * For factories that employ a caching mechanism, this allows callers to - * explicitly destory a consumer that resides in the factory's cache. - * - * @param topic - * @param consumerGroupId - * @param clientId - */ - public void destroyConsumer(String topic, String consumerGroupId, - String clientId); - - /** - * For admin/debug, we provide access to the consumers - * - * @return a collection of consumers - */ - public Collection getConsumers(); -} diff --git a/src/main/java/com/att/nsa/cambria/backends/MetricsSet.java b/src/main/java/com/att/nsa/cambria/backends/MetricsSet.java deleted file mode 100644 index ce104ac..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/MetricsSet.java +++ /dev/null @@ -1,71 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends; - -import com.att.nsa.metrics.CdmMetricsRegistry; -/** - * This interface will help to generate metrics - * @author author - * - */ -public interface MetricsSet extends CdmMetricsRegistry{ - - /** - * This method will setup cambria sender code - */ - public void setupCambriaSender (); - /** - * This method will define on route complete - * @param name - * @param durationMs - */ - public void onRouteComplete ( String name, long durationMs ); - /** - * This method will help the kafka publisher while publishing the messages - * @param amount - */ - public void publishTick ( int amount ); - /** - * This method will help the kafka consumer while consuming the messages - * @param amount - */ - public void consumeTick ( int amount ); - /** - * This method will call if the kafka consumer cache missed - */ - public void onKafkaConsumerCacheMiss (); - /** - * This method will call if the kafka consumer cache will be hit while publishing/consuming the messages - */ - public void onKafkaConsumerCacheHit (); - /** - * This method will call if the kafka consumer cache claimed - */ - public void onKafkaConsumerClaimed (); - /** - * This method will call if Kafka consumer is timed out - */ - public void onKafkaConsumerTimeout (); - - - -} diff --git a/src/main/java/com/att/nsa/cambria/backends/Publisher.java b/src/main/java/com/att/nsa/cambria/backends/Publisher.java deleted file mode 100644 index 696e78f..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/Publisher.java +++ /dev/null @@ -1,98 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import kafka.producer.KeyedMessage; - -import com.att.nsa.cambria.beans.LogDetails; - -/** - * A publisher interface. Publishers receive messages and post them to a topic. - * @author author - */ -public interface Publisher -{ - /** - * A message interface. The message has a key and a body. - * @author author - */ - public interface message - { - /** - * Get the key for this message. The key is used to partition messages - * into "sub-streams" that have guaranteed order. The key can be null, - * which means the message can be processed without any concern for order. - * - * @return a key, possibly null - */ - String getKey(); - - /** - * Get the message body. - * @return a message body - */ - String getMessage(); - /** - * set the logging params for transaction enabled logging - * @param logDetails - */ - void setLogDetails (LogDetails logDetails); - /** - * Get the log details for transaction enabled logging - * @return LogDetails - */ - LogDetails getLogDetails (); - - /** - * boolean transactionEnabled - * @return true/false - */ - boolean isTransactionEnabled(); - /** - * Set the transaction enabled flag from prop file or topic based implementation - * @param transactionEnabled - */ - void setTransactionEnabled(boolean transactionEnabled); - } - - /** - * Send a single message to a topic. Equivalent to sendMessages with a list of size 1. - * @param topic - * @param msg - * @throws IOException - */ - public void sendMessage ( String topic, message msg ) throws IOException; - - /** - * Send messages to a topic. - * @param topic - * @param msgs - * @throws IOException - */ - public void sendMessages ( String topic, List msgs ) throws IOException; - - public void sendBatchMessage(String topic ,ArrayList> kms) throws IOException; -} diff --git a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java b/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java deleted file mode 100644 index 692f093..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java +++ /dev/null @@ -1,245 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends.kafka; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; - -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import com.att.nsa.cambria.backends.Consumer; - -/** - * A consumer instance that's created per-request. These are stateless so that - * clients can connect to this service as a proxy. - * - * @author author - * - */ -public class KafkaConsumer implements Consumer { - private enum State { - OPENED, CLOSED - } - - /** - * KafkaConsumer() is constructor. It has following 4 parameters:- - * @param topic - * @param group - * @param id - * @param cc - * - */ - - public KafkaConsumer(String topic, String group, String id, ConsumerConnector cc) { - fTopic = topic; - fGroup = group; - fId = id; - fConnector = cc; - - fCreateTimeMs = System.currentTimeMillis(); - fLastTouch = fCreateTimeMs; - - fLogTag = fGroup + "(" + fId + ")/" + fTopic; - offset = 0; - - state = KafkaConsumer.State.OPENED; - - final Map topicCountMap = new HashMap(); - topicCountMap.put(fTopic, 1); - final Map>> consumerMap = fConnector - .createMessageStreams(topicCountMap); - final List> streams = consumerMap.get(fTopic); - fStream = streams.iterator().next(); - } - - - /** getName() method returns string type value. - * returns 3 parameters in string:- - * fTopic,fGroup,fId - * @Override - */ - public String getName() { - return fTopic + " : " + fGroup + " : " + fId; - } - - /** getCreateTimeMs() method returns long type value. - * returns fCreateTimeMs variable value - * @Override - * - */ - public long getCreateTimeMs() { - return fCreateTimeMs; - } - - /** getLastAccessMs() method returns long type value. - * returns fLastTouch variable value - * @Override - * - */ - public long getLastAccessMs() { - return fLastTouch; - } - - - /** - * nextMessage() is synchronized method that means at a time only one object can access it. - * getName() method returns String which is of type Consumer.Message - * @Override - * */ - public synchronized Consumer.Message nextMessage() { - if (getState() == KafkaConsumer.State.CLOSED) { - log.warn("nextMessage() called on closed KafkaConsumer " + getName()); - return null; - } - - try { - ConsumerIterator it = fStream.iterator(); - if (it.hasNext()) { - final MessageAndMetadata msg = it.next(); - offset = msg.offset(); - - return new Consumer.Message() { - @Override - public long getOffset() { - return msg.offset(); - } - - @Override - public String getMessage() { - return new String(msg.message()); - } - }; - } - } catch (kafka.consumer.ConsumerTimeoutException x) { - log.error(fLogTag + ": ConsumerTimeoutException in Kafka consumer; returning null & Exception at nextMessage() : " + x); - } catch (java.lang.IllegalStateException x) { - log.error(fLogTag + ": Error found next() at : " + x); - } - - return null; - } - - /** getOffset() method returns long type value. - * returns offset variable value - * @Override - * - */ - public long getOffset() { - return offset; - } - - /** commit offsets - * commitOffsets() method will be called on closed of KafkaConsumer. - * @Override - * - */ - public void commitOffsets() { - if (getState() == KafkaConsumer.State.CLOSED) { - log.warn("commitOffsets() called on closed KafkaConsumer " + getName()); - return; - } - fConnector.commitOffsets(); - } - - /** - * updating fLastTouch with current time in ms - */ - public void touch() { - fLastTouch = System.currentTimeMillis(); - } - - /** getLastTouch() method returns long type value. - * returns fLastTouch variable value - * - */ - public long getLastTouch() { - return fLastTouch; - } - - /** - * setting the kafkaConsumer state to closed - */ - public synchronized void close() { - if (getState() == KafkaConsumer.State.CLOSED) { - log.warn("close() called on closed KafkaConsumer " + getName()); - return; - } - - setState(KafkaConsumer.State.CLOSED); - fConnector.shutdown(); - } - - /** - * getConsumerGroup() returns Consumer group - * @return - */ - public String getConsumerGroup() { - return fGroup; - } - - /** - * getConsumerId returns Consumer Id - * @return - */ - public String getConsumerId() { - return fId; - } - - /** - * getState returns kafkaconsumer state - * @return - */ - private KafkaConsumer.State getState() { - return this.state; - } - - /** - * setState() sets the kafkaConsumer state - * @param state - */ - private void setState(KafkaConsumer.State state) { - this.state = state; - } - - private ConsumerConnector fConnector; - private final String fTopic; - private final String fGroup; - private final String fId; - private final String fLogTag; - private final KafkaStream fStream; - private long fCreateTimeMs; - private long fLastTouch; - private long offset; - private KafkaConsumer.State state; - private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class); - //private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); -} diff --git a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumerCache.java deleted file mode 100644 index 9bd67d1..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumerCache.java +++ /dev/null @@ -1,618 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends.kafka; - -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.I0Itec.zkclient.exception.ZkException; -import org.I0Itec.zkclient.exception.ZkInterruptedException; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.utils.EnsurePath; -import org.apache.curator.utils.ZKPaths; -import org.apache.http.annotation.NotThreadSafe; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; - -//import org.slf4j.LoggerFactory; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.ajsc.filemonitor.AJSCPropertiesMap; -import com.att.nsa.cambria.backends.Consumer; -import com.att.nsa.cambria.backends.MetricsSet; -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.cambria.utils.ConfigurationReader; -import com.att.nsa.drumlin.till.nv.rrNvReadable; - -/** - * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which - * must be - * @author author - * - */ -@NotThreadSafe -public class KafkaConsumerCache { - - private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs"; - private static final int kDefault_ConsumerHandoverWaitMs = 500; - - private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds"; - private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs"; - - private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath"; - private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache"; - - // kafka defaults to timing out a client after 6 seconds of inactivity, but - // it heartbeats even when the client isn't fetching. Here, we don't - // want to prematurely rebalance the consumer group. Assuming clients are - // hitting - // the server at least every 30 seconds, timing out after 2 minutes should - // be okay. - // FIXME: consider allowing the client to specify its expected call rate? - private static final long kDefault_MustTouchEveryMs = (long)1000 * 60 * 2; - - // check for expirations pretty regularly - private static final long kDefault_SweepEverySeconds = 15; - - private enum Status { - NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED - } - - /** - * User defined exception class for kafka consumer cache - * - * @author author - * - */ - public class KafkaConsumerCacheException extends Exception { - /** - * To throw the exception - * - * @param t - */ - KafkaConsumerCacheException(Throwable t) { - super(t); - } - - /** - * - * @param s - */ - public KafkaConsumerCacheException(String s) { - super(s); - } - - private static final long serialVersionUID = 1L; - } - - /** - * Creates a KafkaConsumerCache object. Before it is used, you must call - * startCache() - * - * @param apiId - * @param s - * @param metrics - */ - public KafkaConsumerCache(String apiId, MetricsSet metrics) { - - if (apiId == null) { - throw new IllegalArgumentException("API Node ID must be specified."); - } - - fApiId = apiId; - // fSettings = s; - fMetrics = metrics; - String strkSetting_ZkBasePath= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_ZkBasePath); - if(null==strkSetting_ZkBasePath)strkSetting_ZkBasePath = kDefault_ZkBasePath; - fBaseZkPath = strkSetting_ZkBasePath; - - fConsumers = new ConcurrentHashMap(); - fSweepScheduler = Executors.newScheduledThreadPool(1); - - curatorConsumerCache = null; - - status = Status.NOT_STARTED; - - listener = new ConnectionStateListener() { - public void stateChanged(CuratorFramework client, ConnectionState newState) { - if (newState == ConnectionState.LOST) { - log.info("ZooKeeper connection expired"); - handleConnectionLoss(); - } else if (newState == ConnectionState.READ_ONLY) { - log.warn("ZooKeeper connection set to read only mode."); - } else if (newState == ConnectionState.RECONNECTED) { - log.info("ZooKeeper connection re-established"); - handleReconnection(); - } else if (newState == ConnectionState.SUSPENDED) { - log.warn("ZooKeeper connection has been suspended."); - handleConnectionSuspended(); - } - } - }; - } - - /** - * Start the cache service. This must be called before any get/put - * operations. - * - * @param mode - * DMAAP or cambria - * @param curator - * @throws IOException - * @throws KafkaConsumerCacheException - */ - public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException { - try { - - // CuratorFramework curator = null; - - // Changed the class from where we are initializing the curator - // framework - if (mode != null && mode.equals(CambriaConstants.CAMBRIA)) { - curator = ConfigurationReader.getCurator(); - } else if (mode != null && mode.equals(CambriaConstants.DMAAP)) { - curator = getCuratorFramework(curator); - } - - curator.getConnectionStateListenable().addListener(listener); - - setStatus(Status.CONNECTED); - - curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true); - curatorConsumerCache.start(); - - curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() { - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - switch (event.getType()) { - case CHILD_ADDED: { - final String apiId = new String(event.getData().getData()); - final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath()); - - log.info(apiId + " started consumer " + consumer); - break; - } - case CHILD_UPDATED: { - final String apiId = new String(event.getData().getData()); - final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath()); - - if (fConsumers.containsKey(consumer)) { - log.info(apiId + " claimed consumer " + consumer + " from " + fApiId); - - dropClaimedConsumer(consumer); - } - - break; - } - case CHILD_REMOVED: { - final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath()); - - if (fConsumers.containsKey(consumer)) { - log.info("Someone wanted consumer " + consumer + " gone; removing it from the cache"); - dropConsumer(consumer, false); - } - - break; - } - default: - break; - } - } - }); - - // initialize the ZK path - EnsurePath ensurePath = new EnsurePath(fBaseZkPath); - ensurePath.ensure(curator.getZookeeperClient()); - - //final long freq = fSettings.getLong(kSetting_SweepEverySeconds, kDefault_SweepEverySeconds); - long freq = kDefault_SweepEverySeconds; - String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_SweepEverySeconds); - if(null==strkSetting_SweepEverySeconds) { - strkSetting_SweepEverySeconds = kDefault_SweepEverySeconds+""; - } - - freq = Long.parseLong(strkSetting_SweepEverySeconds); - - fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS); - log.info("KafkaConsumerCache started"); - log.info("sweeping cached clients every " + freq + " seconds"); - } catch (ZkException e) { - throw new KafkaConsumerCacheException(e); - } catch (Exception e) { - throw new KafkaConsumerCacheException(e); - } - } - - /** - * Getting the curator oject to start the zookeeper connection estabished - * - * @param curator - * @return curator object - */ - public static CuratorFramework getCuratorFramework(CuratorFramework curator) { - if (curator.getState() == CuratorFrameworkState.LATENT) { - curator.start(); - - try { - curator.blockUntilConnected(); - } catch (InterruptedException e) { - // Ignore - log.error("error while setting curator framework :" + e.getMessage()); - Thread.currentThread().interrupt(); - } - } - - return curator; - } - - /** - * Stop the cache service. - */ - public void stopCache() { - setStatus(Status.DISCONNECTED); - - final CuratorFramework curator = ConfigurationReader.getCurator(); - - if (curator != null) { - try { - curator.getConnectionStateListenable().removeListener(listener); - curatorConsumerCache.close(); - log.info("Curator client closed"); - } catch (ZkInterruptedException e) { - log.error("Curator client close interrupted: " + e); - } catch (IOException e) { - log.error("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e); - } - - curatorConsumerCache = null; - } - - if (fSweepScheduler != null) { - fSweepScheduler.shutdownNow(); - log.info("cache sweeper stopped"); - } - - if (fConsumers != null) { - fConsumers.clear(); - fConsumers = null; - } - - setStatus(Status.NOT_STARTED); - - log.info("Consumer cache service stopped"); - } - - /** - * Get a cached consumer by topic, group, and id, if it exists (and remains - * valid) In addition, this method waits for all other consumer caches in - * the cluster to release their ownership and delete their version of this - * consumer. - * - * @param topic - * @param consumerGroupId - * @param clientId - * @return a consumer, or null - */ - public KafkaConsumer getConsumerFor(String topic, String consumerGroupId, String clientId) - throws KafkaConsumerCacheException { - if (getStatus() != KafkaConsumerCache.Status.CONNECTED) - throw new KafkaConsumerCacheException("The cache service is unavailable."); - - final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId); - final KafkaConsumer kc = fConsumers.get(consumerKey); - - if (kc != null) { - log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch()); - kc.touch(); - fMetrics.onKafkaConsumerCacheHit(); - } else { - log.debug("Consumer cache miss for [" + consumerKey + "]"); - fMetrics.onKafkaConsumerCacheMiss(); - } - - return kc; - } - - /** - * Put a consumer into the cache by topic, group and ID - * - * @param topic - * @param consumerGroupId - * @param consumerId - * @param consumer - * @throws KafkaConsumerCacheException - */ - public void putConsumerFor(String topic, String consumerGroupId, String consumerId, KafkaConsumer consumer) - throws KafkaConsumerCacheException { - if (getStatus() != KafkaConsumerCache.Status.CONNECTED) - throw new KafkaConsumerCacheException("The cache service is unavailable."); - - final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); - fConsumers.put(consumerKey, consumer); - } - - public Collection getConsumers() { - return new LinkedList(fConsumers.values()); - } - - /** - * This method is to drop all the consumer - */ - public void dropAllConsumers() { - for (Entry entry : fConsumers.entrySet()) { - dropConsumer(entry.getKey(), true); - } - - // consumers should be empty here - if (fConsumers.size() > 0) { - log.warn("During dropAllConsumers, the consumer map is not empty."); - fConsumers.clear(); - } - } - - /** - * Drop a consumer from our cache due to a timeout - * - * @param key - */ - private void dropTimedOutConsumer(String key) { - fMetrics.onKafkaConsumerTimeout(); - - if (!fConsumers.containsKey(key)) { - log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key); - return; - } - - // First, drop this consumer from our cache - dropConsumer(key, true); - - final CuratorFramework curator = ConfigurationReader.getCurator(); - - try { - curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key); - } catch (NoNodeException e) { - log.error("Exception at : " + e); - } catch (Exception e) { - log.error("Unexpected exception while deleting consumer: " + e); - } - - log.info("Dropped " + key + " consumer due to timeout"); - } - - /** - * Drop a consumer from our cache due to another API node claiming it as - * their own. - * - * @param key - */ - private void dropClaimedConsumer(String key) { - // if the consumer is still in our cache, it implies a claim. - if (fConsumers.containsKey(key)) { - fMetrics.onKafkaConsumerClaimed(); - log.info("Consumer [" + key + "] claimed by another node."); - } - - dropConsumer(key, false); - } - - /** - * Removes the consumer from the cache and closes its connection to the - * kafka broker(s). - * - * @param key - * @param dueToTimeout - */ - private void dropConsumer(String key, boolean dueToTimeout) { - final KafkaConsumer kc = fConsumers.remove(key); - - if (kc != null) { - log.info("closing Kafka consumer " + key); - kc.close(); - } - } - -// private final rrNvReadable fSettings; - private final MetricsSet fMetrics; - private final String fBaseZkPath; - private final ScheduledExecutorService fSweepScheduler; - private final String fApiId; - private final ConnectionStateListener listener; - - private ConcurrentHashMap fConsumers; - private PathChildrenCache curatorConsumerCache; - - private volatile Status status; - - private void handleReconnection() { - - log.info("Reading current cache data from ZK and synchronizing local cache"); - - final List cacheData = curatorConsumerCache.getCurrentData(); - - // Remove all the consumers in this API nodes cache that now belong to - // other API nodes. - for (ChildData cachedConsumer : cacheData) { - final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath()); - final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData()) - : "undefined"; - - if (!fApiId.equals(owningApiId)) { - fConsumers.remove(consumerId); - } - } - - setStatus(Status.CONNECTED); - } - - private void handleConnectionSuspended() { - log.info("Suspending cache until ZK connection is re-established"); - - setStatus(Status.SUSPENDED); - } - - private void handleConnectionLoss() { - log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)"); - - setStatus(Status.DISCONNECTED); - - closeAllCachedConsumers(); - fConsumers.clear(); - } - - private void closeAllCachedConsumers() { - for (Entry entry : fConsumers.entrySet()) { - entry.getValue().close(); - } - } - - private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) { - return topic + "::" + consumerGroupId + "::" + clientId; - } - - /** - * This method is to get a lock - * - * @param topic - * @param consumerGroupId - * @param consumerId - * @throws KafkaConsumerCacheException - */ - public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId) - throws KafkaConsumerCacheException { - // get a lock at /:::: - final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); - - try { - final String consumerPath = fBaseZkPath + "/" + consumerKey; - - log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey); - - final CuratorFramework curator = ConfigurationReader.getCurator(); - - try { - curator.setData().forPath(consumerPath, fApiId.getBytes()); - } catch (KeeperException.NoNodeException e) { - log.error(e.toString()); - curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes()); - } - - log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey); - } catch (Exception e) { - log.error(fApiId + " failed to claim ownership of consumer " + consumerKey); - throw new KafkaConsumerCacheException(e); - } - - log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer"); - - try { - int kSetting_ConsumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs; - String strkSetting_ConsumerHandoverWaitMs= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_ConsumerHandoverWaitMs+""); - if(strkSetting_ConsumerHandoverWaitMs!=null) { - kSetting_ConsumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); - } - Thread.sleep(kSetting_ConsumerHandoverWaitMs); - //Thread.sleep(fSettings.getInt(kSetting_ConsumerHandoverWaitMs, kDefault_ConsumerHandoverWaitMs)); - } catch (InterruptedException e) { - log.error(e.toString()); - Thread.currentThread().interrupt(); - } - } - - private void sweep() { - final LinkedList removals = new LinkedList(); - long mustTouchEveryMs = kDefault_MustTouchEveryMs; - String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_TouchEveryMs); - //if(null!=strkSetting_TouchEveryMs) strkSetting_TouchEveryMs = kDefault_MustTouchEveryMs+""; - if(null!=strkSetting_TouchEveryMs) - { - mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs); - } - - //final long mustTouchEveryMs = fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs); - final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs; - - for (Entry e : fConsumers.entrySet()) { - final long lastTouchMs = e.getValue().getLastTouch(); - - log.debug("consumer " + e.getKey() + " last touched at " + lastTouchMs); - - if (lastTouchMs < oldestAllowedTouchMs) { - log.info("consumer " + e.getKey() + " has expired"); - removals.add(e.getKey()); - } - } - - for (String key : removals) { - dropTimedOutConsumer(key); - } - } - - /** - * Creating a thread to run the sweep method - * - * @author author - * - */ - private class sweeper implements Runnable { - /** - * run method - */ - public void run() { - sweep(); - } - } - - /** - * This method is to drop consumer - * - * @param topic - * @param consumerGroup - * @param clientId - */ - public void dropConsumer(String topic, String consumerGroup, String clientId) { - dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false); - } - - private Status getStatus() { - return this.status; - } - - private void setStatus(Status status) { - this.status = status; - } - - private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class); - //private static final Logger log = LoggerFactory.getLogger(KafkaConsumerCache.class); -} \ No newline at end of file diff --git a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaPublisher.java deleted file mode 100644 index 42a6bb9..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaPublisher.java +++ /dev/null @@ -1,168 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends.kafka; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; - -import kafka.common.FailedToSendMessageException; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; - -import org.json.JSONException; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.springframework.beans.factory.annotation.Qualifier; - -import com.att.nsa.cambria.backends.Publisher; -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.drumlin.till.nv.rrNvReadable; - -/** - * Sends raw JSON objects into Kafka. - * - * Could improve space: BSON rather than JSON? - * - * @author author - * - */ - -public class KafkaPublisher implements Publisher { - /** - * constructor initializing - * - * @param settings - * @throws rrNvReadable.missingReqdSetting - */ - public KafkaPublisher(@Qualifier("propertyReader") rrNvReadable settings) throws rrNvReadable.missingReqdSetting { - //fSettings = settings; - - final Properties props = new Properties(); - /*transferSetting(fSettings, props, "metadata.broker.list", "localhost:9092"); - transferSetting(fSettings, props, "request.required.acks", "1"); - transferSetting(fSettings, props, "message.send.max.retries", "5"); - transferSetting(fSettings, props, "retry.backoff.ms", "150"); */ - String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list"); - System.out.println("kafkaConnUrl:- "+kafkaConnUrl); - if(null==kafkaConnUrl){ - - kafkaConnUrl="localhost:9092"; - } - transferSetting( props, "metadata.broker.list", kafkaConnUrl); - transferSetting( props, "request.required.acks", "1"); - transferSetting( props, "message.send.max.retries", "5"); - transferSetting(props, "retry.backoff.ms", "150"); - - props.put("serializer.class", "kafka.serializer.StringEncoder"); - - fConfig = new ProducerConfig(props); - fProducer = new Producer(fConfig); - } - - /** - * Send a message with a given topic and key. - * - * @param msg - * @throws FailedToSendMessageException - * @throws JSONException - */ - @Override - public void sendMessage(String topic, message msg) throws IOException, FailedToSendMessageException { - final List msgs = new LinkedList(); - msgs.add(msg); - sendMessages(topic, msgs); - } - - /** - * method publishing batch messages - * - * @param topic - * @param kms - * throws IOException - */ - public void sendBatchMessage(String topic, ArrayList> kms) throws IOException { - try { - fProducer.send(kms); - - } catch (FailedToSendMessageException excp) { - log.error("Failed to send message(s) to topic [" + topic + "].", excp); - throw new FailedToSendMessageException(excp.getMessage(), excp); - } - - } - - /** - * Send a set of messages. Each must have a "key" string value. - * - * @param topic - * @param msg - * @throws FailedToSendMessageException - * @throws JSONException - */ - @Override - public void sendMessages(String topic, List msgs) - throws IOException, FailedToSendMessageException { - log.info("sending " + msgs.size() + " events to [" + topic + "]"); - - final List> kms = new ArrayList>(msgs.size()); - for (message o : msgs) { - final KeyedMessage data = new KeyedMessage(topic, o.getKey(), o.toString()); - kms.add(data); - } - try { - fProducer.send(kms); - - } catch (FailedToSendMessageException excp) { - log.error("Failed to send message(s) to topic [" + topic + "].", excp); - throw new FailedToSendMessageException(excp.getMessage(), excp); - } - } - - //private final rrNvReadable fSettings; - - private ProducerConfig fConfig; - private Producer fProducer; - - /** - * It sets the key value pair - * @param topic - * @param msg - * @param key - * @param defVal - */ - private void transferSetting(Properties props, String key, String defVal) { - String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); - if (null==kafka_prop) kafka_prop=defVal; - //props.put(key, settings.getString("kafka." + key, defVal)); - props.put(key, kafka_prop); - } - - //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class); - - private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class); -} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java deleted file mode 100644 index f0982a9..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java +++ /dev/null @@ -1,160 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends.memory; - -import java.util.ArrayList; -import java.util.Collection; - -import com.att.nsa.cambria.backends.Consumer; -import com.att.nsa.cambria.backends.ConsumerFactory; -/** - * - * @author author - * - */ -public class MemoryConsumerFactory implements ConsumerFactory -{ - /** - * - * Initializing constructor - * @param q - */ - public MemoryConsumerFactory ( MemoryQueue q ) - { - fQueue = q; - } - - /** - * - * @param topic - * @param consumerGroupId - * @param clientId - * @param timeoutMs - * @return Consumer - */ - @Override - public Consumer getConsumerFor ( String topic, String consumerGroupId, String clientId, int timeoutMs ) - { - return new MemoryConsumer ( topic, consumerGroupId ); - } - - private final MemoryQueue fQueue; - - /** - * - * Define nested inner class - * - */ - private class MemoryConsumer implements Consumer - { - /** - * - * Initializing MemoryConsumer constructor - * @param topic - * @param consumer - * - */ - public MemoryConsumer ( String topic, String consumer ) - { - fTopic = topic; - fConsumer = consumer; - fCreateMs = System.currentTimeMillis (); - fLastAccessMs = fCreateMs; - } - - @Override - /** - * - * return consumer details - */ - public Message nextMessage () - { - return fQueue.get ( fTopic, fConsumer ); - } - - private final String fTopic; - private final String fConsumer; - private final long fCreateMs; - private long fLastAccessMs; - - @Override - public void close() { - //Nothing to close/clean up. - } - /** - * - */ - public void commitOffsets() - { - // ignoring this aspect - } - /** - * get offset - */ - public long getOffset() - { - return 0; - } - - @Override - /** - * get consumer topic name - */ - public String getName () - { - return fTopic + "/" + fConsumer; - } - - @Override - public long getCreateTimeMs () - { - return fCreateMs; - } - - @Override - public long getLastAccessMs () - { - return fLastAccessMs; - } - } - - @Override - public void destroyConsumer(String topic, String consumerGroupId, - String clientId) { - //No cache for memory consumers, so NOOP - } - - @Override - public void dropCache () - { - // nothing to do - there's no cache here - } - - @Override - /** - * @return ArrayList - */ - public Collection getConsumers () - { - return new ArrayList (); - } -} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java deleted file mode 100644 index 87e59c2..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java +++ /dev/null @@ -1,199 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends.memory; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import com.att.nsa.cambria.metabroker.Broker; -import com.att.nsa.cambria.metabroker.Topic; -import com.att.nsa.configs.ConfigDb; -import com.att.nsa.drumlin.till.nv.rrNvReadable; -import com.att.nsa.security.NsaAcl; -import com.att.nsa.security.NsaApiKey; - -/** - * - * @author author - * - */ -public class MemoryMetaBroker implements Broker { - /** - * - * @param mq - * @param configDb - * @param settings - */ - public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) { - //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) { - fQueue = mq; - fTopics = new HashMap(); - } - - @Override - public List getAllTopics() { - return new LinkedList(fTopics.values()); - } - - @Override - public Topic getTopic(String topic) { - return fTopics.get(topic); - } - - @Override - public Topic createTopic(String topic, String desc, String ownerApiId, int partitions, int replicas, - boolean transactionEnabled) throws TopicExistsException { - if (getTopic(topic) != null) { - throw new TopicExistsException(topic); - } - fQueue.createTopic(topic); - fTopics.put(topic, new MemTopic(topic, desc, ownerApiId, transactionEnabled)); - return getTopic(topic); - } - - @Override - public void deleteTopic(String topic) { - fTopics.remove(topic); - fQueue.removeTopic(topic); - } - - private final MemoryQueue fQueue; - private final HashMap fTopics; - - private static class MemTopic implements Topic { - /** - * constructor initialization - * - * @param name - * @param desc - * @param owner - * @param transactionEnabled - */ - public MemTopic(String name, String desc, String owner, boolean transactionEnabled) { - fName = name; - fDesc = desc; - fOwner = owner; - ftransactionEnabled = transactionEnabled; - fReaders = null; - fWriters = null; - } - - @Override - public String getOwner() { - return fOwner; - } - - @Override - public NsaAcl getReaderAcl() { - return fReaders; - } - - @Override - public NsaAcl getWriterAcl() { - return fWriters; - } - - @Override - public void checkUserRead(NsaApiKey user) throws AccessDeniedException { - if (fReaders != null && (user == null || !fReaders.canUser(user.getKey()))) { - throw new AccessDeniedException(user == null ? "" : user.getKey()); - } - } - - @Override - public void checkUserWrite(NsaApiKey user) throws AccessDeniedException { - if (fWriters != null && (user == null || !fWriters.canUser(user.getKey()))) { - throw new AccessDeniedException(user == null ? "" : user.getKey()); - } - } - - @Override - public String getName() { - return fName; - } - - @Override - public String getDescription() { - return fDesc; - } - - @Override - public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { - if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); - } - if (fWriters == null) { - fWriters = new NsaAcl(); - } - fWriters.add(publisherId); - } - - @Override - public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { - if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); - } - fWriters.remove(publisherId); - } - - @Override - public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { - if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); - } - if (fReaders == null) { - fReaders = new NsaAcl(); - } - fReaders.add(consumerId); - } - - @Override - public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { - if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); - } - fReaders.remove(consumerId); - } - - private final String fName; - private final String fDesc; - private final String fOwner; - private NsaAcl fReaders; - private NsaAcl fWriters; - private boolean ftransactionEnabled; - - @Override - public boolean isTransactionEnabled() { - return ftransactionEnabled; - } - - @Override - public Set getOwners() { - final TreeSet set = new TreeSet (); - set.add ( fOwner ); - return set; - } - } -} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java deleted file mode 100644 index a0dc8b8..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java +++ /dev/null @@ -1,207 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends.memory; - -import java.util.ArrayList; -import java.util.HashMap; - -import com.att.nsa.cambria.backends.Consumer; -import com.att.nsa.cambria.backends.Publisher.message; - -/** - * When broker type is memory, then this class is doing all the topic related - * operations - * - * @author author - * - */ -public class MemoryQueue { - // map from topic to list of msgs - private HashMap fQueue; - private HashMap> fOffsets; - - /** - * constructor storing hashMap objects in Queue and Offsets object - */ - public MemoryQueue() { - fQueue = new HashMap(); - fOffsets = new HashMap>(); - } - - /** - * method used to create topic - * - * @param topic - */ - public synchronized void createTopic(String topic) { - LogBuffer q = fQueue.get(topic); - if (q == null) { - q = new LogBuffer(1024 * 1024); - fQueue.put(topic, q); - } - } - - /** - * method used to remove topic - * - * @param topic - */ - public synchronized void removeTopic(String topic) { - LogBuffer q = fQueue.get(topic); - if (q != null) { - fQueue.remove(topic); - } - } - - /** - * method to write message on topic - * - * @param topic - * @param m - */ - public synchronized void put(String topic, message m) { - LogBuffer q = fQueue.get(topic); - if (q == null) { - createTopic(topic); - q = fQueue.get(topic); - } - q.push(m.getMessage()); - } - - /** - * method to read consumer messages - * - * @param topic - * @param consumerName - * @return - */ - public synchronized Consumer.Message get(String topic, String consumerName) { - final LogBuffer q = fQueue.get(topic); - if (q == null) { - return null; - } - - HashMap offsetMap = fOffsets.get(consumerName); - if (offsetMap == null) { - offsetMap = new HashMap(); - fOffsets.put(consumerName, offsetMap); - } - Integer offset = offsetMap.get(topic); - if (offset == null) { - offset = 0; - } - - final msgInfo result = q.read(offset); - if (result != null && result.msg != null) { - offsetMap.put(topic, result.offset + 1); - } - return result; - } - - /** - * static inner class used to details about consumed messages - * - * @author author - * - */ - private static class msgInfo implements Consumer.Message { - /** - * published message which is consumed - */ - public String msg; - /** - * offset associated with message - */ - public int offset; - - /** - * get offset of messages - */ - @Override - public long getOffset() { - return offset; - } - - /** - * get consumed message - */ - @Override - public String getMessage() { - return msg; - } - } - - /** - * - * @author author - * - * private LogBuffer class has synchronized push and read method - */ - private class LogBuffer { - private int fBaseOffset; - private final int fMaxSize; - private final ArrayList fList; - - /** - * constructor initializing the offset, maxsize and list - * - * @param maxSize - */ - public LogBuffer(int maxSize) { - fBaseOffset = 0; - fMaxSize = maxSize; - fList = new ArrayList(); - } - - /** - * pushing message - * - * @param msg - */ - public synchronized void push(String msg) { - fList.add(msg); - while (fList.size() > fMaxSize) { - fList.remove(0); - fBaseOffset++; - } - } - - /** - * reading messages - * - * @param offset - * @return - */ - public synchronized msgInfo read(int offset) { - final int actual = Math.max(0, offset - fBaseOffset); - - final msgInfo mi = new msgInfo(); - mi.msg = (actual >= fList.size()) ? null : fList.get(actual); - if (mi.msg == null) - return null; - - mi.offset = actual + fBaseOffset; - return mi; - } - - } -} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java deleted file mode 100644 index d653f6e..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java +++ /dev/null @@ -1,90 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends.memory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import com.att.nsa.cambria.backends.Publisher; -import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; - -import kafka.producer.KeyedMessage; - -/** - * - * @author author - * - */ -public class MemoryQueuePublisher implements Publisher { - /** - * - * @param q - * @param b - */ - public MemoryQueuePublisher(MemoryQueue q, MemoryMetaBroker b) { - fBroker = b; - fQueue = q; - } - - /** - * sendBatchMessages - * - * @param topic - * @param kms - */ - public void sendBatchMessage(String topic, ArrayList> kms) throws IOException { - } - - /** - * - * @param topic - * @param msg - * @throws IOException - */ - @Override - public void sendMessage(String topic, message msg) throws IOException { - if (null == fBroker.getTopic(topic)) { - try { - fBroker.createTopic(topic, topic, null, 8, 3, false); - } catch (TopicExistsException e) { - throw new RuntimeException(e); - } - } - fQueue.put(topic, msg); - } - - @Override - /** - * @param topic - * @param msgs - * @throws IOException - */ - public void sendMessages(String topic, List msgs) throws IOException { - for (message m : msgs) { - sendMessage(topic, m); - } - } - - private final MemoryMetaBroker fBroker; - private final MemoryQueue fQueue; -} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java b/src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java deleted file mode 100644 index c49ac4f..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java +++ /dev/null @@ -1,61 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends.memory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import com.att.nsa.cambria.backends.Publisher; - -import kafka.producer.KeyedMessage; - -/** - * class is used to message publishing - * - * @author author - * - */ -public class MessageDropper implements Publisher { - /** - * publish single messages - * param topic - * param msg - */ - @Override - public void sendMessage(String topic, message msg) throws IOException { - } - - /** - * publish multiple messages - */ - @Override - public void sendMessages(String topic, List msgs) throws IOException { - } - - /** - * publish batch messages - */ - @Override - public void sendBatchMessage(String topic, ArrayList> kms) throws IOException { - } -} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java b/src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java deleted file mode 100644 index 9ff8bd6..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java +++ /dev/null @@ -1,101 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends.memory; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import com.att.nsa.cambria.backends.Publisher; - -import kafka.producer.KeyedMessage; - -/** - * class used for logging perspective - * - * @author author - * - */ -public class MessageLogger implements Publisher { - public MessageLogger() { - } - - public void setFile(File f) throws FileNotFoundException { - fStream = new FileOutputStream(f, true); - } - - /** - * - * @param topic - * @param msg - * @throws IOException - */ - @Override - public void sendMessage(String topic, message msg) throws IOException { - logMsg(msg); - } - - /** - * @param topic - * @param msgs - * @throws IOException - */ - @Override - public void sendMessages(String topic, List msgs) throws IOException { - for (message m : msgs) { - logMsg(m); - } - } - - /** - * @param topic - * @param kms - * @throws IOException - */ - @Override - public void sendBatchMessage(String topic, ArrayList> kms) throws - - IOException { - } - - private FileOutputStream fStream; - - /** - * - * @param msg - * @throws IOException - */ - private void logMsg(message msg) throws IOException { - String key = msg.getKey(); - if (key == null) - key = ""; - - fStream.write('['); - fStream.write(key.getBytes()); - fStream.write("] ".getBytes()); - fStream.write(msg.getMessage().getBytes()); - fStream.write('\n'); - } -} -- cgit 1.2.3-korg