From b196509debdb43411abfcdc55b3ce87a0a30da6e Mon Sep 17 00:00:00 2001 From: Lukasz Muszkieta Date: Thu, 12 Apr 2018 18:24:12 +0200 Subject: PnfReadyEventConsumer implementation Change-Id: I7252400a3f60ca22ddfa71edb28eaf1d16ccd9b4 Issue-ID: SO-466 Signed-off-by: Lukasz Muszkieta --- .../pnf/dmaap/PnfEventReadyConsumer.java | 99 +++++++++++++-- .../src/main/resources/dmaap.properties | 10 +- .../src/main/webapp/WEB-INF/applicationContext.xml | 15 ++- .../pnf/dmaap/PnfEventReadyConsumerTest.java | 136 ++++++++++++++++++--- .../src/test/resources/dmaapTest.properties | 7 -- .../springConfig_PnfEventReadyConsumer.xml | 19 --- 6 files changed, 224 insertions(+), 62 deletions(-) delete mode 100644 bpmn/MSOInfrastructureBPMN/src/test/resources/dmaapTest.properties delete mode 100644 bpmn/MSOInfrastructureBPMN/src/test/resources/springConfig_PnfEventReadyConsumer.xml (limited to 'bpmn') diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java index e6019f73f0..6871665ba1 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java @@ -22,17 +22,28 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; import java.io.IOException; import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.ws.rs.core.UriBuilder; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; +import org.openecomp.mso.jsonpath.JsonPathUtil; +import org.openecomp.mso.logger.MsoLogger; -public class PnfEventReadyConsumer { +public class PnfEventReadyConsumer implements Runnable, DmaapClient { + + private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); private static final String JSON_PATH_CORRELATION_ID = "$.pnfRegistrationFields.correlationId"; private HttpClient httpClient; - private String dmaapHost; private int dmaapPort; private String dmaapProtocol; @@ -40,24 +51,57 @@ public class PnfEventReadyConsumer { private String dmaapTopicName; private String consumerId; private String consumerGroup; + private Map pnfCorrelationIdToThreadMap; + private HttpGet getRequest; + private ScheduledExecutorService executor; + private int dmaapClientInitialDelayInSeconds; + private int dmaapClientDelayInSeconds; + private boolean dmaapThreadListenerIsRunning; public PnfEventReadyConsumer() { httpClient = HttpClientBuilder.create().build(); + pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); + executor = null; + } + + public void init() { + getRequest = new HttpGet(buildURI()); + } + + @Override + public void run() { + try { + HttpResponse response = httpClient.execute(getRequest); + getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); + } catch (IOException e) { + LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); + } + } + + @Override + public void registerForUpdate(String correlationId, Runnable informConsumer) { + pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); + if (!dmaapThreadListenerIsRunning) { + startDmaapThreadListener(); + } } - public void notifyWhenPnfReady(String correlationId) - throws IOException { - HttpGet getRequest = new HttpGet(buildURI(consumerGroup, consumerId)); - HttpResponse response = httpClient.execute(getRequest); - checkIfResponseIsAccepted(response, correlationId); + private void startDmaapThreadListener() { + executor = Executors.newScheduledThreadPool(1); + executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds, + dmaapClientDelayInSeconds, TimeUnit.SECONDS); + dmaapThreadListenerIsRunning = true; } - private boolean checkIfResponseIsAccepted(HttpResponse response, String correlationId) { - // TODO parse response if contains proper correlationId - return false; + private void stopDmaapThreadListener() { + if (dmaapThreadListenerIsRunning) { + executor.shutdownNow(); + dmaapThreadListenerIsRunning = false; + executor = null; + } } - private URI buildURI(String consumerGroup, String consumerId) { + private URI buildURI() { return UriBuilder.fromUri(dmaapUriPathPrefix) .scheme(dmaapProtocol) .host(dmaapHost) @@ -65,6 +109,31 @@ public class PnfEventReadyConsumer { .path(consumerGroup).path(consumerId).build(); } + private Optional getCorrelationIdFromResponse(HttpResponse response) throws IOException { + if (response.getStatusLine().getStatusCode() == 200) { + String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); + if (responseString != null) { + return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); + } + } + return Optional.empty(); + } + + + private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny() + .ifPresent(this::informAboutPnfReady); + } + + private void informAboutPnfReady(String correlationId) { + pnfCorrelationIdToThreadMap.get(correlationId).run(); + pnfCorrelationIdToThreadMap.remove(correlationId); + + if (pnfCorrelationIdToThreadMap.isEmpty()) { + stopDmaapThreadListener(); + } + } + public void setDmaapHost(String dmaapHost) { this.dmaapHost = dmaapHost; } @@ -93,4 +162,12 @@ public class PnfEventReadyConsumer { this.consumerGroup = consumerGroup; } + public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) { + this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds; + } + + public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) { + this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; + } + } diff --git a/bpmn/MSOInfrastructureBPMN/src/main/resources/dmaap.properties b/bpmn/MSOInfrastructureBPMN/src/main/resources/dmaap.properties index 3c4dca49cf..5b1ffac571 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/resources/dmaap.properties +++ b/bpmn/MSOInfrastructureBPMN/src/main/resources/dmaap.properties @@ -1,7 +1,9 @@ -dmaapHost=HOSTNAME -dmaapPort=3905 -dmaapProtocol=http -dmaapUriPathPrefix = events +host=HOSTNAME +port=3905 +protocol=http +uriPathPrefix = events eventReadyTopicName=pnfEventReady consumerId=consumerId consumerGroup=group +clientThreadInitialDelayInSeconds=1 +clientThreadDelayInSeconds=5 \ No newline at end of file diff --git a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml index ed1556b05d..6fe70ff614 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml +++ b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml @@ -19,14 +19,17 @@ - - - - - + + + + + - + + + diff --git a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java index 2f6a00db66..ef8fa3dd1e 100644 --- a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java +++ b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java @@ -21,48 +21,154 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; +import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.ProtocolVersion; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; +import org.apache.http.entity.StringEntity; +import org.apache.http.message.BasicHttpResponse; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringRunner; -@RunWith(SpringRunner.class) -@ContextConfiguration({"classpath:springConfig_PnfEventReadyConsumer.xml"}) public class PnfEventReadyConsumerTest { - @Autowired - private PnfEventReadyConsumer pnfEventReadyConsumer; + private static final String CORRELATION_ID = "corrTestId"; + private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId"; + private static final String JSON_EXAMPLE_WITH_CORRELATION_ID = + "{\"pnfRegistrationFields\":{\"correlationId\":\"%s\"}}"; + private static final String JSON_EXAMPLE_WITH_NO_CORRELATION_ID = + "{\"pnfRegistrationFields\":{\"field\":\"value\"}}"; + private static final String HOST = "hostTest"; + private static final int PORT = 1234; + private static final String PROTOCOL = "http"; + private static final String URI_PATH_PREFIX = "eventsForTesting"; + private static final String EVENT_TOPIC_TEST = "eventTopicTest"; + private static final String CONSUMER_ID = "consumerTestId"; + private static final String CONSUMER_GROUP = "consumerGroupTest"; + + private PnfEventReadyConsumer testedObject; private HttpClient httpClientMock; + private Runnable threadMockToNotifyCamundaFlow; + private ScheduledExecutorService executorMock; @Before public void init() throws NoSuchFieldException, IllegalAccessException { + testedObject = new PnfEventReadyConsumer(); + testedObject.setDmaapHost(HOST); + testedObject.setDmaapPort(PORT); + testedObject.setDmaapProtocol(PROTOCOL); + testedObject.setDmaapUriPathPrefix(URI_PATH_PREFIX); + testedObject.setDmaapTopicName(EVENT_TOPIC_TEST); + testedObject.setConsumerId(CONSUMER_ID); + testedObject.setConsumerGroup(CONSUMER_GROUP); + testedObject.setDmaapClientInitialDelayInSeconds(1); + testedObject.setDmaapClientDelayInSeconds(1); + testedObject.init(); httpClientMock = mock(HttpClient.class); + threadMockToNotifyCamundaFlow = mock(Runnable.class); + executorMock = mock(ScheduledExecutorService.class); setPrivateField(); } + /** + * Test run method, where the are following conditions: + *

- DmaapThreadListener is running, flag is set to true + *

- map is filled with one entry with the key that we get from response + *

run method should invoke thread from map to notify camunda process, remove element from the map (map is empty) + * and shutdown the executor because of empty map + */ @Test - public void restClientInvokesWithProperURI() throws Exception { + public void correlationIdIsFoundInHttpResponse_notifyAboutPnfReady() + throws IOException { + when(httpClientMock.execute(any(HttpGet.class))). + thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID))); + testedObject.run(); ArgumentCaptor captor1 = ArgumentCaptor.forClass(HttpGet.class); - pnfEventReadyConsumer.notifyWhenPnfReady("correlationId"); verify(httpClientMock).execute(captor1.capture()); - assertThat(captor1.getValue().getURI()).hasHost("hostTest").hasPort(1234).hasScheme("http") - .hasPath("/eventsForTesting/eventTopicTest/consumerGroupTest/consumerTestId"); + assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL) + .hasPath( + "/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + ""); + verify(threadMockToNotifyCamundaFlow).run(); + verify(executorMock).shutdownNow(); + } + + /** + * Test run method, where the are following conditions: + *

- DmaapThreadListener is running, flag is set to true + *

- map is filled with one entry with the correlationId that does not match to correlationId + * taken from http response. run method should not do anything with the map not run any thread to + * notify camunda process + */ + @Test + public void correlationIdIsFoundInHttpResponse_NotFoundInMap() + throws IOException { + when(httpClientMock.execute(any(HttpGet.class))). + thenReturn(createResponse( + String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP))); + testedObject.run(); + verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); + } + + /** + * Test run method, where the are following conditions: + *

- DmaapThreadListener is running, flag is set to true + *

- map is filled with one entry with the correlationId but no correlation id is taken from HttpResponse + * run method should not do anything with the map and not run any thread to notify camunda process + */ + @Test + public void correlationIdIsNotFoundInHttpResponse() throws IOException { + when(httpClientMock.execute(any(HttpGet.class))). + thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID)); + testedObject.run(); + verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } private void setPrivateField() throws NoSuchFieldException, IllegalAccessException { - Field field = pnfEventReadyConsumer.getClass().getDeclaredField("httpClient"); - field.setAccessible(true); - field.set(pnfEventReadyConsumer, httpClientMock); + Field httpClientField = testedObject.getClass().getDeclaredField("httpClient"); + httpClientField.setAccessible(true); + httpClientField.set(testedObject, httpClientMock); + httpClientField.setAccessible(false); + + Field executorField = testedObject.getClass().getDeclaredField("executor"); + executorField.setAccessible(true); + executorField.set(testedObject, executorMock); + executorField.setAccessible(false); + + Field pnfCorrelationToThreadMapField = testedObject.getClass() + .getDeclaredField("pnfCorrelationIdToThreadMap"); + pnfCorrelationToThreadMapField.setAccessible(true); + Map pnfCorrelationToThreadMap = new ConcurrentHashMap<>(); + pnfCorrelationToThreadMap.put(CORRELATION_ID, threadMockToNotifyCamundaFlow); + pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap); + + Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning"); + threadRunFlag.setAccessible(true); + threadRunFlag.set(testedObject, true); + threadRunFlag.setAccessible(false); + } + + private HttpResponse createResponse(String json) throws UnsupportedEncodingException { + HttpEntity entity = new StringEntity(json); + ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1); + HttpResponse response = new BasicHttpResponse(protocolVersion, 1, ""); + response.setEntity(entity); + response.setStatusCode(200); + return response; } } diff --git a/bpmn/MSOInfrastructureBPMN/src/test/resources/dmaapTest.properties b/bpmn/MSOInfrastructureBPMN/src/test/resources/dmaapTest.properties deleted file mode 100644 index a8df15c5df..0000000000 --- a/bpmn/MSOInfrastructureBPMN/src/test/resources/dmaapTest.properties +++ /dev/null @@ -1,7 +0,0 @@ -dmaapHost=hostTest -dmaapPort=1234 -dmaapProtocol=http -dmaapUriPathPrefix = eventsForTesting -eventReadyTopicName=eventTopicTest -consumerId=consumerTestId -consumerGroup=consumerGroupTest \ No newline at end of file diff --git a/bpmn/MSOInfrastructureBPMN/src/test/resources/springConfig_PnfEventReadyConsumer.xml b/bpmn/MSOInfrastructureBPMN/src/test/resources/springConfig_PnfEventReadyConsumer.xml deleted file mode 100644 index 5abee9dfd9..0000000000 --- a/bpmn/MSOInfrastructureBPMN/src/test/resources/springConfig_PnfEventReadyConsumer.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - - - - - - - - - - - -- cgit 1.2.3-korg