diff options
author | Rupali Shirode <rupali.shirode@t-systems.com> | 2023-12-07 16:39:19 +0530 |
---|---|---|
committer | Rupali Shirode <rupali.shirode@t-systems.com> | 2023-12-07 16:39:53 +0530 |
commit | 61b3ff91485571c24834b31c6ee7efc7ab1d0243 (patch) | |
tree | e66c5fa97e486d3ba4c54ae83862f2f6d72c0cfc /bpmn | |
parent | e83aa94d93e78e92fbed0c45924fce5aaf2d00c8 (diff) |
[SO] Remove DMaap Dependency in SO-bpmn-infra
Remove DMaap Dependency in SO-bpmn-infra
Issue-ID: SO-4122
Change-Id: I8fbe5761430c21b3f49b31a45ede095fdb72628f
Signed-off-by: Rupali Shirode <rupali.shirode@t-systems.com>
Diffstat (limited to 'bpmn')
4 files changed, 101 insertions, 142 deletions
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java index a932e4ac50..9cb566f49b 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java @@ -41,18 +41,16 @@ public final class JsonUtilForPnfCorrelationId { throw new IllegalStateException("Utility class"); } - static List<String> parseJsonToGelAllPnfCorrelationId(String json) { - if (json == null || json.isEmpty()) { + static List<String> parseJsonToGelAllPnfCorrelationId(List<String> list) { + if (list == null || list.isEmpty()) { return Collections.emptyList(); } - JsonElement je = new JsonParser().parse(json); - JsonArray array = je.getAsJsonArray(); - List<String> list = new ArrayList<>(); - Spliterator<JsonElement> spliterator = array.spliterator(); - spliterator.forEachRemaining(jsonElement -> handleEscapedCharacters(jsonElement) + + List<String> newList = new ArrayList<>(); + list.forEach(je -> handleEscapedCharacters(new JsonParser().parse(je)) .ifPresent(jsonObject -> getPnfCorrelationId(jsonObject) - .ifPresent(pnfCorrelationId -> list.add(pnfCorrelationId)))); - return list; + .ifPresent(pnfCorrelationId -> newList.add(pnfCorrelationId)))); + return newList; } private static Optional<JsonObject> handleEscapedCharacters(JsonElement jsonElement) { diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java index f215d496cf..44b16dad28 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java @@ -28,12 +28,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.ws.rs.core.UriBuilder; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.util.EntityUtils; +import org.onap.so.client.kafka.KafkaConsumerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,38 +38,42 @@ import org.springframework.stereotype.Component; @Component public class PnfEventReadyDmaapClient implements DmaapClient { private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class); - private HttpClient httpClient; private Map<String, Runnable> pnfCorrelationIdToThreadMap; - private HttpGet getRequestForpnfReady; - private HttpGet getRequestForPnfUpdate; private int topicListenerDelayInSeconds; private volatile ScheduledThreadPoolExecutor executor; private volatile boolean dmaapThreadListenerIsRunning; + private KafkaConsumerImpl consumerForPnfReady; + private KafkaConsumerImpl consumerForPnfUpdate; + private String pnfReadyTopic; + private String pnfUpdateTopic; + private String consumerGroup; + private String consumerId; + private String consumerIdUpdate; @Autowired - public PnfEventReadyDmaapClient(Environment env) { - httpClient = HttpClientBuilder.create().build(); + public PnfEventReadyDmaapClient(Environment env) throws IOException { pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class); executor = null; - getRequestForpnfReady = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix")) - .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host")) - .port(env.getProperty("pnf.dmaap.port", Integer.class)) - .path(env.getProperty("pnf.dmaap.pnfReadyTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup")) - .path(env.getProperty("pnf.dmaap.consumerId")).build()); - getRequestForPnfUpdate = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix")) - .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host")) - .port(env.getProperty("pnf.dmaap.port", Integer.class)) - .path(env.getProperty("pnf.dmaap.pnfUpdateTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup")) - .path(env.getProperty("pnf.dmaap.consumerIdUpdate")).build()); + try { + consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); + consumerForPnfUpdate = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); + } catch (Exception e) { + throw new RuntimeException(e); + } + pnfReadyTopic = env.getProperty("pnf.kafka.pnfReadyTopicName"); + pnfUpdateTopic = env.getProperty("pnf.kafka.pnfUpdateTopicName"); + consumerGroup = env.getProperty("pnf.kafka.consumerGroup"); + consumerId = env.getProperty("pnf.kafka.consumerId"); + consumerIdUpdate = env.getProperty("pnf.kafka.consumerIdUpdate"); } @Override public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { - logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId); + logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); if (!dmaapThreadListenerIsRunning) { startDmaapThreadListener(); @@ -83,9 +82,11 @@ public class PnfEventReadyDmaapClient implements DmaapClient { @Override public synchronized Runnable unregister(String pnfCorrelationId) { - logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId); + logger.debug("unregistering from pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId); if (pnfCorrelationIdToThreadMap.isEmpty()) { + consumerForPnfUpdate.close(); + consumerForPnfReady.close(); stopDmaapThreadListener(); } return runnable; @@ -114,30 +115,25 @@ public class PnfEventReadyDmaapClient implements DmaapClient { @Override public void run() { try { - HttpResponse response; - response = httpClient.execute(getRequestForPnfUpdate); - List<String> pnfUpdateResponse = getPnfCorrelationIdListFromResponse(response); - if (pnfUpdateResponse.isEmpty()) { - response = httpClient.execute(getRequestForpnfReady); + List<String> response; + System.out.println(pnfUpdateTopic + " " + consumerGroup); + response = consumerForPnfUpdate.get(pnfUpdateTopic, consumerGroup, consumerIdUpdate); + if (response.isEmpty()) { + response = consumerForPnfReady.get(pnfReadyTopic, consumerGroup, consumerId); getPnfCorrelationIdListFromResponse(response) .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); } else { - pnfUpdateResponse.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); + getPnfCorrelationIdListFromResponse(response) + .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); } } catch (IOException e) { - logger.error("Exception caught during sending rest request to dmaap for listening event topic", e); - } finally { - getRequestForpnfReady.reset(); - getRequestForPnfUpdate.reset(); + logger.error("Exception caught during sending rest request to kafka for listening event topic", e); } } - private List<String> getPnfCorrelationIdListFromResponse(HttpResponse response) throws IOException { - if (response.getStatusLine().getStatusCode() == 200) { - String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); - if (responseString != null) { - return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(responseString); - } + private List<String> getPnfCorrelationIdListFromResponse(List<String> response) throws IOException { + if (response != null) { + return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(response); } return Collections.emptyList(); } diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java index 4edee24531..f9e4cb4c88 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java @@ -23,43 +23,47 @@ package org.onap.so.bpmn.infrastructure.pnf.dmaap; import static org.assertj.core.api.Assertions.assertThat; +import java.util.ArrayList; import java.util.List; import org.junit.Test; public class JsonUtilForPnfCorrelationIdTest { + private static final List<String> LIST_EXAMPLE_WITH_PNF_CORRELATION_ID = new ArrayList<>(); + private static final List<String> LIST_WITH_ONE_PNF_CORRELATION_ID = new ArrayList<>(); + private static final List<String> LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS = new ArrayList<>(); + private static final List<String> LIST_WITH_NO_PNF_CORRELATION_ID = new ArrayList<>(); - private static final String JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = "[{\"correlationId\": \"corrTest1\"," - + "\"key1\":\"value1\"},{\"correlationId\": \"corrTest2\",\"key2\":\"value2\"}]"; - - private static final String JSON_WITH_ONE_PNF_CORRELATION_ID = "[{\"correlationId\":\"corrTest3\"}]"; - - private static final String JSON_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS = - "[\"{\\\"correlationId\\\":\\\"corrTest4\\\"}\", \"{\\\"correlationId\\\":\\\"corrTest5\\\"}\"]"; - - private static final String JSON_WITH_NO_PNF_CORRELATION_ID = "[{\"key1\":\"value1\"}]"; + static { + LIST_EXAMPLE_WITH_PNF_CORRELATION_ID.add("{\"correlationId\": \"corrTest1\",\"key1\":\"value1\"}"); + LIST_EXAMPLE_WITH_PNF_CORRELATION_ID.add("{\"correlationId\": \"corrTest2\",\"key2\":\"value2\"}"); + LIST_WITH_ONE_PNF_CORRELATION_ID.add("{\"correlationId\":\"corrTest3\"}"); + LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS.add("\"{\\\"correlationId\\\":\\\"corrTest4\\\"}\""); + LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS.add("\"{\\\"correlationId\\\":\\\"corrTest5\\\"}\""); + LIST_WITH_NO_PNF_CORRELATION_ID.add("{\"key1\":\"value1\"}"); + } @Test public void parseJsonSuccessful() { List<String> expectedResult = - JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID); + JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_EXAMPLE_WITH_PNF_CORRELATION_ID); assertThat(expectedResult).containsExactly("corrTest1", "corrTest2"); List<String> expectedResult2 = - JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(JSON_WITH_ONE_PNF_CORRELATION_ID); + JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_WITH_ONE_PNF_CORRELATION_ID); assertThat(expectedResult2).containsExactly("corrTest3"); } @Test public void parseJsonWithEscapeCharacters_Successful() { List<String> expectedResult = JsonUtilForPnfCorrelationId - .parseJsonToGelAllPnfCorrelationId(JSON_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS); + .parseJsonToGelAllPnfCorrelationId(LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS); assertThat(expectedResult).containsExactly("corrTest4", "corrTest5"); } @Test public void parseJson_emptyListReturnedWhenNothingFound() { List<String> expectedResult = - JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(JSON_WITH_NO_PNF_CORRELATION_ID); + JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_WITH_NO_PNF_CORRELATION_ID); assertThat(expectedResult).isEmpty(); } @@ -70,6 +74,6 @@ public class JsonUtilForPnfCorrelationIdTest { @Test public void shouldReturnEmptyListWhenInputIsEmpty() { - assertThat(JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId("")).isEmpty(); + assertThat(JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(null)).isEmpty(); } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java index bbb6aad49b..546e644fbd 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java @@ -28,45 +28,35 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.ProtocolVersion; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.entity.InputStreamEntity; -import org.apache.http.message.BasicHttpResponse; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread; +import org.onap.so.client.kafka.KafkaConsumerImpl; import org.springframework.core.env.Environment; + @RunWith(MockitoJUnitRunner.class) public class PnfEventReadyDmaapClientTest { - + private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"; private static final String PNF_CORRELATION_ID = "corrTestId"; private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId"; - private static final String JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = "[{\"correlationId\": \"%s\"," - + "\"value\":\"value1\"},{\"correlationId\": \"corr\",\"value\":\"value2\"}]"; - - private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "[{\"key1\":\"value1\"}]"; + private static final String[] JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = + {"{\"correlationId\": \"%s\"," + "\"value\":\"value1\"}", + "{\"correlationId\": \"corr\",\"value\":\"value2\"}"}; - private static final String HOST = "hostTest"; - private static final int PORT = 1234; - private static final String PROTOCOL = "http"; - private static final String URI_PATH_PREFIX = "eventsForTesting"; + private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "{\"key1\":\"value1\"}"; private static final String TOPIC_NAME = "unauthenticated.PNF_READY"; private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE"; private static final String CONSUMER_ID = "so-bpmn-infra-pnfready"; @@ -79,26 +69,23 @@ public class PnfEventReadyDmaapClientTest { private PnfEventReadyDmaapClient testedObject; private DmaapTopicListenerThread testedObjectInnerClassThread; - private HttpClient httpClientMock; + private KafkaConsumerImpl kafkaConsumerMock; private Runnable threadMockToNotifyCamundaFlow; private ScheduledThreadPoolExecutor executorMock; @Before - public void init() throws NoSuchFieldException, IllegalAccessException { - when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT); - when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST); - when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL); - when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX); - when(env.getProperty(eq("pnf.dmaap.pnfReadyTopicName"))).thenReturn(TOPIC_NAME); - when(env.getProperty(eq("pnf.dmaap.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE); - when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID); - when(env.getProperty(eq("pnf.dmaap.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE); - when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP); + public void init() throws NoSuchFieldException, IllegalAccessException, IOException { + when(env.getProperty(eq("pnf.kafka.kafkaBootstrapServers"))).thenReturn(KAFKA_BOOTSTRAP_SERVERS); + when(env.getProperty(eq("pnf.kafka.pnfReadyTopicName"))).thenReturn(TOPIC_NAME); + when(env.getProperty(eq("pnf.kafka.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE); + when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID); + when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE); + when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP); when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class))) .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS); testedObject = new PnfEventReadyDmaapClient(env); testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread(); - httpClientMock = mock(HttpClient.class); + kafkaConsumerMock = mock(KafkaConsumerImpl.class); threadMockToNotifyCamundaFlow = mock(Runnable.class); executorMock = mock(ScheduledThreadPoolExecutor.class); setPrivateField(); @@ -116,17 +103,11 @@ public class PnfEventReadyDmaapClientTest { */ @Test public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException { - when(httpClientMock.execute(any(HttpGet.class))) - .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID))); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); testedObjectInnerClassThread.run(); - ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class); - verify(httpClientMock).execute(captor1.capture()); - assertEquals(captor1.getValue().getURI().getHost(), HOST); - assertEquals(captor1.getValue().getURI().getPort(), PORT); - assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL); - assertEquals(captor1.getValue().getURI().getPath(), - "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME_UPDATE + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID_UPDATE + ""); - + verify(kafkaConsumerMock).get(TOPIC_NAME_UPDATE, CONSUMER_GROUP, CONSUMER_ID_UPDATE); verify(threadMockToNotifyCamundaFlow).run(); verify(executorMock).shutdown(); } @@ -134,20 +115,12 @@ public class PnfEventReadyDmaapClientTest { @Test public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException { - ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class); - when(httpClientMock.execute(any(HttpGet.class))) - .thenReturn(createResponse_forReady( - String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID))) - .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID))); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Collections.emptyList()) + .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); testedObjectInnerClassThread.run(); - verify(httpClientMock, times(2)).execute(captor1.capture()); - assertEquals(captor1.getValue().getURI().getHost(), HOST); - assertEquals(captor1.getValue().getURI().getPort(), PORT); - assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL); - - assertEquals(captor1.getValue().getURI().getPath(), - "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + ""); - + verify(kafkaConsumerMock).get(TOPIC_NAME, CONSUMER_GROUP, CONSUMER_ID); verify(threadMockToNotifyCamundaFlow).run(); verify(executorMock).shutdown(); @@ -164,8 +137,9 @@ public class PnfEventReadyDmaapClientTest { */ @Test public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException { - when(httpClientMock.execute(any(HttpGet.class))).thenReturn(createResponse( - String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID_NOT_FOUND_IN_MAP))); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))).thenReturn(Arrays.asList( + String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID_NOT_FOUND_IN_MAP), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); testedObjectInnerClassThread.run(); verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } @@ -180,17 +154,22 @@ public class PnfEventReadyDmaapClientTest { */ @Test public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException { - when(httpClientMock.execute(any(HttpGet.class))) - .thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID)); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Arrays.asList(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID)); testedObjectInnerClassThread.run(); verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } private void setPrivateField() throws NoSuchFieldException, IllegalAccessException { - Field httpClientField = testedObject.getClass().getDeclaredField("httpClient"); - httpClientField.setAccessible(true); - httpClientField.set(testedObject, httpClientMock); - httpClientField.setAccessible(false); + Field consumerForPnfReadyField = testedObject.getClass().getDeclaredField("consumerForPnfReady"); + consumerForPnfReadyField.setAccessible(true); + consumerForPnfReadyField.set(testedObject, kafkaConsumerMock); + consumerForPnfReadyField.setAccessible(false); + + Field consumerForPnfUpdateField = testedObject.getClass().getDeclaredField("consumerForPnfUpdate"); + consumerForPnfUpdateField.setAccessible(true); + consumerForPnfUpdateField.set(testedObject, kafkaConsumerMock); + consumerForPnfUpdateField.setAccessible(false); Field executorField = testedObject.getClass().getDeclaredField("executor"); executorField.setAccessible(true); @@ -209,22 +188,4 @@ public class PnfEventReadyDmaapClientTest { threadRunFlag.setAccessible(false); } - private HttpResponse createResponse(String json) { - HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes())); - ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1); - HttpResponse response = new BasicHttpResponse(protocolVersion, 1, ""); - response.setEntity(entity); - response.setStatusCode(200); - return response; - } - - private HttpResponse createResponse_forReady(String json) { - HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes())); - ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1); - HttpResponse response = new BasicHttpResponse(protocolVersion, 1, ""); - response.setEntity(entity); - response.setStatusCode(500); - return response; - } - } |