diff options
23 files changed, 555 insertions, 213 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java index 2a9e0ab..4bdd9f3 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java @@ -32,7 +32,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.json.JSONException; import org.springframework.beans.factory.annotation.Qualifier; - +import org.springframework.util.StringUtils; import org.onap.dmaap.dmf.mr.backends.Publisher; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.utils.Utils; @@ -61,15 +61,10 @@ public class KafkaPublisher implements Publisher { * @throws rrNvReadable.missingReqdSetting */ public KafkaPublisher(@Qualifier("propertyReader") rrNvReadable settings) throws rrNvReadable.missingReqdSetting { - //fSettings = settings; final Properties props = new Properties(); - /*transferSetting(fSettings, props, "metadata.broker.list", "localhost:9092"); - transferSetting(fSettings, props, "request.required.acks", "1"); - transferSetting(fSettings, props, "message.send.max.retries", "5"); - transferSetting(fSettings, props, "retry.backoff.ms", "150"); */ String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list"); - if(null==kafkaConnUrl){ + if(StringUtils.isEmpty(kafkaConnUrl)){ kafkaConnUrl="localhost:9092"; } @@ -209,7 +204,7 @@ try{ */ private void transferSetting(Properties props, String key, String defVal) { String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); - if (null==kafka_prop) kafka_prop=defVal; + if (StringUtils.isEmpty(kafka_prop)) kafka_prop=defVal; //props.put(key, settings.getString("kafka." + key, defVal)); props.put(key, kafka_prop); } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java index 2091e5f..f645c8d 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java @@ -52,7 +52,6 @@ import com.att.nsa.metrics.impl.CdmRateTicker; @Component public class DMaaPCambriaLimiter { private final HashMap<String, RateInfo> fRateInfo; - private final HashMap<String, RateInfoCheck> fRateInfoCheck; private final double fMaxEmptyPollsPerMinute; private final double fMaxPollsPerMinute; private final int fWindowLengthMins; @@ -70,7 +69,6 @@ public class DMaaPCambriaLimiter { @Autowired public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) { fRateInfo = new HashMap<>(); - fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, CambriaConstants.kDefault_MaxEmptyPollsPerMinute); fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute, @@ -105,7 +103,6 @@ public class DMaaPCambriaLimiter { */ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) { fRateInfo = new HashMap<>(); - fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute); fWindowLengthMins = windowLengthMins; @@ -226,42 +223,6 @@ public class DMaaPCambriaLimiter { - private static class RateInfoCheck { - - private final String fLabel; - private final CdmRateTicker fCallRateSinceLastMsgSend; - /** - * constructor initialzes - * - * @param label - * @param windowLengthMinutes - */ - public RateInfoCheck(String label, int windowLengthMinutes) { - fLabel = label; - fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES, - windowLengthMinutes, TimeUnit.MINUTES); - } - - public String getLabel() { - return fLabel; - } - - /** - * CdmRateTicker is reset - */ - public void reset() { - fCallRateSinceLastMsgSend.reset(); - } - - /** - * - * @return - */ - public double onCall() { - fCallRateSinceLastMsgSend.tick(); - return fCallRateSinceLastMsgSend.getRate(); - } - } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java index 0a909ff..63e065f 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -221,7 +221,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { try { ipLock.release(); } catch (Exception e) { - throw new UnavailableException("Error while releasing consumer factory lock" + e, e); + log.error("Error while releasing consumer factory lock", e); } } } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 1e20ee2..03a1bd5 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -39,7 +39,7 @@ import org.apache.kafka.common.KafkaFuture; import org.json.JSONObject; import org.json.JSONArray; import org.springframework.beans.factory.annotation.Qualifier; - +import org.springframework.util.StringUtils; import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.metabroker.Broker1; @@ -53,6 +53,7 @@ import com.att.nsa.configs.ConfigDb; import com.att.nsa.configs.ConfigDbException; import com.att.nsa.configs.ConfigPath; import com.att.nsa.drumlin.service.standards.HttpStatusCodes; +import com.att.nsa.drumlin.till.data.stringUtils; import com.att.nsa.drumlin.till.nv.rrNvReadable; import com.att.nsa.security.NsaAcl; import com.att.nsa.security.NsaAclUtils; @@ -75,7 +76,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { final Properties props = new Properties (); String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "kafka.metadata.broker.list"); - if (null == fkafkaBrokers) { + if (StringUtils.isEmpty(fkafkaBrokers)) { fkafkaBrokers = "localhost:9092"; } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java index 82ff80a..0ab80c4 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java @@ -435,7 +435,7 @@ public class MMServiceImpl implements MMService { // start processing, building a batch to push to the backend final long startMs = System.currentTimeMillis(); long count = 0; - long maxEventBatch = 1024 * 16; + long maxEventBatch = 1024L * 16L; String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); if (null != evenlen) maxEventBatch = Long.parseLong(evenlen); diff --git a/src/main/java/org/onap/dmaap/mr/filter/ContentLengthFilter.java b/src/main/java/org/onap/dmaap/mr/filter/ContentLengthFilter.java index a175b16..dbf4246 100644 --- a/src/main/java/org/onap/dmaap/mr/filter/ContentLengthFilter.java +++ b/src/main/java/org/onap/dmaap/mr/filter/ContentLengthFilter.java @@ -74,8 +74,6 @@ public class ContentLengthFilter implements Filter { */ public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) throws IOException, ServletException { - // TODO Auto-generated method stub - // place your code here log.info("inside servlet do filter content length checking before pub/sub"); HttpServletRequest request = (HttpServletRequest) req; JSONObject jsonObj = null; @@ -105,12 +103,14 @@ public class ContentLengthFilter implements Filter { chain.doFilter(req, res); } } catch (CambriaApiException | NumberFormatException e) { - log.error("message size is greater then default"); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED, - DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(), errorMessages.getMsgSizeExceeds() - + jsonObj.toString()); - log.info(errRes.toString()); - + log.error("message size is greater then default", e); + if (jsonObj != null) { + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED, + DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(), + errorMessages.getMsgSizeExceeds() + + jsonObj.toString()); + log.info(errRes.toString()); + } } } @@ -119,14 +119,13 @@ public class ContentLengthFilter implements Filter { * @see Filter#init(FilterConfig) */ public void init(FilterConfig fConfig) throws ServletException { - // TODO Auto-generated method stub this.filterConfig = fConfig; log.info("Filter Content Length Initialize"); ApplicationContext ctx = WebApplicationContextUtils.getRequiredWebApplicationContext(fConfig .getServletContext()); DefaultLength defLength = (DefaultLength) ctx.getBean("defLength"); - DMaaPErrorMessages errorMessages = (DMaaPErrorMessages) ctx.getBean("DMaaPErrorMessages"); - this.errorMessages = errorMessages; + DMaaPErrorMessages errMessages = (DMaaPErrorMessages) ctx.getBean("DMaaPErrorMessages"); + this.errorMessages = errMessages; this.defaultLength = defLength; } diff --git a/src/main/java/org/onap/dmaap/mr/filter/DefaultLength.java b/src/main/java/org/onap/dmaap/mr/filter/DefaultLength.java index 598ef1b..3425823 100644 --- a/src/main/java/org/onap/dmaap/mr/filter/DefaultLength.java +++ b/src/main/java/org/onap/dmaap/mr/filter/DefaultLength.java @@ -24,14 +24,14 @@ package org.onap.dmaap.mr.filter; public class DefaultLength { - String defaultLength; + String defLength; public String getDefaultLength() { - return defaultLength; + return defLength; } public void setDefaultLength(String defaultLength) { - this.defaultLength = defaultLength; + this.defLength = defaultLength; } } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/JUnitTestSuite.java b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/JUnitTestSuite.java index 5904de5..de77603 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/JUnitTestSuite.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/JUnitTestSuite.java @@ -28,7 +28,7 @@ import org.junit.runners.Suite.SuiteClasses; import org.apache.log4j.Logger; @RunWith(Suite.class) -@SuiteClasses({ KafkaConsumerCacheTest.class, KafkaPublisherTest.class, }) +@SuiteClasses({ KafkaConsumerCacheTest.class, KafkaPublisherTest.class,Kafka011ConsumerTest.class,KafkaLiveLockAvoider2Test.class }) public class JUnitTestSuite { private static final Logger LOGGER = Logger.getLogger(JUnitTestSuite.class); diff --git a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/Kafka011ConsumerTest.java b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/Kafka011ConsumerTest.java new file mode 100644 index 0000000..88d703e --- /dev/null +++ b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/Kafka011ConsumerTest.java @@ -0,0 +1,91 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package org.onap.dmaap.mr.cambria.backends.kafka; + +import static org.junit.Assert.assertNotNull; + + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.dmaap.dmf.mr.backends.kafka.Kafka011Consumer; +import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; +import org.onap.dmaap.dmf.mr.constants.CambriaConstants; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ AJSCPropertiesMap.class }) +public class Kafka011ConsumerTest { + + + @Mock + private KafkaConsumer<String, String> cc; + @Mock + private KafkaLiveLockAvoider2 klla; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testKafka011Consumer() { + PowerMockito.mockStatic(AJSCPropertiesMap.class); + PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "consumer.timeout")).thenReturn("10"); + Kafka011Consumer consumer=null; + try { + consumer= new Kafka011Consumer("topic", "group", "id", cc, klla) ; + consumer.commitOffsets(); + consumer.touch(); + consumer.setOffset(10); + } catch (Exception e) { + + } + assertNotNull(consumer); + assertNotNull(consumer.getConsumer()); + assertNotNull(consumer.getConsumerGroup()); + assertNotNull(consumer.getConsumerId()); + assertNotNull(consumer.getConsumerId()); + assertNotNull(consumer.getCreateTimeMs()); + assertNotNull(consumer.getLastAccessMs()); + assertNotNull(consumer.getName()); + assertNotNull(consumer.getOffset()); + assertNotNull(consumer.getLastTouch()); + + + } + + +} diff --git a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaConsumerCacheTest.java b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaConsumerCacheTest.java index 83866cf..ced6fc4 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaConsumerCacheTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaConsumerCacheTest.java @@ -207,7 +207,7 @@ public class KafkaConsumerCacheTest { } } - } + } @Test public void testSignalOwnership() { @@ -216,22 +216,14 @@ public class KafkaConsumerCacheTest { try { kafka = new KafkaConsumerCache(); - // kafka.signalOwnership("testTopic", "CG1", "23"); - } catch (NoClassDefFoundError e) { - try { - kafka.signalOwnership("testTopic", "CG1", "23"); - } catch (KafkaConsumerCacheException e1) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (NullPointerException e1) { - // TODO Auto-generated catch block - // assertTrue(true); - e1.printStackTrace(); - } - + try { + kafka.signalOwnership("testTopic", "CG1", "23"); + } catch (KafkaConsumerCacheException e) { + assertTrue(true); } + } catch (NoClassDefFoundError e) {} - // assertTrue(true); + // } @Test @@ -252,5 +244,6 @@ public class KafkaConsumerCacheTest { } } + } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaLiveLockAvoider2Test.java b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaLiveLockAvoider2Test.java new file mode 100644 index 0000000..71d50e8 --- /dev/null +++ b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaLiveLockAvoider2Test.java @@ -0,0 +1,108 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package org.onap.dmaap.mr.cambria.backends.kafka; + +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.ExistsBuilder; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.modules.junit4.PowerMockRunner; + + +@RunWith(PowerMockRunner.class) +public class KafkaLiveLockAvoider2Test { + + @Mock + private CuratorFramework curatorFramework; + @Mock + private ExistsBuilder existsBuilder; + @Mock + private CreateBuilder createBuilder; + @Mock + private GetChildrenBuilder childrenBuilder; + @Mock + ProtectACLCreateModeStatPathAndBytesable<String> acl; + @InjectMocks + private KafkaLiveLockAvoider2 liveLockAvoider; + + public static final String ZNODE_ROOT = "/live-lock-avoid"; + public static final String ZNODE_LOCKS = "/locks"; + public static final String ZNODE_UNSTICK_TASKS ="/unstick-tasks"; + + private static String locksPath = ZNODE_ROOT+ZNODE_LOCKS; + private static String tasksPath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS; + + + @Before + public void setUp() throws Exception { + List<String> taskNodes= new ArrayList<String>(); + taskNodes.add("appId"); + MockitoAnnotations.initMocks(this); + PowerMockito.when(acl.forPath(locksPath)).thenReturn(locksPath); + PowerMockito.when(acl.forPath(tasksPath)).thenReturn(tasksPath); + PowerMockito.when(createBuilder.creatingParentsIfNeeded()).thenReturn(acl); + PowerMockito.when(curatorFramework.create()).thenReturn(createBuilder); + PowerMockito.when(curatorFramework.checkExists()).thenReturn(existsBuilder); + PowerMockito.when(childrenBuilder.forPath(tasksPath)).thenReturn(taskNodes); + PowerMockito.when(curatorFramework.getChildren()).thenReturn(childrenBuilder); + + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testUnlock(){ + liveLockAvoider.init(); + try { + liveLockAvoider.unlockConsumerGroup("appId", "groupName"); + } catch (Exception e) { + assertTrue(true); + } + } + + @Test + public void testWatcher(){ + try { + liveLockAvoider.startNewWatcherForServer("appId", null); + } catch (Exception e) { + assertTrue(true); + } + } + +} diff --git a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java index 982fbf2..8292c2c 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java @@ -18,35 +18,43 @@ * ============LICENSE_END========================================================= */ - package org.onap.dmaap.mr.cambria.backends.kafka; +package org.onap.dmaap.mr.cambria.backends.kafka; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import org.apache.kafka.clients.producer.Producer; import org.junit.After; import org.junit.Before; -import org.junit.ClassRule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.dmaap.dmf.mr.backends.Publisher.message; +import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher; +import org.onap.dmaap.dmf.mr.beans.LogDetails; +import org.onap.dmaap.dmf.mr.constants.CambriaConstants; +import org.onap.dmaap.dmf.mr.utils.Utils; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import com.att.ajsc.filemonitor.AJSCPropertiesMap; -import org.onap.dmaap.dmf.mr.backends.Publisher.message; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; -import kafka.common.FailedToSendMessageException; -import kafka.producer.KeyedMessage; - +@RunWith(PowerMockRunner.class) +@PrepareForTest({ Utils.class }) public class KafkaPublisherTest { - - - /*@Before + @Before public void setUp() throws Exception { - ClassLoader classLoader = getClass().getClassLoader(); - AJSCPropertiesMap.refresh(new File(classLoader.getResource("MsgRtrApi.properties").getFile())); + MockitoAnnotations.initMocks(this); + PowerMockito.mockStatic(Utils.class); + PowerMockito.when(Utils.isCadiEnabled()).thenReturn(true); + } @After @@ -54,100 +62,22 @@ public class KafkaPublisherTest { } @Test - public void testSendMessages() { - - String topic = "testTopic"; + public void testPublisherInit() { + + - KafkaPublisher kafka = null; try { - kafka = new KafkaPublisher(null); - - } catch (missingReqdSetting e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (NoClassDefFoundError e) { try { - kafka.sendMessage(topic, null); - } catch (NullPointerException e1) { - // TODO Auto-generated catch block + KafkaPublisher kafkaPublisher = new KafkaPublisher(null); + } catch (missingReqdSetting e) { assertTrue(true); - } catch (FailedToSendMessageException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } catch (IOException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); } - } catch (FailedToSendMessageException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + } catch (LinkageError e) { + assertTrue(true); } } - @Test - public void testSendBatchMessage() { - - String topic = "testTopic"; - - KafkaPublisher kafka = null; - ArrayList<KeyedMessage<String, String>> kms = null; - try { - kafka = new KafkaPublisher(null); - - } catch (missingReqdSetting e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (NoClassDefFoundError e) { - try { - kafka.sendBatchMessage(topic, kms); - } catch (NullPointerException e1) { - // TODO Auto-generated catch block - assertTrue(true); - } catch (IOException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } - } catch (FailedToSendMessageException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - @Test - public void sendMessages() { - - String topic = "testTopic"; - - List<message> msgs = null; - - KafkaPublisher kafka = null; - //ArrayList<KeyedMessage<String, String>> kms = null; - try { - kafka = new KafkaPublisher(null); - - } catch (missingReqdSetting e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (NoClassDefFoundError e) { - try { - kafka.sendMessages(topic, msgs); - } catch (NullPointerException e1) { - // TODO Auto-generated catch block - assertTrue(true); - } catch (FailedToSendMessageException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } catch (IOException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } - } catch (FailedToSendMessageException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - }*/ } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/beans/ApiKeyBeanTest.java b/src/test/java/org/onap/dmaap/mr/cambria/beans/ApiKeyBeanTest.java index 2a79e92..beec641 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/beans/ApiKeyBeanTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/beans/ApiKeyBeanTest.java @@ -53,6 +53,16 @@ public class ApiKeyBeanTest { } + @Test + public void testApiKeyBean(){ + ApiKeyBean bean = new ApiKeyBean(); + bean.setDescription("description"); + bean.setEmail("email"); + assertEquals("description", bean.getDescription()); + assertEquals("email", bean.getEmail()); + + } + } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPCambriaLimiterTest.java b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPCambriaLimiterTest.java index 853d770..f9b2554 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPCambriaLimiterTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPCambriaLimiterTest.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ - package org.onap.dmaap.mr.cambria.beans; +package org.onap.dmaap.mr.cambria.beans; import static org.junit.Assert.*; @@ -41,43 +41,57 @@ public class DMaaPCambriaLimiterTest { @Test public void testGetSleepMsForRate() { - - - double value = 3; - DMaaPCambriaLimiter.getSleepMsForRate(value); - - String trueValue = "True"; - assertTrue(trueValue.equalsIgnoreCase("True")); - + + assertEquals(1000, DMaaPCambriaLimiter.getSleepMsForRate(100)); + assertEquals(0, DMaaPCambriaLimiter.getSleepMsForRate(0)); + } - + @Test public void testOnCall() { - - DMaaPCambriaLimiter limiter = new DMaaPCambriaLimiter(1,2, 3); + + DMaaPCambriaLimiter limiter = new DMaaPCambriaLimiter(1, 2, 3); try { - limiter.onCall("testTopic", "ConsumerGroup1", "client2","remoteHost"); + limiter.onCall("testTopic", "ConsumerGroup1", "client2", "remoteHost"); } catch (CambriaApiException e) { // TODO Auto-generated catch block e.printStackTrace(); } - + String trueValue = "True"; assertTrue(trueValue.equalsIgnoreCase("True")); - + + } + + @Test + public void testOnCallError2() { + + DMaaPCambriaLimiter limiter = new DMaaPCambriaLimiter(0, 2, 3, 1, 1); + try { + limiter.onCall("testTopic", "ConsumerGroup1", "client2", "remoteHost"); + } catch (CambriaApiException e) { + assertTrue(false); + } + + } + + @Test(expected = CambriaApiException.class) + public void testOnCallError() throws CambriaApiException { + + DMaaPCambriaLimiter limiter = new DMaaPCambriaLimiter(0.9, 2, 3, 1, 1); + limiter.onCall("testTopic", "ConsumerGroup1", "client2", "remoteHost"); + } - + @Test public void testOnSend() { - - DMaaPCambriaLimiter limiter = new DMaaPCambriaLimiter(3,3, 3); + + DMaaPCambriaLimiter limiter = new DMaaPCambriaLimiter(3, 3, 3); limiter.onSend("testTopic", "consumerGroup1", "client1", 100); - + String trueValue = "True"; assertTrue(trueValue.equalsIgnoreCase("True")); - + } - - } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPContextTest.java b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPContextTest.java index 9a67673..a945cd6 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPContextTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPContextTest.java @@ -27,6 +27,9 @@ import org.junit.Before; import org.junit.Test; import org.onap.dmaap.dmf.mr.beans.DMaaPContext; +import org.springframework.mock.http.client.MockClientHttpRequest; +import org.springframework.mock.web.MockHttpServletRequest; +import org.springframework.mock.web.MockHttpSession; public class DMaaPContextTest { @@ -48,6 +51,21 @@ public class DMaaPContextTest { } + @Test + public void testDMaaPContext(){ + + DMaaPContext context=new DMaaPContext(); + context.setConsumerRequestTime("consumerRequestTime"); + assertEquals("consumerRequestTime", context.getConsumerRequestTime()); + MockHttpServletRequest request= new MockHttpServletRequest(); + MockHttpSession session=new MockHttpSession(); + request.setSession(session); + context.setRequest(request); + assertNotNull(context.getSession()); + + + } + } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaConsumerFactoryTest.java b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaConsumerFactoryTest.java new file mode 100644 index 0000000..fd30359 --- /dev/null +++ b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaConsumerFactoryTest.java @@ -0,0 +1,55 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package org.onap.dmaap.mr.cambria.beans; + +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Test; +import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory; +import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; +import org.onap.dmaap.mr.cambria.embed.EmbedConfigurationReader; + + +public class DMaaPKafkaConsumerFactoryTest { + + EmbedConfigurationReader embedConfigurationReader = new EmbedConfigurationReader(); + + @After + public void tearDown() throws Exception { + embedConfigurationReader.tearDown(); + } + +@Test +public void testConsumerFactory(){ + + try { + ConfigurationReader configurationReader = embedConfigurationReader.buildConfigurationReader(); + DMaaPKafkaConsumerFactory consumerFactory=(DMaaPKafkaConsumerFactory) configurationReader.getfConsumerFactory(); + consumerFactory.getConsumerFor("topic", "consumerGroupName", "consumerId", 10, "remotehost"); + } catch (Exception e) { + assertTrue(false); + } + assertTrue(true); +} + +} diff --git a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java index 35f3064..0f4702a 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.onap.dmaap.mr.cambria.beans; +package org.onap.dmaap.mr.cambria.beans; import static org.junit.Assert.assertTrue; @@ -41,6 +41,7 @@ import org.onap.dmaap.dmf.mr.CambriaApiException; import org.apache.kafka.clients.admin.AdminClient; import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker; +import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker.KafkaTopic; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.metabroker.Topic; import org.onap.dmaap.dmf.mr.metabroker.Broker1.TopicExistsException; @@ -48,9 +49,8 @@ import com.att.nsa.configs.ConfigDb; import com.att.nsa.configs.ConfigDbException; import com.att.nsa.configs.ConfigPath; - @RunWith(PowerMockRunner.class) -@PrepareForTest({ AdminClient.class}) +@PrepareForTest({ AdminClient.class }) public class DMaaPKafkaMetaBrokerTest { @InjectMocks @@ -74,11 +74,28 @@ public class DMaaPKafkaMetaBrokerTest { public void setUp() { MockitoAnnotations.initMocks(this); PowerMockito.mockStatic(AdminClient.class); - //PowerMockito.when(AdminClient.create (any(Properties.class) )).thenReturn(fKafkaAdminClient); - - //PowerMockito.mockStatic(AdminUtils.class); + // PowerMockito.when(AdminClient.create (any(Properties.class) + // )).thenReturn(fKafkaAdminClient); + + // PowerMockito.mockStatic(AdminUtils.class); PowerMockito.when(configDb.parse("/topics")).thenReturn(fBaseTopicData); - + + } + + @Test + public void testBrokercreate() { + DMaaPKafkaMetaBroker broker = new DMaaPKafkaMetaBroker(); + + } + + @Test + public void testcreateTopicEntry() { + try { + KafkaTopic kafkaTopic = new KafkaTopic("topics", configDb, fBaseTopicData); + dMaaPKafkaMetaBroker.createTopicEntry("name", "desc", "owner", true); + } catch (Exception e) { + assertTrue(true); + } } @@ -108,7 +125,6 @@ public class DMaaPKafkaMetaBrokerTest { } } - @Test public void testcreateTopic_wrongPartition() { diff --git a/src/test/java/org/onap/dmaap/mr/cambria/beans/LogDetailsTest.java b/src/test/java/org/onap/dmaap/mr/cambria/beans/LogDetailsTest.java index 8d83821..8cab96a 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/beans/LogDetailsTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/beans/LogDetailsTest.java @@ -65,6 +65,23 @@ public class LogDetailsTest { assertEquals(details.getSubscriberGroupId(),"1"); } + @Test + public void testPublisherdetails(){ + LogDetails details = new LogDetails(); + assertNotNull(details.getPublisherLogDetails()); + assertNull(details.getTransactionIdTs()); + assertFalse(details.isTransactionEnabled()); + assertEquals(details.getMessageLengthInBytes(),0); + assertNotNull(details.getPublishTimestamp()); + assertNull(details.getMessageTimestamp()); + assertNull(details.getMessageSequence()); + assertNull(details.getMessageBatchId()); + assertNull(details.getPublisherIp()); + assertNull(details.getTopicId()); + + + } + } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/beans/TopicBeanTest.java b/src/test/java/org/onap/dmaap/mr/cambria/beans/TopicBeanTest.java index 844fc08..d88c408 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/beans/TopicBeanTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/beans/TopicBeanTest.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * ONAP Policy Engine + * * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ @@ -51,6 +51,26 @@ public class TopicBeanTest { } + @Test + public void testTopicBean() { + + TopicBean bean = new TopicBean("topicName", "topicDescription", 1,1,true); + assertNotNull(bean); + + } + + @Test + public void testTopicBeanStter() { + + TopicBean bean = new TopicBean(); + bean.setPartitionCount(1); + bean.setReplicationCount(1); + bean.setTopicDescription("topicDescription"); + bean.setTopicName("topicName"); + bean.setTransactionEnabled(true); + assertNotNull(bean); + } + } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/resources/CambriaOutboundEventStreamTest.java b/src/test/java/org/onap/dmaap/mr/cambria/resources/CambriaOutboundEventStreamTest.java index 970bc60..3b556fb 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/resources/CambriaOutboundEventStreamTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/resources/CambriaOutboundEventStreamTest.java @@ -22,6 +22,9 @@ import static org.junit.Assert.*; +import java.io.IOException; + +import org.json.JSONException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -29,6 +32,7 @@ import org.junit.Test; import org.onap.dmaap.dmf.mr.beans.DMaaPContext; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream; +import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream.operation; public class CambriaOutboundEventStreamTest { @@ -63,7 +67,26 @@ public class CambriaOutboundEventStreamTest { @Test public void testForEachMessage() { - //fail("Not yet implemented"); + try { + coes.forEachMessage(new operation(){ + + @Override + public void onWait() throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void onMessage(int count, String msg, String transId, long offSet) + throws IOException, JSONException { + // TODO Auto-generated method stub + + } + + }); + } catch (Exception e) { + // TODO Auto-generated catch block + } } @Test diff --git a/src/test/java/org/onap/dmaap/mr/cambria/utils/DMaaPResponseBuilderTest.java b/src/test/java/org/onap/dmaap/mr/cambria/utils/DMaaPResponseBuilderTest.java index aaabb7d..002fc4b 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/utils/DMaaPResponseBuilderTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/utils/DMaaPResponseBuilderTest.java @@ -84,19 +84,40 @@ public class DMaaPResponseBuilderTest { } @Test + public void testrespondOkNoContentError(){ + dMaapContext.setResponse(null); + DMaaPResponseBuilder.respondOkNoContent(dMaapContext); + assertNull(dMaapContext.getResponse()); + } + + @Test public void testrespondOkWithHtml(){ DMaaPResponseBuilder.respondOkWithHtml(dMaapContext, "<head></head>"); assertEquals("text/html", response.getContentType()); + DMaaPResponseBuilder.respondOkWithHtml(dMaapContext, "<head></head>"); assertEquals(200, response.getStatus()); } @Test + public void testrespondOkWithHtmlError(){ + dMaapContext.setResponse(null); + DMaaPResponseBuilder.respondOkWithHtml(dMaapContext, "<head></head>"); + assertNull(dMaapContext.getResponse()); + } + + @Test public void testrespondWithError(){ DMaaPResponseBuilder.respondWithError(dMaapContext, 500, "InternalServerError"); assertEquals(500, response.getStatus()); } + @Test(expected=NullPointerException.class) + public void testInvalidrespondWithError(){ + dMaapContext.setResponse(null); + DMaaPResponseBuilder.respondWithError(dMaapContext, 500, "InternalServerError"); + } + @Test public void testrespondWithJsonError(){ JSONObject o = new JSONObject(); @@ -107,6 +128,16 @@ public class DMaaPResponseBuilderTest { } @Test + public void testInvalidrespondWithJsonError(){ + JSONObject o = new JSONObject(); + o.put("status", 500); + o.put("message", "InternalServerError"); + dMaapContext.setResponse(null); + DMaaPResponseBuilder.respondWithError(dMaapContext, 500, o); + assertNull(dMaapContext.getResponse()); + } + + @Test public void testrespondWithErrorInJson(){ DMaaPResponseBuilder.respondWithErrorInJson(dMaapContext, 500, "InternalServerError"); @@ -137,5 +168,11 @@ public class DMaaPResponseBuilderTest { assertEquals("application/octet-stream", response.getContentType()); assertEquals(200, response.getStatus()); } + + @Test(expected=NullPointerException.class) + public void testgetStreamForBinaryResponseError() throws IOException{ + dMaapContext.setResponse(null); + DMaaPResponseBuilder.getStreamForBinaryResponse(dMaapContext); + } } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/utils/EMailerTest.java b/src/test/java/org/onap/dmaap/mr/cambria/utils/EMailerTest.java new file mode 100644 index 0000000..ab81bab --- /dev/null +++ b/src/test/java/org/onap/dmaap/mr/cambria/utils/EMailerTest.java @@ -0,0 +1,43 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Engine + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dmaap.mr.cambria.utils; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.junit.Test; +import org.onap.dmaap.dmf.mr.utils.Emailer; + +public class EMailerTest { + + @Test + public void testEmailer(){ + + Emailer emailer= new Emailer(); + try { + emailer.send("dummy@dummy.com", "subj", "body"); + } catch (IOException e) { + assertTrue(true); + } + + } + +} diff --git a/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java b/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java index b4645a3..8a4009b 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java @@ -27,7 +27,6 @@ import java.security.Principal; import java.text.SimpleDateFormat; import java.util.Date; -import javax.servlet.http.HttpServletRequest; import org.apache.http.auth.BasicUserPrincipal; import org.junit.After; @@ -122,4 +121,16 @@ public class UtilsTest { } + + @Test + public void testGetKey(){ + assertNotNull(Utils.getKafkaproperty()); + + } + + @Test + public void testCadiEnable(){ + assertFalse(Utils.isCadiEnabled()); + + } } |