diff options
Diffstat (limited to 'src/main/java/com/att/nsa/cambria/backends/memory')
6 files changed, 818 insertions, 0 deletions
diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java new file mode 100644 index 0000000..f0982a9 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryConsumerFactory.java @@ -0,0 +1,160 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.backends.memory; + +import java.util.ArrayList; +import java.util.Collection; + +import com.att.nsa.cambria.backends.Consumer; +import com.att.nsa.cambria.backends.ConsumerFactory; +/** + * + * @author author + * + */ +public class MemoryConsumerFactory implements ConsumerFactory +{ + /** + * + * Initializing constructor + * @param q + */ + public MemoryConsumerFactory ( MemoryQueue q ) + { + fQueue = q; + } + + /** + * + * @param topic + * @param consumerGroupId + * @param clientId + * @param timeoutMs + * @return Consumer + */ + @Override + public Consumer getConsumerFor ( String topic, String consumerGroupId, String clientId, int timeoutMs ) + { + return new MemoryConsumer ( topic, consumerGroupId ); + } + + private final MemoryQueue fQueue; + + /** + * + * Define nested inner class + * + */ + private class MemoryConsumer implements Consumer + { + /** + * + * Initializing MemoryConsumer constructor + * @param topic + * @param consumer + * + */ + public MemoryConsumer ( String topic, String consumer ) + { + fTopic = topic; + fConsumer = consumer; + fCreateMs = System.currentTimeMillis (); + fLastAccessMs = fCreateMs; + } + + @Override + /** + * + * return consumer details + */ + public Message nextMessage () + { + return fQueue.get ( fTopic, fConsumer ); + } + + private final String fTopic; + private final String fConsumer; + private final long fCreateMs; + private long fLastAccessMs; + + @Override + public void close() { + //Nothing to close/clean up. + } + /** + * + */ + public void commitOffsets() + { + // ignoring this aspect + } + /** + * get offset + */ + public long getOffset() + { + return 0; + } + + @Override + /** + * get consumer topic name + */ + public String getName () + { + return fTopic + "/" + fConsumer; + } + + @Override + public long getCreateTimeMs () + { + return fCreateMs; + } + + @Override + public long getLastAccessMs () + { + return fLastAccessMs; + } + } + + @Override + public void destroyConsumer(String topic, String consumerGroupId, + String clientId) { + //No cache for memory consumers, so NOOP + } + + @Override + public void dropCache () + { + // nothing to do - there's no cache here + } + + @Override + /** + * @return ArrayList<MemoryConsumer> + */ + public Collection<? extends Consumer> getConsumers () + { + return new ArrayList<MemoryConsumer> (); + } +} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java new file mode 100644 index 0000000..87e59c2 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryMetaBroker.java @@ -0,0 +1,199 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.backends.memory; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import com.att.nsa.cambria.metabroker.Broker; +import com.att.nsa.cambria.metabroker.Topic; +import com.att.nsa.configs.ConfigDb; +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import com.att.nsa.security.NsaAcl; +import com.att.nsa.security.NsaApiKey; + +/** + * + * @author author + * + */ +public class MemoryMetaBroker implements Broker { + /** + * + * @param mq + * @param configDb + * @param settings + */ + public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) { + //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) { + fQueue = mq; + fTopics = new HashMap<String, MemTopic>(); + } + + @Override + public List<Topic> getAllTopics() { + return new LinkedList<Topic>(fTopics.values()); + } + + @Override + public Topic getTopic(String topic) { + return fTopics.get(topic); + } + + @Override + public Topic createTopic(String topic, String desc, String ownerApiId, int partitions, int replicas, + boolean transactionEnabled) throws TopicExistsException { + if (getTopic(topic) != null) { + throw new TopicExistsException(topic); + } + fQueue.createTopic(topic); + fTopics.put(topic, new MemTopic(topic, desc, ownerApiId, transactionEnabled)); + return getTopic(topic); + } + + @Override + public void deleteTopic(String topic) { + fTopics.remove(topic); + fQueue.removeTopic(topic); + } + + private final MemoryQueue fQueue; + private final HashMap<String, MemTopic> fTopics; + + private static class MemTopic implements Topic { + /** + * constructor initialization + * + * @param name + * @param desc + * @param owner + * @param transactionEnabled + */ + public MemTopic(String name, String desc, String owner, boolean transactionEnabled) { + fName = name; + fDesc = desc; + fOwner = owner; + ftransactionEnabled = transactionEnabled; + fReaders = null; + fWriters = null; + } + + @Override + public String getOwner() { + return fOwner; + } + + @Override + public NsaAcl getReaderAcl() { + return fReaders; + } + + @Override + public NsaAcl getWriterAcl() { + return fWriters; + } + + @Override + public void checkUserRead(NsaApiKey user) throws AccessDeniedException { + if (fReaders != null && (user == null || !fReaders.canUser(user.getKey()))) { + throw new AccessDeniedException(user == null ? "" : user.getKey()); + } + } + + @Override + public void checkUserWrite(NsaApiKey user) throws AccessDeniedException { + if (fWriters != null && (user == null || !fWriters.canUser(user.getKey()))) { + throw new AccessDeniedException(user == null ? "" : user.getKey()); + } + } + + @Override + public String getName() { + return fName; + } + + @Override + public String getDescription() { + return fDesc; + } + + @Override + public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { + if (!fOwner.equals(asUser.getKey())) { + throw new AccessDeniedException("User does not own this topic " + fName); + } + if (fWriters == null) { + fWriters = new NsaAcl(); + } + fWriters.add(publisherId); + } + + @Override + public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { + if (!fOwner.equals(asUser.getKey())) { + throw new AccessDeniedException("User does not own this topic " + fName); + } + fWriters.remove(publisherId); + } + + @Override + public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { + if (!fOwner.equals(asUser.getKey())) { + throw new AccessDeniedException("User does not own this topic " + fName); + } + if (fReaders == null) { + fReaders = new NsaAcl(); + } + fReaders.add(consumerId); + } + + @Override + public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { + if (!fOwner.equals(asUser.getKey())) { + throw new AccessDeniedException("User does not own this topic " + fName); + } + fReaders.remove(consumerId); + } + + private final String fName; + private final String fDesc; + private final String fOwner; + private NsaAcl fReaders; + private NsaAcl fWriters; + private boolean ftransactionEnabled; + + @Override + public boolean isTransactionEnabled() { + return ftransactionEnabled; + } + + @Override + public Set<String> getOwners() { + final TreeSet<String> set = new TreeSet<String> (); + set.add ( fOwner ); + return set; + } + } +} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java new file mode 100644 index 0000000..a0dc8b8 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueue.java @@ -0,0 +1,207 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.backends.memory; + +import java.util.ArrayList; +import java.util.HashMap; + +import com.att.nsa.cambria.backends.Consumer; +import com.att.nsa.cambria.backends.Publisher.message; + +/** + * When broker type is memory, then this class is doing all the topic related + * operations + * + * @author author + * + */ +public class MemoryQueue { + // map from topic to list of msgs + private HashMap<String, LogBuffer> fQueue; + private HashMap<String, HashMap<String, Integer>> fOffsets; + + /** + * constructor storing hashMap objects in Queue and Offsets object + */ + public MemoryQueue() { + fQueue = new HashMap<String, LogBuffer>(); + fOffsets = new HashMap<String, HashMap<String, Integer>>(); + } + + /** + * method used to create topic + * + * @param topic + */ + public synchronized void createTopic(String topic) { + LogBuffer q = fQueue.get(topic); + if (q == null) { + q = new LogBuffer(1024 * 1024); + fQueue.put(topic, q); + } + } + + /** + * method used to remove topic + * + * @param topic + */ + public synchronized void removeTopic(String topic) { + LogBuffer q = fQueue.get(topic); + if (q != null) { + fQueue.remove(topic); + } + } + + /** + * method to write message on topic + * + * @param topic + * @param m + */ + public synchronized void put(String topic, message m) { + LogBuffer q = fQueue.get(topic); + if (q == null) { + createTopic(topic); + q = fQueue.get(topic); + } + q.push(m.getMessage()); + } + + /** + * method to read consumer messages + * + * @param topic + * @param consumerName + * @return + */ + public synchronized Consumer.Message get(String topic, String consumerName) { + final LogBuffer q = fQueue.get(topic); + if (q == null) { + return null; + } + + HashMap<String, Integer> offsetMap = fOffsets.get(consumerName); + if (offsetMap == null) { + offsetMap = new HashMap<String, Integer>(); + fOffsets.put(consumerName, offsetMap); + } + Integer offset = offsetMap.get(topic); + if (offset == null) { + offset = 0; + } + + final msgInfo result = q.read(offset); + if (result != null && result.msg != null) { + offsetMap.put(topic, result.offset + 1); + } + return result; + } + + /** + * static inner class used to details about consumed messages + * + * @author author + * + */ + private static class msgInfo implements Consumer.Message { + /** + * published message which is consumed + */ + public String msg; + /** + * offset associated with message + */ + public int offset; + + /** + * get offset of messages + */ + @Override + public long getOffset() { + return offset; + } + + /** + * get consumed message + */ + @Override + public String getMessage() { + return msg; + } + } + + /** + * + * @author author + * + * private LogBuffer class has synchronized push and read method + */ + private class LogBuffer { + private int fBaseOffset; + private final int fMaxSize; + private final ArrayList<String> fList; + + /** + * constructor initializing the offset, maxsize and list + * + * @param maxSize + */ + public LogBuffer(int maxSize) { + fBaseOffset = 0; + fMaxSize = maxSize; + fList = new ArrayList<String>(); + } + + /** + * pushing message + * + * @param msg + */ + public synchronized void push(String msg) { + fList.add(msg); + while (fList.size() > fMaxSize) { + fList.remove(0); + fBaseOffset++; + } + } + + /** + * reading messages + * + * @param offset + * @return + */ + public synchronized msgInfo read(int offset) { + final int actual = Math.max(0, offset - fBaseOffset); + + final msgInfo mi = new msgInfo(); + mi.msg = (actual >= fList.size()) ? null : fList.get(actual); + if (mi.msg == null) + return null; + + mi.offset = actual + fBaseOffset; + return mi; + } + + } +} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java new file mode 100644 index 0000000..d653f6e --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/backends/memory/MemoryQueuePublisher.java @@ -0,0 +1,90 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.backends.memory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.att.nsa.cambria.backends.Publisher; +import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; + +import kafka.producer.KeyedMessage; + +/** + * + * @author author + * + */ +public class MemoryQueuePublisher implements Publisher { + /** + * + * @param q + * @param b + */ + public MemoryQueuePublisher(MemoryQueue q, MemoryMetaBroker b) { + fBroker = b; + fQueue = q; + } + + /** + * sendBatchMessages + * + * @param topic + * @param kms + */ + public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException { + } + + /** + * + * @param topic + * @param msg + * @throws IOException + */ + @Override + public void sendMessage(String topic, message msg) throws IOException { + if (null == fBroker.getTopic(topic)) { + try { + fBroker.createTopic(topic, topic, null, 8, 3, false); + } catch (TopicExistsException e) { + throw new RuntimeException(e); + } + } + fQueue.put(topic, msg); + } + + @Override + /** + * @param topic + * @param msgs + * @throws IOException + */ + public void sendMessages(String topic, List<? extends message> msgs) throws IOException { + for (message m : msgs) { + sendMessage(topic, m); + } + } + + private final MemoryMetaBroker fBroker; + private final MemoryQueue fQueue; +} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java b/src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java new file mode 100644 index 0000000..c49ac4f --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/backends/memory/MessageDropper.java @@ -0,0 +1,61 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.backends.memory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.att.nsa.cambria.backends.Publisher; + +import kafka.producer.KeyedMessage; + +/** + * class is used to message publishing + * + * @author author + * + */ +public class MessageDropper implements Publisher { + /** + * publish single messages + * param topic + * param msg + */ + @Override + public void sendMessage(String topic, message msg) throws IOException { + } + + /** + * publish multiple messages + */ + @Override + public void sendMessages(String topic, List<? extends message> msgs) throws IOException { + } + + /** + * publish batch messages + */ + @Override + public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException { + } +} diff --git a/src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java b/src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java new file mode 100644 index 0000000..9ff8bd6 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/backends/memory/MessageLogger.java @@ -0,0 +1,101 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.backends.memory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.att.nsa.cambria.backends.Publisher; + +import kafka.producer.KeyedMessage; + +/** + * class used for logging perspective + * + * @author author + * + */ +public class MessageLogger implements Publisher { + public MessageLogger() { + } + + public void setFile(File f) throws FileNotFoundException { + fStream = new FileOutputStream(f, true); + } + + /** + * + * @param topic + * @param msg + * @throws IOException + */ + @Override + public void sendMessage(String topic, message msg) throws IOException { + logMsg(msg); + } + + /** + * @param topic + * @param msgs + * @throws IOException + */ + @Override + public void sendMessages(String topic, List<? extends message> msgs) throws IOException { + for (message m : msgs) { + logMsg(m); + } + } + + /** + * @param topic + * @param kms + * @throws IOException + */ + @Override + public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws + + IOException { + } + + private FileOutputStream fStream; + + /** + * + * @param msg + * @throws IOException + */ + private void logMsg(message msg) throws IOException { + String key = msg.getKey(); + if (key == null) + key = "<none>"; + + fStream.write('['); + fStream.write(key.getBytes()); + fStream.write("] ".getBytes()); + fStream.write(msg.getMessage().getBytes()); + fStream.write('\n'); + } +} |