Diffstat (limited to 'src/main/java/com/att/nsa/cambria/backends')
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/Consumer.java                        96
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/ConsumerFactory.java                110
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/MetricsSet.java                      71
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/Publisher.java                       98
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java            245
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumerCache.java       618
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/kafka/KafkaPublisher.java           168
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java   160
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java        199
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java             207
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java     90
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java           61
-rw-r--r--   src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java           101
13 files changed, 0 insertions, 2224 deletions
diff --git a/src/main/java/com/att/nsa/cambria/backends/Consumer.java b/src/main/java/com/att/nsa/cambria/backends/Consumer.java
deleted file mode 100644
index d4946ba..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/Consumer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends;
-
-/**
- * A consumer interface. Consumers pull the next message from a given topic.
- * @author author
- */
-public interface Consumer
-{
- /**
- * A message interface providing the offset and the message body.
- * @author author
- *
- */
- public interface Message
- {
- /**
- * Returns the offset of this message.
- * @return the offset
- */
- long getOffset ();
- /**
- * Returns the message body.
- * @return the message body
- */
- String getMessage ();
- }
-
- /**
- * Get this consumer's name
- * @return name
- */
- String getName ();
-
- /**
- * Get creation time in ms
- * @return creation time, in milliseconds
- */
- long getCreateTimeMs ();
-
- /**
- * Get last access time in ms
- * @return last access time, in milliseconds
- */
- long getLastAccessMs ();
-
- /**
- * Get the next message from this source. This method must not block.
- * @return the next message, or null if none are waiting
- */
- Message nextMessage ();
-
- /**
- * Close/clean up this consumer
- */
- void close();
-
- /**
- * Commit the offset of the last consumed message
- *
- */
- void commitOffsets();
-
- /**
- * Get the offset this consumer is currently at
- * @return offset
- */
- long getOffset();
-}
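Illustrative usage only (not part of the deleted source): a minimal polling loop against the Consumer contract above. The factory instance, topic/group/client ids, and process() handler are placeholders; UnavailableException handling is omitted.

    // Sketch: drain whatever is currently waiting, commit, then clean up.
    final Consumer c = factory.getConsumerFor("events", "group1", "client1", 500);
    try {
        Consumer.Message m;
        while ((m = c.nextMessage()) != null) {       // non-blocking; null means nothing waiting
            process(m.getOffset(), m.getMessage());   // process() is a hypothetical handler
        }
        c.commitOffsets();                            // record the last consumed offset
    } finally {
        c.close();                                    // always release the consumer
    }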
diff --git a/src/main/java/com/att/nsa/cambria/backends/ConsumerFactory.java b/src/main/java/com/att/nsa/cambria/backends/ConsumerFactory.java
deleted file mode 100644
index 1597c07..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/ConsumerFactory.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends;
-
-import java.util.Collection;
-
-/**
- * This is the factory class to instantiate the consumer
- *
- * @author author
- *
- */
-
-public interface ConsumerFactory {
- public static final String kSetting_EnableCache = "cambria.consumer.cache.enabled";
- public static boolean kDefault_IsCacheEnabled = true;
-
- /**
- * Exception thrown when the factory cannot supply a consumer.
- *
- * @author author
- *
- */
- public class UnavailableException extends Exception {
- /**
- * Unavailable Exception with message
- *
- * @param msg
- */
- public UnavailableException(String msg) {
- super(msg);
- }
-
- /**
- * Unavailable Exception with the throwable object
- *
- * @param t
- */
- public UnavailableException(Throwable t) {
- super(t);
- }
-
- /**
- * Unavailable Exception with the message and cause
- *
- * @param msg
- * @param cause
- */
- public UnavailableException(String msg, Throwable cause) {
- super(msg, cause);
- }
-
- private static final long serialVersionUID = 1L;
- }
-
- /**
- * For admin use, drop all cached consumers.
- */
- public void dropCache();
-
- /**
- * Get or create a consumer for the given set of info (topic, group, id)
- *
- * @param topic
- * @param consumerGroupId
- * @param clientId
- * @param timeoutMs
- * @return a consumer for the given topic, group, and client
- * @throws UnavailableException
- */
- public Consumer getConsumerFor(String topic, String consumerGroupId,
- String clientId, int timeoutMs) throws UnavailableException;
-
- /**
- * For factories that employ a caching mechanism, this allows callers to
- * explicitly destroy a consumer that resides in the factory's cache.
- *
- * @param topic
- * @param consumerGroupId
- * @param clientId
- */
- public void destroyConsumer(String topic, String consumerGroupId,
- String clientId);
-
- /**
- * For admin/debug, we provide access to the consumers
- *
- * @return a collection of consumers
- */
- public Collection<? extends Consumer> getConsumers();
-}
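Illustrative only: how a caller might handle UnavailableException and explicitly evict a cached consumer. The factory instance and ids are placeholders.

    try {
        final Consumer c = factory.getConsumerFor("events", "group1", "client1", 500);
        // ... read from c ...
    } catch (ConsumerFactory.UnavailableException e) {
        // backing cache/service not ready; retry later or report 503
    }
    // When a client disconnects for good, drop its cached consumer explicitly:
    factory.destroyConsumer("events", "group1", "client1");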
diff --git a/src/main/java/com/att/nsa/cambria/backends/MetricsSet.java b/src/main/java/com/att/nsa/cambria/backends/MetricsSet.java
deleted file mode 100644
index ce104ac..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/MetricsSet.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends;
-
-import com.att.nsa.metrics.CdmMetricsRegistry;
-/**
- * This interface provides hooks for generating metrics.
- * @author author
- *
- */
-public interface MetricsSet extends CdmMetricsRegistry {
-
- /**
- * Sets up the Cambria sender.
- */
- public void setupCambriaSender ();
- /**
- * Records the completion of a route and its duration.
- * @param name
- * @param durationMs
- */
- public void onRouteComplete ( String name, long durationMs );
- /**
- * Counts messages published to Kafka.
- * @param amount
- */
- public void publishTick ( int amount );
- /**
- * Counts messages consumed from Kafka.
- * @param amount
- */
- public void consumeTick ( int amount );
- /**
- * Called on a Kafka consumer cache miss.
- */
- public void onKafkaConsumerCacheMiss ();
- /**
- * Called on a Kafka consumer cache hit.
- */
- public void onKafkaConsumerCacheHit ();
- /**
- * Called when a cached Kafka consumer is claimed by another API node.
- */
- public void onKafkaConsumerClaimed ();
- /**
- * Called when a Kafka consumer times out.
- */
- public void onKafkaConsumerTimeout ();
-
-
-
-}
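Illustrative only: how a backend is expected to drive these hooks around a publish. The publisher, batch, and metrics instances are placeholders, and exception handling is omitted.

    final long startMs = System.currentTimeMillis();
    publisher.sendMessages("events", batch);          // hypothetical Publisher and batch
    metrics.publishTick(batch.size());                // count the published messages
    metrics.onRouteComplete("POST events", System.currentTimeMillis() - startMs);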
diff --git a/src/main/java/com/att/nsa/cambria/backends/Publisher.java b/src/main/java/com/att/nsa/cambria/backends/Publisher.java
deleted file mode 100644
index 696e78f..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/Publisher.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import kafka.producer.KeyedMessage;
-
-import com.att.nsa.cambria.beans.LogDetails;
-
-/**
- * A publisher interface. Publishers receive messages and post them to a topic.
- * @author author
- */
-public interface Publisher
-{
- /**
- * A message interface. The message has a key and a body.
- * @author author
- */
- public interface message
- {
- /**
- * Get the key for this message. The key is used to partition messages
- * into "sub-streams" that have guaranteed order. The key can be null,
- * which means the message can be processed without any concern for order.
- *
- * @return a key, possibly null
- */
- String getKey();
-
- /**
- * Get the message body.
- * @return a message body
- */
- String getMessage();
- /**
- * Set the logging parameters for transaction-enabled logging.
- * @param logDetails
- */
- void setLogDetails (LogDetails logDetails);
- /**
- * Get the log details for transaction-enabled logging.
- * @return LogDetails
- */
- LogDetails getLogDetails ();
-
- /**
- * Whether transaction-enabled logging applies to this message.
- * @return true if transactions are enabled, false otherwise
- */
- boolean isTransactionEnabled();
- /**
- * Set the transaction-enabled flag, from the properties file or a topic-based implementation.
- * @param transactionEnabled
- */
- void setTransactionEnabled(boolean transactionEnabled);
- }
-
- /**
- * Send a single message to a topic. Equivalent to sendMessages with a list of size 1.
- * @param topic
- * @param msg
- * @throws IOException
- */
- public void sendMessage ( String topic, message msg ) throws IOException;
-
- /**
- * Send messages to a topic.
- * @param topic
- * @param msgs
- * @throws IOException
- */
- public void sendMessages ( String topic, List<? extends message> msgs ) throws IOException;
-
- public void sendBatchMessage(String topic ,ArrayList<KeyedMessage<String,String>> kms) throws IOException;
-}
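Illustrative only: publishing a single keyed message by implementing the message interface inline. The publisher instance and payload are placeholders; IOException handling is omitted.

    publisher.sendMessage("events", new Publisher.message() {
        public String getKey() { return "order-123"; }      // same key => ordered sub-stream
        public String getMessage() { return "{\"status\":\"ok\"}"; }
        public void setLogDetails(LogDetails logDetails) { /* not used in this sketch */ }
        public LogDetails getLogDetails() { return null; }
        public boolean isTransactionEnabled() { return false; }
        public void setTransactionEnabled(boolean transactionEnabled) { /* no-op */ }
    });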
diff --git a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java b/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java
deleted file mode 100644
index 692f093..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends.kafka;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import com.att.nsa.cambria.backends.Consumer;
-
-/**
- * A consumer instance that's created per-request. These are stateless so that
- * clients can connect to this service as a proxy.
- *
- * @author author
- *
- */
-public class KafkaConsumer implements Consumer {
- private enum State {
- OPENED, CLOSED
- }
-
- /**
- * Constructor; binds this consumer to a topic via an existing connector.
- * @param topic
- * @param group
- * @param id
- * @param cc
- *
- */
-
- public KafkaConsumer(String topic, String group, String id, ConsumerConnector cc) {
- fTopic = topic;
- fGroup = group;
- fId = id;
- fConnector = cc;
-
- fCreateTimeMs = System.currentTimeMillis();
- fLastTouch = fCreateTimeMs;
-
- fLogTag = fGroup + "(" + fId + ")/" + fTopic;
- offset = 0;
-
- state = KafkaConsumer.State.OPENED;
-
- final Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(fTopic, 1);
- final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = fConnector
- .createMessageStreams(topicCountMap);
- final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(fTopic);
- fStream = streams.iterator().next();
- }
-
-
- /**
- * Returns this consumer's name as "topic : group : id".
- */
- @Override
- public String getName() {
- return fTopic + " : " + fGroup + " : " + fId;
- }
-
- /**
- * Returns the creation time, in milliseconds.
- */
- @Override
- public long getCreateTimeMs() {
- return fCreateTimeMs;
- }
-
- /**
- * Returns the last access time, in milliseconds.
- */
- @Override
- public long getLastAccessMs() {
- return fLastTouch;
- }
-
-
- /**
- * Returns the next waiting message, or null if none is available.
- * Synchronized so that only one thread reads the stream at a time.
- */
- @Override
- public synchronized Consumer.Message nextMessage() {
- if (getState() == KafkaConsumer.State.CLOSED) {
- log.warn("nextMessage() called on closed KafkaConsumer " + getName());
- return null;
- }
-
- try {
- ConsumerIterator<byte[], byte[]> it = fStream.iterator();
- if (it.hasNext()) {
- final MessageAndMetadata<byte[], byte[]> msg = it.next();
- offset = msg.offset();
-
- return new Consumer.Message() {
- @Override
- public long getOffset() {
- return msg.offset();
- }
-
- @Override
- public String getMessage() {
- return new String(msg.message());
- }
- };
- }
- } catch (kafka.consumer.ConsumerTimeoutException x) {
- log.error(fLogTag + ": ConsumerTimeoutException in nextMessage(); returning null: " + x);
- } catch (java.lang.IllegalStateException x) {
- log.error(fLogTag + ": IllegalStateException in nextMessage(): " + x);
- }
-
- return null;
- }
-
- /**
- * Returns the offset of the most recently read message.
- */
- @Override
- public long getOffset() {
- return offset;
- }
-
- /**
- * Commits the offsets consumed so far; a warning is logged if the consumer is already closed.
- */
- @Override
- public void commitOffsets() {
- if (getState() == KafkaConsumer.State.CLOSED) {
- log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
- return;
- }
- fConnector.commitOffsets();
- }
-
- /**
- * updating fLastTouch with current time in ms
- */
- public void touch() {
- fLastTouch = System.currentTimeMillis();
- }
-
- /**
- * Returns the last touch time, in milliseconds.
- */
- public long getLastTouch() {
- return fLastTouch;
- }
-
- /**
- * Closes this consumer: marks it CLOSED and shuts down the connector.
- */
- public synchronized void close() {
- if (getState() == KafkaConsumer.State.CLOSED) {
- log.warn("close() called on closed KafkaConsumer " + getName());
- return;
- }
-
- setState(KafkaConsumer.State.CLOSED);
- fConnector.shutdown();
- }
-
- /**
- * Returns the consumer group.
- * @return the consumer group
- */
- public String getConsumerGroup() {
- return fGroup;
- }
-
- /**
- * Returns the consumer id.
- * @return the consumer id
- */
- public String getConsumerId() {
- return fId;
- }
-
- /**
- * Returns this consumer's state.
- * @return the state
- */
- private KafkaConsumer.State getState() {
- return this.state;
- }
-
- /**
- * setState() sets the kafkaConsumer state
- * @param state
- */
- private void setState(KafkaConsumer.State state) {
- this.state = state;
- }
-
- private ConsumerConnector fConnector;
- private final String fTopic;
- private final String fGroup;
- private final String fId;
- private final String fLogTag;
- private final KafkaStream<byte[], byte[]> fStream;
- private long fCreateTimeMs;
- private long fLastTouch;
- private long offset;
- private KafkaConsumer.State state;
- private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class);
- //private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
-}
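Illustrative only: constructing this consumer over the legacy (0.8.x) high-level consumer API. Connection details and ids are placeholders.

    final Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "group1");
    props.put("consumer.timeout.ms", "500");   // bounds the stream iterator, hence the
                                               // ConsumerTimeoutException handling in nextMessage()
    final ConsumerConnector cc =
        kafka.consumer.Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(props));
    final KafkaConsumer kc = new KafkaConsumer("events", "group1", "client1", cc);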
diff --git a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumerCache.java
deleted file mode 100644
index 9bd67d1..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumerCache.java
+++ /dev/null
@@ -1,618 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends.kafka;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.exception.ZkException;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.EnsurePath;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.http.annotation.NotThreadSafe;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-
-//import org.slf4j.LoggerFactory;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import com.att.nsa.cambria.backends.Consumer;
-import com.att.nsa.cambria.backends.MetricsSet;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.utils.ConfigurationReader;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-
-/**
- * Not thread-safe on its own, but expected to be used within KafkaConsumerFactory,
- * which must provide the necessary synchronization.
- * @author author
- *
- */
-@NotThreadSafe
-public class KafkaConsumerCache {
-
- private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
- private static final int kDefault_ConsumerHandoverWaitMs = 500;
-
- private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
- private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";
-
- private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
- private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";
-
- // Kafka defaults to timing out a client after 6 seconds of inactivity, but it
- // heartbeats even when the client isn't fetching. Here, we don't want to
- // prematurely rebalance the consumer group. Assuming clients are hitting the
- // server at least every 30 seconds, timing out after 2 minutes should be okay.
- // FIXME: consider allowing the client to specify its expected call rate?
- private static final long kDefault_MustTouchEveryMs = (long)1000 * 60 * 2;
-
- // check for expirations pretty regularly
- private static final long kDefault_SweepEverySeconds = 15;
-
- private enum Status {
- NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
- }
-
- /**
- * User defined exception class for kafka consumer cache
- *
- * @author author
- *
- */
- public class KafkaConsumerCacheException extends Exception {
- /**
- * To throw the exception
- *
- * @param t
- */
- KafkaConsumerCacheException(Throwable t) {
- super(t);
- }
-
- /**
- *
- * @param s
- */
- public KafkaConsumerCacheException(String s) {
- super(s);
- }
-
- private static final long serialVersionUID = 1L;
- }
-
- /**
- * Creates a KafkaConsumerCache object. Before it is used, you must call
- * startCache().
- *
- * @param apiId
- * @param metrics
- */
- public KafkaConsumerCache(String apiId, MetricsSet metrics) {
-
- if (apiId == null) {
- throw new IllegalArgumentException("API Node ID must be specified.");
- }
-
- fApiId = apiId;
- // fSettings = s;
- fMetrics = metrics;
- String strkSetting_ZkBasePath= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_ZkBasePath);
- if(null==strkSetting_ZkBasePath)strkSetting_ZkBasePath = kDefault_ZkBasePath;
- fBaseZkPath = strkSetting_ZkBasePath;
-
- fConsumers = new ConcurrentHashMap<String, KafkaConsumer>();
- fSweepScheduler = Executors.newScheduledThreadPool(1);
-
- curatorConsumerCache = null;
-
- status = Status.NOT_STARTED;
-
- listener = new ConnectionStateListener() {
- public void stateChanged(CuratorFramework client, ConnectionState newState) {
- if (newState == ConnectionState.LOST) {
- log.info("ZooKeeper connection expired");
- handleConnectionLoss();
- } else if (newState == ConnectionState.READ_ONLY) {
- log.warn("ZooKeeper connection set to read only mode.");
- } else if (newState == ConnectionState.RECONNECTED) {
- log.info("ZooKeeper connection re-established");
- handleReconnection();
- } else if (newState == ConnectionState.SUSPENDED) {
- log.warn("ZooKeeper connection has been suspended.");
- handleConnectionSuspended();
- }
- }
- };
- }
-
- /**
- * Start the cache service. This must be called before any get/put
- * operations.
- *
- * @param mode
- * DMAAP or cambria
- * @param curator
- * @throws IOException
- * @throws KafkaConsumerCacheException
- */
- public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {
- try {
-
- // CuratorFramework curator = null;
-
- // Changed the class from where we are initializing the curator
- // framework
- if (mode != null && mode.equals(CambriaConstants.CAMBRIA)) {
- curator = ConfigurationReader.getCurator();
- } else if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
- curator = getCuratorFramework(curator);
- }
-
- curator.getConnectionStateListenable().addListener(listener);
-
- setStatus(Status.CONNECTED);
-
- curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
- curatorConsumerCache.start();
-
- curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
- switch (event.getType()) {
- case CHILD_ADDED: {
- final String apiId = new String(event.getData().getData());
- final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
-
- log.info(apiId + " started consumer " + consumer);
- break;
- }
- case CHILD_UPDATED: {
- final String apiId = new String(event.getData().getData());
- final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
-
- if (fConsumers.containsKey(consumer)) {
- log.info(apiId + " claimed consumer " + consumer + " from " + fApiId);
-
- dropClaimedConsumer(consumer);
- }
-
- break;
- }
- case CHILD_REMOVED: {
- final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
-
- if (fConsumers.containsKey(consumer)) {
- log.info("Someone wanted consumer " + consumer + " gone; removing it from the cache");
- dropConsumer(consumer, false);
- }
-
- break;
- }
- default:
- break;
- }
- }
- });
-
- // initialize the ZK path
- EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
- ensurePath.ensure(curator.getZookeeperClient());
-
- //final long freq = fSettings.getLong(kSetting_SweepEverySeconds, kDefault_SweepEverySeconds);
- long freq = kDefault_SweepEverySeconds;
- String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_SweepEverySeconds);
- if(null==strkSetting_SweepEverySeconds) {
- strkSetting_SweepEverySeconds = kDefault_SweepEverySeconds+"";
- }
-
- freq = Long.parseLong(strkSetting_SweepEverySeconds);
-
- fSweepScheduler.scheduleAtFixedRate(new Sweeper(), freq, freq, TimeUnit.SECONDS);
- log.info("KafkaConsumerCache started");
- log.info("sweeping cached clients every " + freq + " seconds");
- } catch (ZkException e) {
- throw new KafkaConsumerCacheException(e);
- } catch (Exception e) {
- throw new KafkaConsumerCacheException(e);
- }
- }
-
- /**
- * Gets the Curator object, starting the ZooKeeper connection if it is not yet established.
- *
- * @param curator
- * @return curator object
- */
- public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
- if (curator.getState() == CuratorFrameworkState.LATENT) {
- curator.start();
-
- try {
- curator.blockUntilConnected();
- } catch (InterruptedException e) {
- // Ignore
- log.error("error while setting curator framework :" + e.getMessage());
- Thread.currentThread().interrupt();
- }
- }
-
- return curator;
- }
-
- /**
- * Stop the cache service.
- */
- public void stopCache() {
- setStatus(Status.DISCONNECTED);
-
- final CuratorFramework curator = ConfigurationReader.getCurator();
-
- if (curator != null) {
- try {
- curator.getConnectionStateListenable().removeListener(listener);
- curatorConsumerCache.close();
- log.info("Curator client closed");
- } catch (ZkInterruptedException e) {
- log.error("Curator client close interrupted: " + e);
- } catch (IOException e) {
- log.error("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e);
- }
-
- curatorConsumerCache = null;
- }
-
- if (fSweepScheduler != null) {
- fSweepScheduler.shutdownNow();
- log.info("cache sweeper stopped");
- }
-
- if (fConsumers != null) {
- fConsumers.clear();
- fConsumers = null;
- }
-
- setStatus(Status.NOT_STARTED);
-
- log.info("Consumer cache service stopped");
- }
-
- /**
- * Get a cached consumer by topic, group, and id, if it exists (and remains
- * valid) In addition, this method waits for all other consumer caches in
- * the cluster to release their ownership and delete their version of this
- * consumer.
- *
- * @param topic
- * @param consumerGroupId
- * @param clientId
- * @return a consumer, or null
- */
- public KafkaConsumer getConsumerFor(String topic, String consumerGroupId, String clientId)
- throws KafkaConsumerCacheException {
- if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
- throw new KafkaConsumerCacheException("The cache service is unavailable.");
-
- final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
- final KafkaConsumer kc = fConsumers.get(consumerKey);
-
- if (kc != null) {
- log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
- kc.touch();
- fMetrics.onKafkaConsumerCacheHit();
- } else {
- log.debug("Consumer cache miss for [" + consumerKey + "]");
- fMetrics.onKafkaConsumerCacheMiss();
- }
-
- return kc;
- }
-
- /**
- * Put a consumer into the cache by topic, group and ID
- *
- * @param topic
- * @param consumerGroupId
- * @param consumerId
- * @param consumer
- * @throws KafkaConsumerCacheException
- */
- public void putConsumerFor(String topic, String consumerGroupId, String consumerId, KafkaConsumer consumer)
- throws KafkaConsumerCacheException {
- if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
- throw new KafkaConsumerCacheException("The cache service is unavailable.");
-
- final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
- fConsumers.put(consumerKey, consumer);
- }
-
- public Collection<? extends Consumer> getConsumers() {
- return new LinkedList<KafkaConsumer>(fConsumers.values());
- }
-
- /**
- * Drops all consumers from the cache.
- */
- public void dropAllConsumers() {
- for (Entry<String, KafkaConsumer> entry : fConsumers.entrySet()) {
- dropConsumer(entry.getKey(), true);
- }
-
- // consumers should be empty here
- if (fConsumers.size() > 0) {
- log.warn("During dropAllConsumers, the consumer map is not empty.");
- fConsumers.clear();
- }
- }
-
- /**
- * Drop a consumer from our cache due to a timeout
- *
- * @param key
- */
- private void dropTimedOutConsumer(String key) {
- fMetrics.onKafkaConsumerTimeout();
-
- if (!fConsumers.containsKey(key)) {
- log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
- return;
- }
-
- // First, drop this consumer from our cache
- dropConsumer(key, true);
-
- final CuratorFramework curator = ConfigurationReader.getCurator();
-
- try {
- curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
- } catch (NoNodeException e) {
- log.error("Exception at : " + e);
- } catch (Exception e) {
- log.error("Unexpected exception while deleting consumer: " + e);
- }
-
- log.info("Dropped " + key + " consumer due to timeout");
- }
-
- /**
- * Drop a consumer from our cache due to another API node claiming it as
- * their own.
- *
- * @param key
- */
- private void dropClaimedConsumer(String key) {
- // if the consumer is still in our cache, it implies a claim.
- if (fConsumers.containsKey(key)) {
- fMetrics.onKafkaConsumerClaimed();
- log.info("Consumer [" + key + "] claimed by another node.");
- }
-
- dropConsumer(key, false);
- }
-
- /**
- * Removes the consumer from the cache and closes its connection to the
- * kafka broker(s).
- *
- * @param key
- * @param dueToTimeout
- */
- private void dropConsumer(String key, boolean dueToTimeout) {
- final KafkaConsumer kc = fConsumers.remove(key);
-
- if (kc != null) {
- log.info("closing Kafka consumer " + key);
- kc.close();
- }
- }
-
-// private final rrNvReadable fSettings;
- private final MetricsSet fMetrics;
- private final String fBaseZkPath;
- private final ScheduledExecutorService fSweepScheduler;
- private final String fApiId;
- private final ConnectionStateListener listener;
-
- private ConcurrentHashMap<String, KafkaConsumer> fConsumers;
- private PathChildrenCache curatorConsumerCache;
-
- private volatile Status status;
-
- private void handleReconnection() {
-
- log.info("Reading current cache data from ZK and synchronizing local cache");
-
- final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
-
- // Remove all the consumers in this API nodes cache that now belong to
- // other API nodes.
- for (ChildData cachedConsumer : cacheData) {
- final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
- final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
- : "undefined";
-
- if (!fApiId.equals(owningApiId)) {
- fConsumers.remove(consumerId);
- }
- }
-
- setStatus(Status.CONNECTED);
- }
-
- private void handleConnectionSuspended() {
- log.info("Suspending cache until ZK connection is re-established");
-
- setStatus(Status.SUSPENDED);
- }
-
- private void handleConnectionLoss() {
- log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");
-
- setStatus(Status.DISCONNECTED);
-
- closeAllCachedConsumers();
- fConsumers.clear();
- }
-
- private void closeAllCachedConsumers() {
- for (Entry<String, KafkaConsumer> entry : fConsumers.entrySet()) {
- entry.getValue().close();
- }
- }
-
- private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
- return topic + "::" + consumerGroupId + "::" + clientId;
- }
-
- /**
- * Claims ownership of a consumer by writing this API node's id to the consumer's ZooKeeper node.
- *
- * @param topic
- * @param consumerGroupId
- * @param consumerId
- * @throws KafkaConsumerCacheException
- */
- public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
- throws KafkaConsumerCacheException {
- // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
- final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
-
- try {
- final String consumerPath = fBaseZkPath + "/" + consumerKey;
-
- log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
-
- final CuratorFramework curator = ConfigurationReader.getCurator();
-
- try {
- curator.setData().forPath(consumerPath, fApiId.getBytes());
- } catch (KeeperException.NoNodeException e) {
- log.error(e.toString());
- curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
- }
-
- log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
- } catch (Exception e) {
- log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
- throw new KafkaConsumerCacheException(e);
- }
-
- log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");
-
- try {
- int handoverWaitMs = kDefault_ConsumerHandoverWaitMs;
- // Look the setting up by its key; a local variable shadowing the constant here
- // would make the lookup use the numeric default as the property key.
- String strHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_ConsumerHandoverWaitMs);
- if (strHandoverWaitMs != null) {
- handoverWaitMs = Integer.parseInt(strHandoverWaitMs);
- }
- Thread.sleep(handoverWaitMs);
- //Thread.sleep(fSettings.getInt(kSetting_ConsumerHandoverWaitMs, kDefault_ConsumerHandoverWaitMs));
- } catch (InterruptedException e) {
- log.error(e.toString());
- Thread.currentThread().interrupt();
- }
- }
-
- private void sweep() {
- final LinkedList<String> removals = new LinkedList<String>();
- long mustTouchEveryMs = kDefault_MustTouchEveryMs;
- String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_TouchEveryMs);
- if (null != strkSetting_TouchEveryMs) {
- mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
- }
-
- //final long mustTouchEveryMs = fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs);
- final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
-
- for (Entry<String, KafkaConsumer> e : fConsumers.entrySet()) {
- final long lastTouchMs = e.getValue().getLastTouch();
-
- log.debug("consumer " + e.getKey() + " last touched at " + lastTouchMs);
-
- if (lastTouchMs < oldestAllowedTouchMs) {
- log.info("consumer " + e.getKey() + " has expired");
- removals.add(e.getKey());
- }
- }
-
- for (String key : removals) {
- dropTimedOutConsumer(key);
- }
- }
-
- /**
- * Runnable that periodically invokes sweep().
- *
- * @author author
- *
- */
- private class Sweeper implements Runnable {
- /**
- * run method
- */
- public void run() {
- sweep();
- }
- }
-
- /**
- * Drops a consumer from the cache, given its topic, group, and client id.
- *
- * @param topic
- * @param consumerGroup
- * @param clientId
- */
- public void dropConsumer(String topic, String consumerGroup, String clientId) {
- dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
- }
-
- private Status getStatus() {
- return this.status;
- }
-
- private void setStatus(Status status) {
- this.status = status;
- }
-
- private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
- //private static final Logger log = LoggerFactory.getLogger(KafkaConsumerCache.class);
-} \ No newline at end of file
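Illustrative only: the expected cache lifecycle from a factory's point of view. The apiNodeId, metrics, curator, and consumer construction are placeholders; exception handling is omitted.

    final KafkaConsumerCache cache = new KafkaConsumerCache(apiNodeId, metrics);
    cache.startCache(CambriaConstants.DMAAP, curator);     // wires ZK listeners and the sweeper
    KafkaConsumer kc = cache.getConsumerFor("events", "group1", "client1");
    if (kc == null) {                                      // miss: build, register, and claim
        kc = buildKafkaConsumer();                         // hypothetical helper
        cache.putConsumerFor("events", "group1", "client1", kc);
        cache.signalOwnership("events", "group1", "client1");
    }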
diff --git a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaPublisher.java
deleted file mode 100644
index 42a6bb9..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaPublisher.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends.kafka;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import kafka.common.FailedToSendMessageException;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-import org.json.JSONException;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.nsa.cambria.backends.Publisher;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-
-/**
- * Sends raw JSON objects into Kafka.
- *
- * Could improve space: BSON rather than JSON?
- *
- * @author author
- *
- */
-
-public class KafkaPublisher implements Publisher {
- /**
- * constructor initializing
- *
- * @param settings
- * @throws rrNvReadable.missingReqdSetting
- */
- public KafkaPublisher(@Qualifier("propertyReader") rrNvReadable settings) throws rrNvReadable.missingReqdSetting {
- //fSettings = settings;
-
- final Properties props = new Properties();
- /*transferSetting(fSettings, props, "metadata.broker.list", "localhost:9092");
- transferSetting(fSettings, props, "request.required.acks", "1");
- transferSetting(fSettings, props, "message.send.max.retries", "5");
- transferSetting(fSettings, props, "retry.backoff.ms", "150"); */
- String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list");
- log.info("kafkaConnUrl: " + kafkaConnUrl);
- if (null == kafkaConnUrl) {
- kafkaConnUrl = "localhost:9092";
- }
- transferSetting( props, "metadata.broker.list", kafkaConnUrl);
- transferSetting( props, "request.required.acks", "1");
- transferSetting( props, "message.send.max.retries", "5");
- transferSetting(props, "retry.backoff.ms", "150");
-
- props.put("serializer.class", "kafka.serializer.StringEncoder");
-
- fConfig = new ProducerConfig(props);
- fProducer = new Producer<String, String>(fConfig);
- }
-
- /**
- * Send a message with a given topic and key.
- *
- * @param topic
- * @param msg
- * @throws FailedToSendMessageException
- * @throws JSONException
- */
- @Override
- public void sendMessage(String topic, message msg) throws IOException, FailedToSendMessageException {
- final List<message> msgs = new LinkedList<message>();
- msgs.add(msg);
- sendMessages(topic, msgs);
- }
-
- /**
- * Publishes a batch of messages.
- *
- * @param topic
- * @param kms
- * @throws IOException
- */
- public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
- try {
- fProducer.send(kms);
-
- } catch (FailedToSendMessageException excp) {
- log.error("Failed to send message(s) to topic [" + topic + "].", excp);
- throw new FailedToSendMessageException(excp.getMessage(), excp);
- }
-
- }
-
- /**
- * Send a set of messages. Each must have a "key" string value.
- *
- * @param topic
- * @param msgs
- * @throws FailedToSendMessageException
- * @throws JSONException
- */
- @Override
- public void sendMessages(String topic, List<? extends message> msgs)
- throws IOException, FailedToSendMessageException {
- log.info("sending " + msgs.size() + " events to [" + topic + "]");
-
- final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size());
- for (message o : msgs) {
- final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString());
- kms.add(data);
- }
- try {
- fProducer.send(kms);
-
- } catch (FailedToSendMessageException excp) {
- log.error("Failed to send message(s) to topic [" + topic + "].", excp);
- throw new FailedToSendMessageException(excp.getMessage(), excp);
- }
- }
-
- //private final rrNvReadable fSettings;
-
- private ProducerConfig fConfig;
- private Producer<String, String> fProducer;
-
- /**
- * Transfers a "kafka."-prefixed setting into the producer properties, falling back to the given default.
- * @param props
- * @param key
- * @param defVal
- */
- private void transferSetting(Properties props, String key, String defVal) {
- String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
- if (null==kafka_prop) kafka_prop=defVal;
- //props.put(key, settings.getString("kafka." + key, defVal));
- props.put(key, kafka_prop);
- }
-
- //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class);
-
- private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class);
-}
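Illustrative only: a batch publish through the legacy producer API. The publisher instance and payloads are placeholders; IOException handling is omitted.

    final ArrayList<KeyedMessage<String, String>> kms =
        new ArrayList<KeyedMessage<String, String>>();
    kms.add(new KeyedMessage<String, String>("events", "key1", "{\"n\":1}"));
    kms.add(new KeyedMessage<String, String>("events", "key1", "{\"n\":2}"));
    publisher.sendBatchMessage("events", kms);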
diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java
deleted file mode 100644
index f0982a9..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends.memory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-import com.att.nsa.cambria.backends.Consumer;
-import com.att.nsa.cambria.backends.ConsumerFactory;
-/**
- *
- * @author author
- *
- */
-public class MemoryConsumerFactory implements ConsumerFactory
-{
- /**
- *
- * Initializing constructor
- * @param q
- */
- public MemoryConsumerFactory ( MemoryQueue q )
- {
- fQueue = q;
- }
-
- /**
- *
- * @param topic
- * @param consumerGroupId
- * @param clientId
- * @param timeoutMs
- * @return Consumer
- */
- @Override
- public Consumer getConsumerFor ( String topic, String consumerGroupId, String clientId, int timeoutMs )
- {
- return new MemoryConsumer ( topic, consumerGroupId );
- }
-
- private final MemoryQueue fQueue;
-
- /**
- *
- * Define nested inner class
- *
- */
- private class MemoryConsumer implements Consumer
- {
- /**
- *
- * Initializing MemoryConsumer constructor
- * @param topic
- * @param consumer
- *
- */
- public MemoryConsumer ( String topic, String consumer )
- {
- fTopic = topic;
- fConsumer = consumer;
- fCreateMs = System.currentTimeMillis ();
- fLastAccessMs = fCreateMs;
- }
-
- /**
- * Returns the next message from the queue, or null if none is waiting.
- */
- @Override
- public Message nextMessage ()
- {
- return fQueue.get ( fTopic, fConsumer );
- }
-
- private final String fTopic;
- private final String fConsumer;
- private final long fCreateMs;
- private long fLastAccessMs;
-
- @Override
- public void close() {
- //Nothing to close/clean up.
- }
- /**
- * Offset commits are not applicable to the in-memory queue; this is a no-op.
- */
- @Override
- public void commitOffsets()
- {
- // ignoring this aspect
- }
- /**
- * Offsets are not tracked in memory; always returns 0.
- */
- @Override
- public long getOffset()
- {
- return 0;
- }
-
- /**
- * Returns this consumer's name as "topic/consumer".
- */
- @Override
- public String getName ()
- {
- return fTopic + "/" + fConsumer;
- }
-
- @Override
- public long getCreateTimeMs ()
- {
- return fCreateMs;
- }
-
- @Override
- public long getLastAccessMs ()
- {
- return fLastAccessMs;
- }
- }
-
- @Override
- public void destroyConsumer(String topic, String consumerGroupId,
- String clientId) {
- //No cache for memory consumers, so NOOP
- }
-
- @Override
- public void dropCache ()
- {
- // nothing to do - there's no cache here
- }
-
- /**
- * @return an empty list; memory consumers are not cached
- */
- @Override
- public Collection<? extends Consumer> getConsumers ()
- {
- return new ArrayList<MemoryConsumer> ();
- }
-}
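Illustrative only: an in-memory round trip, handy for tests that need no Kafka broker. Here msg is a placeholder Publisher.message instance.

    final MemoryQueue q = new MemoryQueue();
    final MemoryConsumerFactory factory = new MemoryConsumerFactory(q);
    q.put("events", msg);                                  // enqueue a message
    final Consumer c = factory.getConsumerFor("events", "group1", "client1", 500);
    final Consumer.Message next = c.nextMessage();         // null once the queue is drained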
diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java
deleted file mode 100644
index 87e59c2..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends.memory;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import com.att.nsa.cambria.metabroker.Broker;
-import com.att.nsa.cambria.metabroker.Topic;
-import com.att.nsa.configs.ConfigDb;
-import com.att.nsa.security.NsaAcl;
-import com.att.nsa.security.NsaApiKey;
-
-/**
- *
- * @author author
- *
- */
-public class MemoryMetaBroker implements Broker {
- /**
- *
- * @param mq
- * @param configDb
- */
- public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) {
- fQueue = mq;
- fTopics = new HashMap<String, MemTopic>();
- }
-
- @Override
- public List<Topic> getAllTopics() {
- return new LinkedList<Topic>(fTopics.values());
- }
-
- @Override
- public Topic getTopic(String topic) {
- return fTopics.get(topic);
- }
-
- @Override
- public Topic createTopic(String topic, String desc, String ownerApiId, int partitions, int replicas,
- boolean transactionEnabled) throws TopicExistsException {
- if (getTopic(topic) != null) {
- throw new TopicExistsException(topic);
- }
- fQueue.createTopic(topic);
- fTopics.put(topic, new MemTopic(topic, desc, ownerApiId, transactionEnabled));
- return getTopic(topic);
- }
-
- @Override
- public void deleteTopic(String topic) {
- fTopics.remove(topic);
- fQueue.removeTopic(topic);
- }
-
- private final MemoryQueue fQueue;
- private final HashMap<String, MemTopic> fTopics;
-
- private static class MemTopic implements Topic {
- /**
- * constructor initialization
- *
- * @param name
- * @param desc
- * @param owner
- * @param transactionEnabled
- */
- public MemTopic(String name, String desc, String owner, boolean transactionEnabled) {
- fName = name;
- fDesc = desc;
- fOwner = owner;
- ftransactionEnabled = transactionEnabled;
- fReaders = null;
- fWriters = null;
- }
-
- @Override
- public String getOwner() {
- return fOwner;
- }
-
- @Override
- public NsaAcl getReaderAcl() {
- return fReaders;
- }
-
- @Override
- public NsaAcl getWriterAcl() {
- return fWriters;
- }
-
- @Override
- public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
- if (fReaders != null && (user == null || !fReaders.canUser(user.getKey()))) {
- throw new AccessDeniedException(user == null ? "" : user.getKey());
- }
- }
-
- @Override
- public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
- if (fWriters != null && (user == null || !fWriters.canUser(user.getKey()))) {
- throw new AccessDeniedException(user == null ? "" : user.getKey());
- }
- }
-
- @Override
- public String getName() {
- return fName;
- }
-
- @Override
- public String getDescription() {
- return fDesc;
- }
-
- @Override
- public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
- if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
- }
- if (fWriters == null) {
- fWriters = new NsaAcl();
- }
- fWriters.add(publisherId);
- }
-
- @Override
- public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
- if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
- }
- if (fWriters != null) {
- fWriters.remove(publisherId);
- }
- }
-
- @Override
- public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
- if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
- }
- if (fReaders == null) {
- fReaders = new NsaAcl();
- }
- fReaders.add(consumerId);
- }
-
- @Override
- public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
- if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
- }
- if (fReaders != null) {
- fReaders.remove(consumerId);
- }
- }
-
- private final String fName;
- private final String fDesc;
- private final String fOwner;
- private NsaAcl fReaders;
- private NsaAcl fWriters;
- private boolean ftransactionEnabled;
-
- @Override
- public boolean isTransactionEnabled() {
- return ftransactionEnabled;
- }
-
- @Override
- public Set<String> getOwners() {
- final TreeSet<String> set = new TreeSet<String> ();
- set.add ( fOwner );
- return set;
- }
- }
-}
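The owner-only ACL flow above is easiest to see end to end. Below is a minimal sketch, not part of the original source: it assumes a MemoryMetaBroker instance is in hand, and that ownerKey and readerKey are hypothetical NsaApiKey instances whose getKey() returns "owner-app" and "reader-app" respectively (the NsaApiKey and AccessDeniedException import paths are likewise assumptions).

    import com.att.nsa.cambria.metabroker.Topic;
    import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
    import com.att.nsa.security.NsaApiKey;

    static void aclDemo(MemoryMetaBroker broker, NsaApiKey ownerKey, NsaApiKey readerKey)
            throws TopicExistsException, AccessDeniedException {
        // partitions (8) and replicas (3) satisfy the interface but are
        // ignored by the memory backend
        Topic t = broker.createTopic("demo.topic", "a demo topic", "owner-app", 8, 3, false);

        t.checkUserRead(null); // passes: no reader ACL exists yet

        // only the owner's key may grant access; any other key throws
        t.permitReadsByUser("reader-app", ownerKey);

        // the reader ACL now exists, so only listed users pass
        t.checkUserRead(readerKey); // ok if readerKey.getKey() is "reader-app"
    }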
diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java
deleted file mode 100644
index a0dc8b8..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends.memory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import com.att.nsa.cambria.backends.Consumer;
-import com.att.nsa.cambria.backends.Publisher.message;
-
-/**
- * When the broker type is memory, this class performs all topic-related
- * queue operations.
- *
- * @author author
- *
- */
-public class MemoryQueue {
- // map from topic to list of msgs
- private HashMap<String, LogBuffer> fQueue;
- private HashMap<String, HashMap<String, Integer>> fOffsets;
-
- /**
- * Constructor initializing the per-topic queue map and the per-consumer
- * offset maps.
- */
- public MemoryQueue() {
- fQueue = new HashMap<String, LogBuffer>();
- fOffsets = new HashMap<String, HashMap<String, Integer>>();
- }
-
- /**
- * Creates the backing log for a topic if one does not already exist.
- *
- * @param topic
- */
- public synchronized void createTopic(String topic) {
- LogBuffer q = fQueue.get(topic);
- if (q == null) {
- q = new LogBuffer(1024 * 1024);
- fQueue.put(topic, q);
- }
- }
-
- /**
- * Removes the backing log for a topic, if one exists.
- *
- * @param topic
- */
- public synchronized void removeTopic(String topic) {
- fQueue.remove(topic); // a no-op when the topic was never created
- }
-
- /**
- * Writes a message to a topic, creating the topic on first use.
- *
- * @param topic
- * @param m
- */
- public synchronized void put(String topic, message m) {
- LogBuffer q = fQueue.get(topic);
- if (q == null) {
- createTopic(topic);
- q = fQueue.get(topic);
- }
- q.push(m.getMessage());
- }
-
- /**
- * Reads the next message for the given consumer on the given topic,
- * advancing that consumer's offset.
- *
- * @param topic
- * @param consumerName
- * @return the next message, or null if none is available
- */
- public synchronized Consumer.Message get(String topic, String consumerName) {
- final LogBuffer q = fQueue.get(topic);
- if (q == null) {
- return null;
- }
-
- HashMap<String, Integer> offsetMap = fOffsets.get(consumerName);
- if (offsetMap == null) {
- offsetMap = new HashMap<String, Integer>();
- fOffsets.put(consumerName, offsetMap);
- }
- Integer offset = offsetMap.get(topic);
- if (offset == null) {
- offset = 0;
- }
-
- final msgInfo result = q.read(offset);
- if (result != null && result.msg != null) {
- offsetMap.put(topic, result.offset + 1);
- }
- return result;
- }
-
- /**
- * Static inner class holding the details of a consumed message.
- *
- * @author author
- *
- */
- private static class msgInfo implements Consumer.Message {
- /**
- * the consumed message payload
- */
- public String msg;
- /**
- * offset associated with message
- */
- public int offset;
-
- /**
- * get the offset of this message
- */
- @Override
- public long getOffset() {
- return offset;
- }
-
- /**
- * get consumed message
- */
- @Override
- public String getMessage() {
- return msg;
- }
- }
-
- /**
- * A bounded, in-memory message log with synchronized push and read
- * methods.
- *
- * @author author
- */
- private static class LogBuffer {
- private int fBaseOffset;
- private final int fMaxSize;
- private final ArrayList<String> fList;
-
- /**
- * Constructor initializing the base offset, maximum size, and backing
- * list.
- *
- * @param maxSize
- */
- public LogBuffer(int maxSize) {
- fBaseOffset = 0;
- fMaxSize = maxSize;
- fList = new ArrayList<String>();
- }
-
- /**
- * Appends a message, evicting the oldest entries once the buffer
- * exceeds its maximum size.
- *
- * @param msg
- */
- public synchronized void push(String msg) {
- fList.add(msg);
- while (fList.size() > fMaxSize) {
- fList.remove(0);
- fBaseOffset++;
- }
- }
-
- /**
- * Reads the message at the given offset.
- *
- * @param offset
- * @return the message at that offset, or null if it is past the end
- */
- public synchronized msgInfo read(int offset) {
- // offsets below the base (already-evicted entries) are clamped to
- // the oldest message still in the buffer
- final int actual = Math.max(0, offset - fBaseOffset);
-
- final msgInfo mi = new msgInfo();
- mi.msg = (actual >= fList.size()) ? null : fList.get(actual);
- if (mi.msg == null) {
- return null;
- }
-
- mi.offset = actual + fBaseOffset;
- return mi;
- }
-
- }
-}
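Because offsets are tracked per consumer name, two consumers on the same topic each see the full stream. A small usage sketch, not from the original source, assuming Publisher.message declares only the getKey()/getMessage() pair that MessageLogger relies on below (a real implementation may need more members):

    import com.att.nsa.cambria.backends.Consumer;
    import com.att.nsa.cambria.backends.Publisher;

    static void demo() {
        MemoryQueue q = new MemoryQueue();
        q.put("demo.topic", new Publisher.message() { // put() creates the topic on first use
            public String getKey() { return "k1"; }
            public String getMessage() { return "hello"; }
        });

        Consumer.Message first = q.get("demo.topic", "consumer-a"); // offset 0: "hello"
        Consumer.Message done = q.get("demo.topic", "consumer-a");  // null: offset advanced past end
        Consumer.Message other = q.get("demo.topic", "consumer-b"); // "hello" again: independent offsets
    }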
diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java
deleted file mode 100644
index d653f6e..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends.memory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.att.nsa.cambria.backends.Publisher;
-import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
-
-import kafka.producer.KeyedMessage;
-
-/**
- * A Publisher that writes messages into an in-memory queue, creating topics
- * through the broker on demand.
- *
- * @author author
- *
- */
-public class MemoryQueuePublisher implements Publisher {
- /**
- * Constructor wiring the publisher to a queue and its broker.
- *
- * @param q
- * @param b
- */
- public MemoryQueuePublisher(MemoryQueue q, MemoryMetaBroker b) {
- fBroker = b;
- fQueue = q;
- }
-
- /**
- * No-op: batch messages are silently ignored by the memory backend.
- *
- * @param topic
- * @param kms
- */
- @Override
- public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
- // intentionally empty
- }
-
- /**
- * Sends a single message, first creating the topic through the broker
- * if it does not already exist.
- *
- * @param topic
- * @param msg
- * @throws IOException
- */
- @Override
- public void sendMessage(String topic, message msg) throws IOException {
- if (null == fBroker.getTopic(topic)) {
- try {
- fBroker.createTopic(topic, topic, null, 8, 3, false);
- } catch (TopicExistsException e) {
- throw new RuntimeException(e);
- }
- }
- fQueue.put(topic, msg);
- }
-
- /**
- * Sends each message in the list individually.
- *
- * @param topic
- * @param msgs
- * @throws IOException
- */
- @Override
- public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
- for (message m : msgs) {
- sendMessage(topic, m);
- }
- }
-
- private final MemoryMetaBroker fBroker;
- private final MemoryQueue fQueue;
-}
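Since sendMessage() creates missing topics through the broker, a publisher wired to a queue/broker pair needs no further setup. A rough sketch, under the assumption that the MemoryMetaBroker was constructed around the same MemoryQueue (its constructor is outside this diff):

    import java.io.IOException;
    import com.att.nsa.cambria.backends.Publisher;

    static void publishOnce(MemoryQueue q, MemoryMetaBroker broker,
            Publisher.message msg) throws IOException {
        Publisher pub = new MemoryQueuePublisher(q, broker);
        // "auto.topic" need not exist: sendMessage() creates it first, with
        // the topic name doubling as the description, no owner, and
        // transactions disabled
        pub.sendMessage("auto.topic", msg);
    }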
diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java b/src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java
deleted file mode 100644
index c49ac4f..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends.memory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.att.nsa.cambria.backends.Publisher;
-
-import kafka.producer.KeyedMessage;
-
-/**
- * A Publisher implementation that accepts and silently discards every
- * message.
- *
- * @author author
- *
- */
-public class MessageDropper implements Publisher {
- /**
- * Discards a single message.
- *
- * @param topic
- * @param msg
- */
- @Override
- public void sendMessage(String topic, message msg) throws IOException {
- }
-
- /**
- * Discards a list of messages.
- */
- @Override
- public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
- }
-
- /**
- * Discards a batch of messages.
- */
- @Override
- public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
- }
-}
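MessageDropper is effectively a null-object Publisher: every send succeeds and nothing is stored, which makes it a drop-in stand-in wherever a Publisher is required but persistence is unwanted. A hypothetical swap (the discard flag is illustrative, not from the original source):

    Publisher pub = discard ? new MessageDropper() : new MemoryQueuePublisher(q, broker);
    pub.sendMessage("demo.topic", msg); // silently dropped when discarding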
diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java b/src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java
deleted file mode 100644
index 9ff8bd6..0000000
--- a/src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.backends.memory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.att.nsa.cambria.backends.Publisher;
-
-import kafka.producer.KeyedMessage;
-
-/**
- * A Publisher that appends each message to a log file instead of
- * publishing it.
- *
- * @author author
- *
- */
-public class MessageLogger implements Publisher {
- public MessageLogger() {
- }
-
- public void setFile(File f) throws FileNotFoundException {
- fStream = new FileOutputStream(f, true); // open in append mode
- }
-
- /**
- * Logs a single message; the topic is ignored.
- *
- * @param topic
- * @param msg
- * @throws IOException
- */
- @Override
- public void sendMessage(String topic, message msg) throws IOException {
- logMsg(msg);
- }
-
- /**
- * Logs each message in the list.
- *
- * @param topic
- * @param msgs
- * @throws IOException
- */
- @Override
- public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
- for (message m : msgs) {
- logMsg(m);
- }
- }
-
- /**
- * No-op: batch messages are not logged.
- *
- * @param topic
- * @param kms
- * @throws IOException
- */
- @Override
- public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
- // intentionally empty
- }
-
- private FileOutputStream fStream;
-
- /**
- * Writes one message to the log as "[key] payload" followed by a
- * newline; a null key is logged as "<none>".
- *
- * @param msg
- * @throws IOException
- */
- private void logMsg(message msg) throws IOException {
- String key = msg.getKey();
- if (key == null) {
- key = "<none>";
- }
-
- fStream.write('[');
- fStream.write(key.getBytes());
- fStream.write("] ".getBytes());
- fStream.write(msg.getMessage().getBytes());
- fStream.write('\n');
- }
-}
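The resulting log holds one line per message: the key in square brackets (or "<none>"), a space, then the payload. A brief sketch, reusing the hypothetical message implementation from the MemoryQueue example above (checked IOException/FileNotFoundException handling elided):

    import java.io.File;

    MessageLogger logger = new MessageLogger();
    logger.setFile(new File("/tmp/cambria-messages.log")); // opened for append
    logger.sendMessage("any.topic", msg); // topic is ignored; only key and payload are written
    // the file now ends with: [k1] hello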