diff options
author | Seshu Kumar M <seshu.kumar.m@huawei.com> | 2018-04-25 12:15:04 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-04-25 12:15:04 +0000 |
commit | 44416b271933aded25ff9c6b92a9c830d0359fc2 (patch) | |
tree | 93b7fe600544046c169851c8b77554ddc89dfa74 | |
parent | 9a8912ad8611d7efdd717e6b098c94c8ed060e45 (diff) | |
parent | 7c73254c00e5dc598f68e074d160b2d263872cda (diff) |
Merge "PnfReadyEventConsumer implementation"
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java (renamed from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java) | 66 | ||||
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml | 6 | ||||
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java (renamed from bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java) | 15 |
3 files changed, 45 insertions, 42 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java index 1fd2de97f2..830574bad4 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java @@ -20,7 +20,6 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.net.URI; import java.util.Map; @@ -38,7 +37,7 @@ import org.apache.http.util.EntityUtils; import org.openecomp.mso.jsonpath.JsonPathUtil; import org.openecomp.mso.logger.MsoLogger; -public class PnfEventReadyConsumer implements DmaapClient { +public class PnfEventReadyDmaapClient implements DmaapClient { private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); @@ -57,7 +56,7 @@ public class PnfEventReadyConsumer implements DmaapClient { private int dmaapClientDelayInSeconds; private volatile boolean dmaapThreadListenerIsRunning; - public PnfEventReadyConsumer() { + public PnfEventReadyDmaapClient() { httpClient = HttpClientBuilder.create().build(); pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); executor = null; @@ -67,17 +66,6 @@ public class PnfEventReadyConsumer implements DmaapClient { getRequest = new HttpGet(buildURI()); } - //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting - @VisibleForTesting - void sendRequest() { - try { - HttpResponse response = httpClient.execute(getRequest); - getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); - } catch (IOException e) { - LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); - } - } - @Override public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) { pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); @@ -98,7 +86,7 @@ public class PnfEventReadyConsumer implements DmaapClient { private synchronized void startDmaapThreadListener() { if (!dmaapThreadListenerIsRunning) { executor = Executors.newScheduledThreadPool(1); - executor.scheduleWithFixedDelay(this::sendRequest, 0, + executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, dmaapClientDelayInSeconds, TimeUnit.SECONDS); dmaapThreadListenerIsRunning = true; } @@ -120,24 +108,6 @@ public class PnfEventReadyConsumer implements DmaapClient { .path(consumerGroup).path(consumerId).build(); } - private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException { - if (response.getStatusLine().getStatusCode() == 200) { - String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); - if (responseString != null) { - return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); - } - } - return Optional.empty(); - } - - - private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { - Runnable runnable = unregister(correlationId); - if (runnable != null) { - runnable.run(); - } - } - public void setDmaapHost(String dmaapHost) { this.dmaapHost = dmaapHost; } @@ -170,4 +140,34 @@ public class PnfEventReadyConsumer implements DmaapClient { this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; } + class DmaapTopicListenerThread implements Runnable { + + @Override + public void run() { + try { + HttpResponse response = httpClient.execute(getRequest); + getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); + } catch (IOException e) { + LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); + } + } + + private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException { + if (response.getStatusLine().getStatusCode() == 200) { + String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); + if (responseString != null) { + return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); + } + } + return Optional.empty(); + } + + private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + Runnable runnable = unregister(correlationId); + if (runnable != null) { + runnable.run(); + } + } + } + } diff --git a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml index 4ddeba16be..13ab4f8f17 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml +++ b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml @@ -15,14 +15,14 @@ </bean>
<bean id="informDmaapClient" class="org.openecomp.mso.bpmn.infrastructure.pnf.delegate.InformDmaapClient">
- <property name="dmaapClient" ref="pnfEventReadyConsumer"/>
+ <property name="dmaapClient" ref="pnfEventReadyDmaapClient"/>
</bean>
<bean id="cancelDmaapSubscription" class="org.openecomp.mso.bpmn.infrastructure.pnf.delegate.CancelDmaapSubscription">
- <property name="dmaapClient" ref="pnfEventReadyConsumer"/>
+ <property name="dmaapClient" ref="pnfEventReadyDmaapClient"/>
</bean>
- <bean id="pnfEventReadyConsumer" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyConsumer"
+ <bean id="pnfEventReadyDmaapClient" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient"
init-method="init">
<property name="dmaapHost" value="${host}"/>
<property name="dmaapPort" value="${port}"/>
diff --git a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java index 73b8247ebc..393730ed43 100644 --- a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java +++ b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java @@ -43,8 +43,9 @@ import org.apache.http.message.BasicHttpResponse; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread; -public class PnfEventReadyConsumerTest { +public class PnfEventReadyDmaapClientTest { private static final String CORRELATION_ID = "corrTestId"; private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId"; @@ -61,14 +62,15 @@ public class PnfEventReadyConsumerTest { private static final String CONSUMER_ID = "consumerTestId"; private static final String CONSUMER_GROUP = "consumerGroupTest"; - private PnfEventReadyConsumer testedObject; + private PnfEventReadyDmaapClient testedObject; + private DmaapTopicListenerThread testedObjectInnerClassThread; private HttpClient httpClientMock; private Runnable threadMockToNotifyCamundaFlow; private ScheduledExecutorService executorMock; @Before public void init() throws NoSuchFieldException, IllegalAccessException { - testedObject = new PnfEventReadyConsumer(); + testedObject = new PnfEventReadyDmaapClient(); testedObject.setDmaapHost(HOST); testedObject.setDmaapPort(PORT); testedObject.setDmaapProtocol(PROTOCOL); @@ -78,6 +80,7 @@ public class PnfEventReadyConsumerTest { testedObject.setConsumerGroup(CONSUMER_GROUP); testedObject.setDmaapClientDelayInSeconds(1); testedObject.init(); + testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread(); httpClientMock = mock(HttpClient.class); threadMockToNotifyCamundaFlow = mock(Runnable.class); executorMock = mock(ScheduledExecutorService.class); @@ -96,7 +99,7 @@ public class PnfEventReadyConsumerTest { throws IOException { when(httpClientMock.execute(any(HttpGet.class))). thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID))); - testedObject.sendRequest(); + testedObjectInnerClassThread.run(); ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class); verify(httpClientMock).execute(captor1.capture()); assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL) @@ -119,7 +122,7 @@ public class PnfEventReadyConsumerTest { when(httpClientMock.execute(any(HttpGet.class))). thenReturn(createResponse( String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP))); - testedObject.sendRequest(); + testedObjectInnerClassThread.run(); verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } @@ -133,7 +136,7 @@ public class PnfEventReadyConsumerTest { public void correlationIdIsNotFoundInHttpResponse() throws IOException { when(httpClientMock.execute(any(HttpGet.class))). thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID)); - testedObject.sendRequest(); + testedObjectInnerClassThread.run(); verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } |