Diffstat (limited to 'src')
9 files changed, 98 insertions, 302 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
index ac68a11..25dc769 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -8,16 +8,16 @@
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
- * 
+ *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * 
+ *
 *******************************************************************************/
 package org.onap.dmaap.dmf.mr.backends.kafka;
@@ -96,27 +96,34 @@ public class KafkaConsumerCache {
    // check for expirations pretty regularly
    private static final long kDefault_SweepEverySeconds = 15;
-   private enum Status {
-       NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
-   }
-
-
+   // private final rrNvReadable fSettings;
+   private MetricsSet fMetrics;
+   private final String fBaseZkPath;
+   private final ScheduledExecutorService fSweepScheduler;
+   private String fApiId;
+
+   private final ConnectionStateListener listener;
+
+   private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
+   private PathChildrenCache curatorConsumerCache;
+
+   private volatile Status status;
    @Autowired
    private DMaaPErrorMessages errorMessages;
-   
+
    /**
    * User defined exception class for kafka consumer cache
-   * 
+   *
    * @author nilanjana.maity
    *
    */
    public class KafkaConsumerCacheException extends Exception {
        /**
        * To throw the exception
-       * 
+       *
        * @param t
        */
        KafkaConsumerCacheException(Throwable t) {
@@ -124,7 +131,7 @@ public class KafkaConsumerCache {
        }
        /**
-       * 
+       *
        * @param s
        */
        public KafkaConsumerCacheException(String s) {
@@ -134,6 +141,10 @@ public class KafkaConsumerCache {
        private static final long serialVersionUID = 1L;
    }
+   private enum Status {
+       NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
+   }
+
    /**
    * Creates a KafkaConsumerCache object. Before it is used, you must call
    * startCache()
@@ -178,7 +189,7 @@ public class KafkaConsumerCache {
    /**
    * Start the cache service. This must be called before any get/put
    * operations.
-   * 
+   *
    * @param mode
    *            DMAAP or cambria
    * @param curator
@@ -251,8 +262,8 @@ public class KafkaConsumerCache {
        EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
        ensurePath.ensure(curator.getZookeeperClient());
-       
-       
+
+
        long freq = kDefault_SweepEverySeconds;
        String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_SweepEverySeconds);
@@ -274,7 +285,7 @@ public class KafkaConsumerCache {
    /**
    * Getting the curator oject to start the zookeeper connection estabished
-   * 
+   *
    * @param curator
    * @return curator object
    */
@@ -335,7 +346,7 @@ public class KafkaConsumerCache {
    * valid) In addition, this method waits for all other consumer caches in
    * the cluster to release their ownership and delete their version of this
    * consumer.
-   * 
+   *
    * @param topic
    * @param consumerGroupId
    * @param clientId
@@ -541,12 +552,6 @@ public class KafkaConsumerCache {
        return true;
    }
-   // private final rrNvReadable fSettings;
-   private MetricsSet fMetrics;
-   private final String fBaseZkPath;
-   private final ScheduledExecutorService fSweepScheduler;
-   private String fApiId;
-
    public void setfMetrics(final MetricsSet metrics) {
        this.fMetrics = metrics;
    }
@@ -555,13 +560,6 @@ public class KafkaConsumerCache {
        this.fApiId = id;
    }
-   private final ConnectionStateListener listener;
-
-   private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
-   private PathChildrenCache curatorConsumerCache;
-
-   private volatile Status status;
-
    private void handleReconnection() {
        log.info("Reading current cache data from ZK and synchronizing local cache");
@@ -664,15 +662,15 @@ public class KafkaConsumerCache {
    }
    public void sweep() {
-       final LinkedList<String> removals = new LinkedList<String>();
+       final LinkedList<String> removals = new LinkedList<>();
        long mustTouchEveryMs = kDefault_MustTouchEveryMs;
        String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_TouchEveryMs);
-       if (null != strkSetting_TouchEveryMs) {
+       if (null != strkSetting_TouchEveryMs && !strkSetting_TouchEveryMs.isEmpty()) {
            mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
        }
-       
+
        final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
        for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
@@ -692,7 +690,7 @@ public class KafkaConsumerCache {
    /**
    * Creating a thread to run the sweep method
-   * 
+   *
    * @author nilanjana.maity
    *
    */
@@ -707,7 +705,7 @@ public class KafkaConsumerCache {
    /**
    * This method is to drop consumer
-   * 
+   *
    * @param topic
    * @param consumerGroup
    * @param clientId
@@ -725,5 +723,5 @@ public class KafkaConsumerCache {
    }
    private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
-   
+
 }
\ No newline at end of file
diff --git a/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java b/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java
index fe25e58..0b687ca 100644
--- a/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java
+++ b/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java
@@ -8,36 +8,29 @@
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
- * 
+ *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * 
+ *
 *******************************************************************************/
 package org.onap.dmaap.filemonitor;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-
 import javax.annotation.PostConstruct;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-
-//import com.att.ssf.filemonitor.FileChangedListener;
-//import com.att.ssf.filemonitor.FileMonitor;
-
 /**
 * ServicePropertyService class
 * @author rajashree.khare
@@ -68,17 +61,6 @@ public class ServicePropertyService {
        try {
            getFileList(FILE_CHANGE_LISTENER_LOC);
-           /*for (File file : fileList) {
-               FileChangedListener fileChangedListener = this.fileChangedListener;
-               Object filePropertiesMap = this.filePropertiesMap;
-               Method m = filePropertiesMap.getClass().getMethod(
-                       "refresh", File.class);
-               m.invoke(filePropertiesMap, file);
-               FileMonitor fm = FileMonitor.getInstance();
-               fm.addFileChangedListener(file, fileChangedListener,
-                       loadOnStartup);
-
-           }*/
        } catch (Exception ex) {
            logger.error("Error creating property map ", ex);
        }
@@ -87,10 +69,9 @@ public class ServicePropertyService {
    private void getFileList(String dirName) throws IOException {
        File directory = new File(dirName);
-       FileInputStream fis = null;
        if (fileList == null)
-           fileList = new ArrayList<File>();
+           fileList = new ArrayList<>();
        // get all the files that are ".json" or ".properties", from a directory
        // & it's sub-directories
@@ -99,8 +80,8 @@
        for (File file : fList) {
            // read service property files from the configuration file
            if (file.isFile() && file.getPath().endsWith(USER_CONFIG_FILE)) {
-               try {
-                   fis = new FileInputStream(file);
+               try(FileInputStream fis = new FileInputStream(file)) {
+
                    Properties prop = new Properties();
                    prop.load(fis);
@@ -109,8 +90,6 @@
                    }
                } catch (Exception ioe) {
                    logger.error("Error reading the file stream ", ioe);
-               } finally {
-                   fis.close();
                }
            } else if (file.isDirectory()) {
                getFileList(file.getPath());
diff --git a/src/test/java/org/onap/dmaap/filemonitor/JUnitTestSuite.java b/src/test/java/org/onap/dmaap/filemonitor/JUnitTestSuite.java
index 9c87f24..077c4fd 100644
--- a/src/test/java/org/onap/dmaap/filemonitor/JUnitTestSuite.java
+++ b/src/test/java/org/onap/dmaap/filemonitor/JUnitTestSuite.java
@@ -7,9 +7,9 @@
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
- * 
+ *
 * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,14 +28,14 @@ import org.junit.runners.Suite.SuiteClasses;
 import org.apache.log4j.Logger;
 @RunWith(Suite.class)
-@SuiteClasses({ ServicePropertiesListenerTest.class, ServicePropertiesMapTest.class,
+@SuiteClasses({ServicePropertiesMapTest.class,
    ServicePropertyServiceTest.class, })
 public class JUnitTestSuite {
    private static final Logger LOGGER = Logger.getLogger(JUnitTestSuite.class);
    public static void main(String[] args) {
        LOGGER.info("Running the test suite");
-       
+
        TestSuite tstSuite = new TestSuite();
        LOGGER.info("Total Test Counts " + tstSuite.countTestCases());
    }
diff --git a/src/test/java/org/onap/dmaap/filemonitor/ServicePropertiesListenerTest.java b/src/test/java/org/onap/dmaap/filemonitor/ServicePropertiesListenerTest.java
deleted file mode 100644
index cae0c68..0000000
--- a/src/test/java/org/onap/dmaap/filemonitor/ServicePropertiesListenerTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
- package org.onap.dmaap.filemonitor;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ServicePropertiesListenerTest {
-
-   @Before
-   public void setUp() throws Exception {
-   }
-
-   @After
-   public void tearDown() throws Exception {
-   }
-
-   //@Test
-   public void testUpdate() {/*
-
-       ServicePropertiesListener listener = new ServicePropertiesListener();
-
-       try {
-           listener.update(new File(":/file"));
-       } catch (Exception e) {
-           // TODO Auto-generated catch block
-           e.printStackTrace();
-       }
-
-       assertTrue(true);
-
-   */}
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/CambriaApiTestCase.java b/src/test/java/org/onap/dmaap/mr/cambria/CambriaApiTestCase.java
deleted file mode 100644
index 0a1af90..0000000
--- a/src/test/java/org/onap/dmaap/mr/cambria/CambriaApiTestCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package org.onap.dmaap.mr.cambria;
-
-import junit.framework.TestCase;
-import org.junit.Ignore;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Ignore
-public class CambriaApiTestCase extends TestCase {
-
-   @Override
-   protected void setUp() throws Exception {
-       final Map<String, String> argMap = new HashMap<String, String> ();
-
-       argMap.put("broker.type", "memory");
-       argMap.put("accounts.dao.class", "com.att.nsa.fe3c.dao.memory.MemoryDAOFactory");
-       argMap.put("topic.dao.class", "com.att.nsa.fe3c.dao.memory.MemoryDAOFactory");
-
-       //CambriaApiServer.start(argMap);
-       System.out.println("setUp() complete");
-   }
-
-   public void tearDown() throws Exception {
-       System.out.println("tearDown() started");
-       //CambriaApiServer.stop();
-       System.out.println("tearDown() complete");
-   }
-}
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/CambriaRateLimiterTest.java b/src/test/java/org/onap/dmaap/mr/cambria/CambriaRateLimiterTest.java
deleted file mode 100644
index 51b617b..0000000
--- a/src/test/java/org/onap/dmaap/mr/cambria/CambriaRateLimiterTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package org.onap.dmaap.mr.cambria;
-
-import org.junit.Test;
-
-public class CambriaRateLimiterTest
-{
-   @Test
-   public void testRateLimiter ()
-   {
-       /*final NsaTestClock clock = new NsaTestClock(1, false);
-
-       final String topic = "topic";
-       final String consumerGroup = "group";
-       final String clientId = "id";
-
-       final int window = 5;
-
-       // rate limit: 1 empty call/min avg over 5 minutes, with 10ms delay
-       final CambriaRateLimiter rater = new CambriaRateLimiter ( 1.0, window, 10 );
-       try
-       {
-           // prime with a call to start rate window
-           rater.onCall ( topic, consumerGroup, clientId );
-           rater.onSend ( topic, consumerGroup, clientId, 1 );
-           clock.addMs ( 1000*60*window );
-
-           // rate should now be 0, with a good window
-           for ( int i=0; i<4; i++ )
-           {
-               clock.addMs ( 1000*15 );
-               rater.onCall ( topic, consumerGroup, clientId );
-               rater.onSend ( topic, consumerGroup, clientId, 0 );
-           }
-           // rate is now 0.8 = 4 calls in last 5 minutes = 4/5 = 0.8
-
-           clock.addMs ( 1000*15 );
-           rater.onCall ( topic, consumerGroup, clientId );
-           rater.onSend ( topic, consumerGroup, clientId, 0 );
-           // rate = 1.0 = 5 calls in last 5 mins
-
-           clock.addMs ( 1000 );
-           rater.onCall ( topic, consumerGroup, clientId );
-           rater.onSend ( topic, consumerGroup, clientId, 0 );
-           // rate = 1.2 = 6 calls in last 5 mins, should fire
-
-           fail ( "Should have thrown rate limit exception." );
-       }
-       catch ( CambriaApiException x )
-       {
-           // good
-       }*/
-   }
-}
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/JUnitTestSuite.java b/src/test/java/org/onap/dmaap/mr/cambria/JUnitTestSuite.java
index bcac8d9..884247e 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/JUnitTestSuite.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/JUnitTestSuite.java
@@ -28,8 +28,7 @@ import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;
 @RunWith(Suite.class)
-@SuiteClasses({CambriaApiExceptionTest.class, CambriaApiVersionInfoTest.class,
-   CambriaApiTestCase.class, CambriaRateLimiterTest.class,})
+@SuiteClasses({CambriaApiExceptionTest.class, CambriaApiVersionInfoTest.class})
 public class JUnitTestSuite {
    private static final Logger LOGGER = LogManager.getLogger(JUnitTestSuite.class);
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/Kafka011ConsumerTest.java b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/Kafka011ConsumerTest.java
index 10526c5..ffcafdb 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/Kafka011ConsumerTest.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/Kafka011ConsumerTest.java
@@ -8,20 +8,21 @@
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
- * limitations under the License. 
+ * limitations under the License.
 * ============LICENSE_END=========================================================
- * 
+ *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * 
+ *
 *******************************************************************************/
 package org.onap.dmaap.mr.cambria.backends.kafka;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -43,8 +44,8 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ AJSCPropertiesMap.class })
 public class Kafka011ConsumerTest {
-   
-   
+
+
    @Mock
    private KafkaConsumer<String, String> cc;
    @Mock
@@ -53,13 +54,13 @@ public class Kafka011ConsumerTest {
    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks(this);
-       
+
    }
    @After
    public void tearDown() throws Exception {
    }
-   
+
    @Test
    public void testKafka011Consumer() {
        PowerMockito.mockStatic(AJSCPropertiesMap.class);
@@ -71,21 +72,21 @@ public class Kafka011ConsumerTest {
            consumer.touch();
            consumer.setOffset(10);
        } catch (Exception e) {
-           
+
        }
        assertNotNull(consumer);
        assertNotNull(consumer.getConsumer());
        assertNotNull(consumer.getConsumerGroup());
        assertNotNull(consumer.getConsumerId());
        assertNotNull(consumer.getConsumerId());
-       assertNotNull(consumer.getCreateTimeMs());
-       assertNotNull(consumer.getLastAccessMs());
+       assertTrue(consumer.getCreateTimeMs() > 0);
+       assertTrue(consumer.getLastAccessMs() > 0);
        assertNotNull(consumer.getName());
-       assertNotNull(consumer.getOffset());
-       assertNotNull(consumer.getLastTouch());
-       
-       
+       assertTrue(consumer.getOffset() > 0);
+       assertTrue(consumer.getLastTouch() > 0);
+
+
    }
-   
+
 }
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaConsumerCacheTest.java b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaConsumerCacheTest.java
index 457fff4..8ffe064 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaConsumerCacheTest.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaConsumerCacheTest.java
@@ -7,9 +7,9 @@
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
- * 
+ *
 * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,60 +20,54 @@
 package org.onap.dmaap.mr.cambria.backends.kafka;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-
+import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.dmaap.dmf.mr.backends.Consumer;
 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
 import org.onap.dmaap.dmf.mr.backends.kafka.Kafka011Consumer;
 import org.onap.dmaap.dmf.mr.backends.kafka.KafkaConsumerCache;
 import org.onap.dmaap.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
-import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
-@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ AJSCPropertiesMap.class })
+
+@RunWith(MockitoJUnitRunner.class)
 public class KafkaConsumerCacheTest {
-   private KafkaConsumerCache kafkaConsumerCache =null;
+
    @Mock
    private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
    @Mock
    private MetricsSet fMetrics;
+   @Mock
+   private AJSCPropertiesMap ajscPropertiesMap;
+   @InjectMocks
+   private KafkaConsumerCache kafkaConsumerCache;
    @Before
    public void setUp() throws Exception {
-       MockitoAnnotations.initMocks(this);
-
    }
    @After
    public void tearDown() throws Exception {
    }
-
    @Test
    public void testSweep() {
-       kafkaConsumerCache = new KafkaConsumerCache();
-       PowerMockito.mockStatic(AJSCPropertiesMap.class);
-       PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "kSetting_TouchEveryMs")).thenReturn("100");
        kafkaConsumerCache.sweep();
-       
+       Collection<? extends Consumer> cachedConsumers =
+               kafkaConsumerCache.getConsumers();
+       assertTrue(cachedConsumers.isEmpty());
    }
-   
+
    // DOES NOT WORK
    @Test
@@ -81,16 +75,16 @@ public class KafkaConsumerCacheTest {
        /*
        * KafkaConsumerCache kafka = null;
-       * 
+       *
        * try { kafka = new KafkaConsumerCache("123", null);
-       * 
+       *
        * } catch (NoClassDefFoundError e) { try { kafka.startCache("DMAAP",
        * null); } catch (NullPointerException e1) { // TODO Auto-generated
        * catch block assertTrue(true); } catch (KafkaConsumerCacheException
        * e1) { // TODO Auto-generated catch block e1.printStackTrace(); } }
        */
-       
+
        new CuratorFrameworkImpl();
        new MetricsSetImpl();
        KafkaConsumerCache kafka=null;
@@ -123,16 +117,22 @@ public class KafkaConsumerCacheTest {
        /*
        * @Test public void testStopCache() {
-       * 
+       *
        * KafkaConsumerCache kafka = null; new CuratorFrameworkImpl(); new
        * MetricsSetImpl(); try { kafka = new KafkaConsumerCache("123", null);
        * kafka.stopCache(); } catch (NoClassDefFoundError e) {
-       * 
+       *
        * }
-       * 
+       *
        * }
        */
+   @Test(expected= KafkaConsumerCacheException.class)
+   public void testGetConsumerThrowNoConnectionToCache() throws KafkaConsumerCacheException{
+       KafkaConsumerCache kafka = new KafkaConsumerCache();
+       kafka.getConsumerFor("testTopic", "CG1", "23");
+   }
+
    @Test
    public void testGetConsumerFor() {
@@ -208,7 +208,7 @@ public class KafkaConsumerCacheTest {
            }
        }
-   }   
+   }
    @Test
    public void testSignalOwnership() {
@@ -224,7 +224,7 @@ public class KafkaConsumerCacheTest {
            }
        } catch (NoClassDefFoundError e) {}
-       // 
+       //
    }
    @Test
@@ -245,6 +245,6 @@ public class KafkaConsumerCacheTest {
        }
    }
-   
+
 }
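
Note on the resource-handling change in ServicePropertyService.java: the patch replaces a manually closed FileInputStream (closed in a finally block that could itself throw NullPointerException when the stream was never opened) with try-with-resources. A minimal, self-contained sketch of that pattern follows; the class name, directory path, and main() driver are illustrative only and are not part of the patch.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class PropertyFileReader {

    /**
     * Loads a .properties file. The try-with-resources block closes the
     * stream automatically, even if Properties.load() throws, so no
     * finally/close() bookkeeping is needed.
     */
    public static Properties load(File file) throws IOException {
        Properties prop = new Properties();
        try (FileInputStream fis = new FileInputStream(file)) {
            prop.load(fis);
        }
        return prop;
    }

    public static void main(String[] args) throws IOException {
        // Illustrative path only; the real service walks FILE_CHANGE_LISTENER_LOC.
        File f = new File("etc/appprops/service.properties");
        if (f.isFile()) {
            System.out.println(load(f).stringPropertyNames());
        }
    }
}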