summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/backends/memory
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends/memory')
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java182
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java198
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java207
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java92
-rw-r--r--src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java109
5 files changed, 788 insertions, 0 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
new file mode 100644
index 0000000..0c34bfd
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
@@ -0,0 +1,182 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.memory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.backends.Consumer;
+import com.att.dmf.mr.backends.ConsumerFactory;
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public class MemoryConsumerFactory implements ConsumerFactory
+{
+ /**
+ *
+ * Initializing constructor
+ * @param q
+ */
+ public MemoryConsumerFactory ( MemoryQueue q )
+ {
+ fQueue = q;
+ }
+
+ /**
+ *
+ * @param topic
+ * @param consumerGroupId
+ * @param clientId
+ * @param timeoutMs
+ * @return Consumer
+ */
+ @Override
+ public Consumer getConsumerFor ( String topic, String consumerGroupId, String clientId, int timeoutMs, String remotehost )
+ {
+ return new MemoryConsumer ( topic, consumerGroupId );
+ }
+
+ private final MemoryQueue fQueue;
+
+ /**
+ *
+ * Define nested inner class
+ *
+ */
+ private class MemoryConsumer implements Consumer
+ {
+ /**
+ *
+ * Initializing MemoryConsumer constructor
+ * @param topic
+ * @param consumer
+ *
+ */
+ public MemoryConsumer ( String topic, String consumer )
+ {
+ fTopic = topic;
+ fConsumer = consumer;
+ fCreateMs = System.currentTimeMillis ();
+ fLastAccessMs = fCreateMs;
+ }
+
+ @Override
+ /**
+ *
+ * return consumer details
+ */
+ public Message nextMessage ()
+ {
+ return fQueue.get ( fTopic, fConsumer );
+ }
+
+ private final String fTopic;
+ private final String fConsumer;
+ private final long fCreateMs;
+ private long fLastAccessMs;
+
+ @Override
+ public boolean close() {
+ //Nothing to close/clean up.
+ return true;
+ }
+ /**
+ *
+ */
+ public void commitOffsets()
+ {
+ // ignoring this aspect
+ }
+ /**
+ * get offset
+ */
+ public long getOffset()
+ {
+ return 0;
+ }
+
+ @Override
+ /**
+ * get consumer topic name
+ */
+ public String getName ()
+ {
+ return fTopic + "/" + fConsumer;
+ }
+
+ @Override
+ public long getCreateTimeMs ()
+ {
+ return fCreateMs;
+ }
+
+ @Override
+ public long getLastAccessMs ()
+ {
+ return fLastAccessMs;
+ }
+
+
+
+ @Override
+ public void setOffset(long offset) {
+ // TODO Auto-generated method stub
+
+ }
+
+
+ }
+
+ @Override
+ public void destroyConsumer(String topic, String consumerGroupId,
+ String clientId) {
+ //No cache for memory consumers, so NOOP
+ }
+
+ @Override
+ public void dropCache ()
+ {
+ // nothing to do - there's no cache here
+ }
+
+ @Override
+ /**
+ * @return ArrayList<MemoryConsumer>
+ */
+ public Collection<? extends Consumer> getConsumers ()
+ {
+ return new ArrayList<MemoryConsumer> ();
+ }
+
+ @Override
+ public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
+ String remotehost) throws UnavailableException, CambriaApiException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
new file mode 100644
index 0000000..22f0588
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
@@ -0,0 +1,198 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.memory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import com.att.dmf.mr.metabroker.Broker;
+import com.att.dmf.mr.metabroker.Topic;
+import com.att.nsa.configs.ConfigDb;
+import com.att.nsa.security.NsaAcl;
+import com.att.nsa.security.NsaApiKey;
+
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public class MemoryMetaBroker implements Broker {
+ /**
+ *
+ * @param mq
+ * @param configDb
+ * @param settings
+ */
+ public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) {
+ //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) {
+ fQueue = mq;
+ fTopics = new HashMap<String, MemTopic>();
+ }
+
+ @Override
+ public List<Topic> getAllTopics() {
+ return new LinkedList<Topic>(fTopics.values());
+ }
+
+ @Override
+ public Topic getTopic(String topic) {
+ return fTopics.get(topic);
+ }
+
+ @Override
+ public Topic createTopic(String topic, String desc, String ownerApiId, int partitions, int replicas,
+ boolean transactionEnabled) throws TopicExistsException {
+ if (getTopic(topic) != null) {
+ throw new TopicExistsException(topic);
+ }
+ fQueue.createTopic(topic);
+ fTopics.put(topic, new MemTopic(topic, desc, ownerApiId, transactionEnabled));
+ return getTopic(topic);
+ }
+
+ @Override
+ public void deleteTopic(String topic) {
+ fTopics.remove(topic);
+ fQueue.removeTopic(topic);
+ }
+
+ private final MemoryQueue fQueue;
+ private final HashMap<String, MemTopic> fTopics;
+
+ private static class MemTopic implements Topic {
+ /**
+ * constructor initialization
+ *
+ * @param name
+ * @param desc
+ * @param owner
+ * @param transactionEnabled
+ */
+ public MemTopic(String name, String desc, String owner, boolean transactionEnabled) {
+ fName = name;
+ fDesc = desc;
+ fOwner = owner;
+ ftransactionEnabled = transactionEnabled;
+ fReaders = null;
+ fWriters = null;
+ }
+
+ @Override
+ public String getOwner() {
+ return fOwner;
+ }
+
+ @Override
+ public NsaAcl getReaderAcl() {
+ return fReaders;
+ }
+
+ @Override
+ public NsaAcl getWriterAcl() {
+ return fWriters;
+ }
+
+ @Override
+ public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
+ if (fReaders != null && (user == null || !fReaders.canUser(user.getKey()))) {
+ throw new AccessDeniedException(user == null ? "" : user.getKey());
+ }
+ }
+
+ @Override
+ public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
+ if (fWriters != null && (user == null || !fWriters.canUser(user.getKey()))) {
+ throw new AccessDeniedException(user == null ? "" : user.getKey());
+ }
+ }
+
+ @Override
+ public String getName() {
+ return fName;
+ }
+
+ @Override
+ public String getDescription() {
+ return fDesc;
+ }
+
+ @Override
+ public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
+ if (!fOwner.equals(asUser.getKey())) {
+ throw new AccessDeniedException("User does not own this topic " + fName);
+ }
+ if (fWriters == null) {
+ fWriters = new NsaAcl();
+ }
+ fWriters.add(publisherId);
+ }
+
+ @Override
+ public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
+ if (!fOwner.equals(asUser.getKey())) {
+ throw new AccessDeniedException("User does not own this topic " + fName);
+ }
+ fWriters.remove(publisherId);
+ }
+
+ @Override
+ public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
+ if (!fOwner.equals(asUser.getKey())) {
+ throw new AccessDeniedException("User does not own this topic " + fName);
+ }
+ if (fReaders == null) {
+ fReaders = new NsaAcl();
+ }
+ fReaders.add(consumerId);
+ }
+
+ @Override
+ public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
+ if (!fOwner.equals(asUser.getKey())) {
+ throw new AccessDeniedException("User does not own this topic " + fName);
+ }
+ fReaders.remove(consumerId);
+ }
+
+ private final String fName;
+ private final String fDesc;
+ private final String fOwner;
+ private NsaAcl fReaders;
+ private NsaAcl fWriters;
+ private boolean ftransactionEnabled;
+
+ @Override
+ public boolean isTransactionEnabled() {
+ return ftransactionEnabled;
+ }
+
+ @Override
+ public Set<String> getOwners() {
+ final TreeSet<String> set = new TreeSet<String> ();
+ set.add ( fOwner );
+ return set;
+ }
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
new file mode 100644
index 0000000..0629972
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
@@ -0,0 +1,207 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.memory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import com.att.dmf.mr.backends.Consumer;
+import com.att.dmf.mr.backends.Publisher.message;
+
+/**
+ * When broker type is memory, then this class is doing all the topic related
+ * operations
+ *
+ * @author anowarul.islam
+ *
+ */
+public class MemoryQueue {
+ // map from topic to list of msgs
+ private HashMap<String, LogBuffer> fQueue;
+ private HashMap<String, HashMap<String, Integer>> fOffsets;
+
+ /**
+ * constructor storing hashMap objects in Queue and Offsets object
+ */
+ public MemoryQueue() {
+ fQueue = new HashMap<String, LogBuffer>();
+ fOffsets = new HashMap<String, HashMap<String, Integer>>();
+ }
+
+ /**
+ * method used to create topic
+ *
+ * @param topic
+ */
+ public synchronized void createTopic(String topic) {
+ LogBuffer q = fQueue.get(topic);
+ if (q == null) {
+ q = new LogBuffer(1024 * 1024);
+ fQueue.put(topic, q);
+ }
+ }
+
+ /**
+ * method used to remove topic
+ *
+ * @param topic
+ */
+ public synchronized void removeTopic(String topic) {
+ LogBuffer q = fQueue.get(topic);
+ if (q != null) {
+ fQueue.remove(topic);
+ }
+ }
+
+ /**
+ * method to write message on topic
+ *
+ * @param topic
+ * @param m
+ */
+ public synchronized void put(String topic, message m) {
+ LogBuffer q = fQueue.get(topic);
+ if (q == null) {
+ createTopic(topic);
+ q = fQueue.get(topic);
+ }
+ q.push(m.getMessage());
+ }
+
+ /**
+ * method to read consumer messages
+ *
+ * @param topic
+ * @param consumerName
+ * @return
+ */
+ public synchronized Consumer.Message get(String topic, String consumerName) {
+ final LogBuffer q = fQueue.get(topic);
+ if (q == null) {
+ return null;
+ }
+
+ HashMap<String, Integer> offsetMap = fOffsets.get(consumerName);
+ if (offsetMap == null) {
+ offsetMap = new HashMap<String, Integer>();
+ fOffsets.put(consumerName, offsetMap);
+ }
+ Integer offset = offsetMap.get(topic);
+ if (offset == null) {
+ offset = 0;
+ }
+
+ final msgInfo result = q.read(offset);
+ if (result != null && result.msg != null) {
+ offsetMap.put(topic, result.offset + 1);
+ }
+ return result;
+ }
+
+ /**
+ * static inner class used to details about consumed messages
+ *
+ * @author anowarul.islam
+ *
+ */
+ private static class msgInfo implements Consumer.Message {
+ /**
+ * published message which is consumed
+ */
+ public String msg;
+ /**
+ * offset associated with message
+ */
+ public int offset;
+
+ /**
+ * get offset of messages
+ */
+ @Override
+ public long getOffset() {
+ return offset;
+ }
+
+ /**
+ * get consumed message
+ */
+ @Override
+ public String getMessage() {
+ return msg;
+ }
+ }
+
+ /**
+ *
+ * @author sneha.d.desai
+ *
+ * private LogBuffer class has synchronized push and read method
+ */
+ private class LogBuffer {
+ private int fBaseOffset;
+ private final int fMaxSize;
+ private final ArrayList<String> fList;
+
+ /**
+ * constructor initializing the offset, maxsize and list
+ *
+ * @param maxSize
+ */
+ public LogBuffer(int maxSize) {
+ fBaseOffset = 0;
+ fMaxSize = maxSize;
+ fList = new ArrayList<String>();
+ }
+
+ /**
+ * pushing message
+ *
+ * @param msg
+ */
+ public synchronized void push(String msg) {
+ fList.add(msg);
+ while (fList.size() > fMaxSize) {
+ fList.remove(0);
+ fBaseOffset++;
+ }
+ }
+
+ /**
+ * reading messages
+ *
+ * @param offset
+ * @return
+ */
+ public synchronized msgInfo read(int offset) {
+ final int actual = Math.max(0, offset - fBaseOffset);
+
+ final msgInfo mi = new msgInfo();
+ mi.msg = (actual >= fList.size()) ? null : fList.get(actual);
+ if (mi.msg == null)
+ return null;
+
+ mi.offset = actual + fBaseOffset;
+ return mi;
+ }
+
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java
new file mode 100644
index 0000000..2b43ed3
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.memory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import com.att.dmf.mr.backends.Publisher;
+import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
+
+
+
+/**
+ *
+ * @author anowarul.islam
+ *
+ */
+public class MemoryQueuePublisher implements Publisher {
+ /**
+ *
+ * @param q
+ * @param b
+ */
+ public MemoryQueuePublisher(MemoryQueue q, MemoryMetaBroker b) {
+ fBroker = b;
+ fQueue = q;
+ }
+
+
+ /**
+ *
+ * @param topic
+ * @param msg
+ * @throws IOException
+ */
+ @Override
+ public void sendMessage(String topic, message msg) throws IOException {
+ if (null == fBroker.getTopic(topic)) {
+ try {
+ fBroker.createTopic(topic, topic, null, 8, 3, false);
+ } catch (TopicExistsException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ fQueue.put(topic, msg);
+ }
+
+ @Override
+ /**
+ * @param topic
+ * @param msgs
+ * @throws IOException
+ */
+
+ public void sendBatchMessageNew(String topic, ArrayList<ProducerRecord<String, String>> kms) throws IOException {
+
+ }
+
+ public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
+ }
+
+ public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
+ for (message m : msgs) {
+ sendMessage(topic, m);
+ }
+ }
+
+ private final MemoryMetaBroker fBroker;
+ private final MemoryQueue fQueue;
+}
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java b/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java
new file mode 100644
index 0000000..8e41c9f
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.backends.memory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import com.att.dmf.mr.backends.Publisher;
+
+//import kafka.producer.KeyedMessage;
+
+/**
+ * class used for logging perspective
+ *
+ * @author anowarul.islam
+ *
+ */
+public class MessageLogger implements Publisher {
+ public MessageLogger() {
+ }
+
+ public void setFile(File f) throws FileNotFoundException {
+ fStream = new FileOutputStream(f, true);
+ }
+
+ /**
+ *
+ * @param topic
+ * @param msg
+ * @throws IOException
+ */
+ @Override
+ public void sendMessage(String topic, message msg) throws IOException {
+ logMsg(msg);
+ }
+
+ /**
+ * @param topic
+ * @param msgs
+ * @throws IOException
+ */
+ @Override
+ public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
+ for (message m : msgs) {
+ logMsg(m);
+ }
+ }
+
+ /**
+ * @param topic
+ * @param kms
+ * @throws IOException
+
+ @Override
+ public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws
+
+ IOException {
+ }
+ */
+ private FileOutputStream fStream;
+
+ /**
+ *
+ * @param msg
+ * @throws IOException
+ */
+ private void logMsg(message msg) throws IOException {
+ String key = msg.getKey();
+ if (key == null)
+ key = "<none>";
+
+ fStream.write('[');
+ fStream.write(key.getBytes());
+ fStream.write("] ".getBytes());
+ fStream.write(msg.getMessage().getBytes());
+ fStream.write('\n');
+ }
+ public void sendBatchMessageNew(String topic, ArrayList<ProducerRecord<String, String>> kms) throws IOException {
+
+ }
+
+ public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
+ }
+}