diff options
Diffstat (limited to 'catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/Dev2DevDmaapConsumerTest.java')
-rw-r--r-- | catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/Dev2DevDmaapConsumerTest.java | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/Dev2DevDmaapConsumerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/Dev2DevDmaapConsumerTest.java new file mode 100644 index 0000000000..45906c6f30 --- /dev/null +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/Dev2DevDmaapConsumerTest.java @@ -0,0 +1,91 @@ +package org.openecomp.sdc.be.components.distribution.engine; + +import com.att.nsa.mr.client.MRConsumer; +import com.google.gson.GsonBuilder; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.config.DmaapConsumerConfiguration; +import org.openecomp.sdc.common.api.ConfigurationSource; +import org.openecomp.sdc.common.impl.ExternalConfiguration; +import org.openecomp.sdc.common.impl.FSConfigurationSource; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.util.concurrent.ExecutorService; +import java.util.stream.IntStream; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration("classpath:application-context-test.xml") +public class Dev2DevDmaapConsumerTest { + @Autowired + private ExecutorFactory executorFactory; + @Autowired + private DmaapClientFactory dmaapClientFactory; + + static ExecutorService notificationExecutor; + + static ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), "src/test/resources/config/catalog-be"); + static ConfigurationManager configurationManager = new ConfigurationManager(configurationSource); + + @Test + public void runConsumer() throws Exception{ + boolean isRunConsumer = false ; //change this to true if you wish to run consumer,default should be false + if ( isRunConsumer ){ + consumeDmaapTopic(); + }else{ + System.out.println( "CONSUMER TEST is disabled!!!! "); + } + assert true; + } + //@Ignore + //@Test + public void consumeDmaapTopic() throws Exception { + Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + System.out.println("uncaughtException -> "); + } + }; + + DmaapConsumerConfiguration dmaapConsumerParams = configurationManager.getConfiguration().getDmaapConsumerConfiguration(); + String topic = dmaapConsumerParams.getTopic(); + System.out.println(String.format( "Starting to consume topic %s for DMAAP consumer with the next parameters %s. ", topic, dmaapConsumerParams) ); + MRConsumer consumer = dmaapClientFactory.create( dmaapConsumerParams ); + notificationExecutor = executorFactory.create(topic + "Consumer", handler); + final int LIMIT = 2; + IntStream.range(0,LIMIT).forEach( i -> { + System.out.println("Trying to fetch messages from topic: "+ topic); + try { + Iterable<String> messages = consumer.fetch(); + if (messages != null) { + for (String msg : messages) { + System.out.println(String.format( "The DMAAP message %s received. The topic is %s.", msg, topic) ); + handleMessage(msg); + } + } + } + catch (Exception e) { + System.out.println("The exception occured upon fetching DMAAP message "+ e); + } + } + ); + + + } + private void handleMessage(String msg){ + try{ + DmaapNotificationDataImpl notificationData = new GsonBuilder().create().fromJson(msg,DmaapNotificationDataImpl.class); + System.out.println( "successfully parsed notification for environemnt "+notificationData.getOperationalEnvironmentId()); + }catch (Exception e){ + System.out.println( "failed to parse notification"); + } + } + @After + public void after(){ + if (notificationExecutor!=null && !notificationExecutor.isTerminated()) + notificationExecutor.shutdown(); + } +}
\ No newline at end of file |