From 61b3ff91485571c24834b31c6ee7efc7ab1d0243 Mon Sep 17 00:00:00 2001 From: Rupali Shirode Date: Thu, 7 Dec 2023 16:39:19 +0530 Subject: [SO] Remove DMaap Dependency in SO-bpmn-infra Remove DMaap Dependency in SO-bpmn-infra Issue-ID: SO-4122 Change-Id: I8fbe5761430c21b3f49b31a45ede095fdb72628f Signed-off-by: Rupali Shirode --- .../pnf/dmaap/JsonUtilForPnfCorrelationId.java | 16 ++- .../pnf/dmaap/PnfEventReadyDmaapClient.java | 74 ++++++------- .../pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java | 32 +++--- .../pnf/dmaap/PnfEventReadyDmaapClientTest.java | 121 +++++++-------------- common/pom.xml | 17 +++ .../java/org/onap/so/client/kafka/KafkaClient.java | 21 ++++ .../onap/so/client/kafka/KafkaConsumerImpl.java | 104 ++++++++++++++++++ .../resources/kafka/default-consumer.properties | 6 + .../so/client/kafka/KafkaConsumerImplTest.java | 51 +++++++++ 9 files changed, 300 insertions(+), 142 deletions(-) create mode 100644 common/src/main/java/org/onap/so/client/kafka/KafkaClient.java create mode 100644 common/src/main/java/org/onap/so/client/kafka/KafkaConsumerImpl.java create mode 100644 common/src/main/resources/kafka/default-consumer.properties create mode 100644 common/src/test/java/org/onap/so/client/kafka/KafkaConsumerImplTest.java diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java index a932e4ac50..9cb566f49b 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java @@ -41,18 +41,16 @@ public final class JsonUtilForPnfCorrelationId { throw new IllegalStateException("Utility class"); } - static List parseJsonToGelAllPnfCorrelationId(String json) { - if (json == null || json.isEmpty()) { + static List parseJsonToGelAllPnfCorrelationId(List list) { + if (list == null || list.isEmpty()) { return Collections.emptyList(); } - JsonElement je = new JsonParser().parse(json); - JsonArray array = je.getAsJsonArray(); - List list = new ArrayList<>(); - Spliterator spliterator = array.spliterator(); - spliterator.forEachRemaining(jsonElement -> handleEscapedCharacters(jsonElement) + + List newList = new ArrayList<>(); + list.forEach(je -> handleEscapedCharacters(new JsonParser().parse(je)) .ifPresent(jsonObject -> getPnfCorrelationId(jsonObject) - .ifPresent(pnfCorrelationId -> list.add(pnfCorrelationId)))); - return list; + .ifPresent(pnfCorrelationId -> newList.add(pnfCorrelationId)))); + return newList; } private static Optional handleEscapedCharacters(JsonElement jsonElement) { diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java index f215d496cf..44b16dad28 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java @@ -28,12 +28,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.ws.rs.core.UriBuilder; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.util.EntityUtils; +import org.onap.so.client.kafka.KafkaConsumerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,38 +38,42 @@ import org.springframework.stereotype.Component; @Component public class PnfEventReadyDmaapClient implements DmaapClient { private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class); - private HttpClient httpClient; private Map pnfCorrelationIdToThreadMap; - private HttpGet getRequestForpnfReady; - private HttpGet getRequestForPnfUpdate; private int topicListenerDelayInSeconds; private volatile ScheduledThreadPoolExecutor executor; private volatile boolean dmaapThreadListenerIsRunning; + private KafkaConsumerImpl consumerForPnfReady; + private KafkaConsumerImpl consumerForPnfUpdate; + private String pnfReadyTopic; + private String pnfUpdateTopic; + private String consumerGroup; + private String consumerId; + private String consumerIdUpdate; @Autowired - public PnfEventReadyDmaapClient(Environment env) { - httpClient = HttpClientBuilder.create().build(); + public PnfEventReadyDmaapClient(Environment env) throws IOException { pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class); executor = null; - getRequestForpnfReady = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix")) - .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host")) - .port(env.getProperty("pnf.dmaap.port", Integer.class)) - .path(env.getProperty("pnf.dmaap.pnfReadyTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup")) - .path(env.getProperty("pnf.dmaap.consumerId")).build()); - getRequestForPnfUpdate = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix")) - .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host")) - .port(env.getProperty("pnf.dmaap.port", Integer.class)) - .path(env.getProperty("pnf.dmaap.pnfUpdateTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup")) - .path(env.getProperty("pnf.dmaap.consumerIdUpdate")).build()); + try { + consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); + consumerForPnfUpdate = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); + } catch (Exception e) { + throw new RuntimeException(e); + } + pnfReadyTopic = env.getProperty("pnf.kafka.pnfReadyTopicName"); + pnfUpdateTopic = env.getProperty("pnf.kafka.pnfUpdateTopicName"); + consumerGroup = env.getProperty("pnf.kafka.consumerGroup"); + consumerId = env.getProperty("pnf.kafka.consumerId"); + consumerIdUpdate = env.getProperty("pnf.kafka.consumerIdUpdate"); } @Override public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { - logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId); + logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); if (!dmaapThreadListenerIsRunning) { startDmaapThreadListener(); @@ -83,9 +82,11 @@ public class PnfEventReadyDmaapClient implements DmaapClient { @Override public synchronized Runnable unregister(String pnfCorrelationId) { - logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId); + logger.debug("unregistering from pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId); if (pnfCorrelationIdToThreadMap.isEmpty()) { + consumerForPnfUpdate.close(); + consumerForPnfReady.close(); stopDmaapThreadListener(); } return runnable; @@ -114,30 +115,25 @@ public class PnfEventReadyDmaapClient implements DmaapClient { @Override public void run() { try { - HttpResponse response; - response = httpClient.execute(getRequestForPnfUpdate); - List pnfUpdateResponse = getPnfCorrelationIdListFromResponse(response); - if (pnfUpdateResponse.isEmpty()) { - response = httpClient.execute(getRequestForpnfReady); + List response; + System.out.println(pnfUpdateTopic + " " + consumerGroup); + response = consumerForPnfUpdate.get(pnfUpdateTopic, consumerGroup, consumerIdUpdate); + if (response.isEmpty()) { + response = consumerForPnfReady.get(pnfReadyTopic, consumerGroup, consumerId); getPnfCorrelationIdListFromResponse(response) .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); } else { - pnfUpdateResponse.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); + getPnfCorrelationIdListFromResponse(response) + .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); } } catch (IOException e) { - logger.error("Exception caught during sending rest request to dmaap for listening event topic", e); - } finally { - getRequestForpnfReady.reset(); - getRequestForPnfUpdate.reset(); + logger.error("Exception caught during sending rest request to kafka for listening event topic", e); } } - private List getPnfCorrelationIdListFromResponse(HttpResponse response) throws IOException { - if (response.getStatusLine().getStatusCode() == 200) { - String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); - if (responseString != null) { - return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(responseString); - } + private List getPnfCorrelationIdListFromResponse(List response) throws IOException { + if (response != null) { + return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(response); } return Collections.emptyList(); } diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java index 4edee24531..f9e4cb4c88 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java @@ -23,43 +23,47 @@ package org.onap.so.bpmn.infrastructure.pnf.dmaap; import static org.assertj.core.api.Assertions.assertThat; +import java.util.ArrayList; import java.util.List; import org.junit.Test; public class JsonUtilForPnfCorrelationIdTest { + private static final List LIST_EXAMPLE_WITH_PNF_CORRELATION_ID = new ArrayList<>(); + private static final List LIST_WITH_ONE_PNF_CORRELATION_ID = new ArrayList<>(); + private static final List LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS = new ArrayList<>(); + private static final List LIST_WITH_NO_PNF_CORRELATION_ID = new ArrayList<>(); - private static final String JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = "[{\"correlationId\": \"corrTest1\"," - + "\"key1\":\"value1\"},{\"correlationId\": \"corrTest2\",\"key2\":\"value2\"}]"; - - private static final String JSON_WITH_ONE_PNF_CORRELATION_ID = "[{\"correlationId\":\"corrTest3\"}]"; - - private static final String JSON_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS = - "[\"{\\\"correlationId\\\":\\\"corrTest4\\\"}\", \"{\\\"correlationId\\\":\\\"corrTest5\\\"}\"]"; - - private static final String JSON_WITH_NO_PNF_CORRELATION_ID = "[{\"key1\":\"value1\"}]"; + static { + LIST_EXAMPLE_WITH_PNF_CORRELATION_ID.add("{\"correlationId\": \"corrTest1\",\"key1\":\"value1\"}"); + LIST_EXAMPLE_WITH_PNF_CORRELATION_ID.add("{\"correlationId\": \"corrTest2\",\"key2\":\"value2\"}"); + LIST_WITH_ONE_PNF_CORRELATION_ID.add("{\"correlationId\":\"corrTest3\"}"); + LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS.add("\"{\\\"correlationId\\\":\\\"corrTest4\\\"}\""); + LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS.add("\"{\\\"correlationId\\\":\\\"corrTest5\\\"}\""); + LIST_WITH_NO_PNF_CORRELATION_ID.add("{\"key1\":\"value1\"}"); + } @Test public void parseJsonSuccessful() { List expectedResult = - JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID); + JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_EXAMPLE_WITH_PNF_CORRELATION_ID); assertThat(expectedResult).containsExactly("corrTest1", "corrTest2"); List expectedResult2 = - JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(JSON_WITH_ONE_PNF_CORRELATION_ID); + JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_WITH_ONE_PNF_CORRELATION_ID); assertThat(expectedResult2).containsExactly("corrTest3"); } @Test public void parseJsonWithEscapeCharacters_Successful() { List expectedResult = JsonUtilForPnfCorrelationId - .parseJsonToGelAllPnfCorrelationId(JSON_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS); + .parseJsonToGelAllPnfCorrelationId(LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS); assertThat(expectedResult).containsExactly("corrTest4", "corrTest5"); } @Test public void parseJson_emptyListReturnedWhenNothingFound() { List expectedResult = - JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(JSON_WITH_NO_PNF_CORRELATION_ID); + JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_WITH_NO_PNF_CORRELATION_ID); assertThat(expectedResult).isEmpty(); } @@ -70,6 +74,6 @@ public class JsonUtilForPnfCorrelationIdTest { @Test public void shouldReturnEmptyListWhenInputIsEmpty() { - assertThat(JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId("")).isEmpty(); + assertThat(JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(null)).isEmpty(); } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java index bbb6aad49b..546e644fbd 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java @@ -28,45 +28,35 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.ProtocolVersion; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.entity.InputStreamEntity; -import org.apache.http.message.BasicHttpResponse; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread; +import org.onap.so.client.kafka.KafkaConsumerImpl; import org.springframework.core.env.Environment; + @RunWith(MockitoJUnitRunner.class) public class PnfEventReadyDmaapClientTest { - + private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"; private static final String PNF_CORRELATION_ID = "corrTestId"; private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId"; - private static final String JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = "[{\"correlationId\": \"%s\"," - + "\"value\":\"value1\"},{\"correlationId\": \"corr\",\"value\":\"value2\"}]"; - - private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "[{\"key1\":\"value1\"}]"; + private static final String[] JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = + {"{\"correlationId\": \"%s\"," + "\"value\":\"value1\"}", + "{\"correlationId\": \"corr\",\"value\":\"value2\"}"}; - private static final String HOST = "hostTest"; - private static final int PORT = 1234; - private static final String PROTOCOL = "http"; - private static final String URI_PATH_PREFIX = "eventsForTesting"; + private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "{\"key1\":\"value1\"}"; private static final String TOPIC_NAME = "unauthenticated.PNF_READY"; private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE"; private static final String CONSUMER_ID = "so-bpmn-infra-pnfready"; @@ -79,26 +69,23 @@ public class PnfEventReadyDmaapClientTest { private PnfEventReadyDmaapClient testedObject; private DmaapTopicListenerThread testedObjectInnerClassThread; - private HttpClient httpClientMock; + private KafkaConsumerImpl kafkaConsumerMock; private Runnable threadMockToNotifyCamundaFlow; private ScheduledThreadPoolExecutor executorMock; @Before - public void init() throws NoSuchFieldException, IllegalAccessException { - when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT); - when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST); - when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL); - when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX); - when(env.getProperty(eq("pnf.dmaap.pnfReadyTopicName"))).thenReturn(TOPIC_NAME); - when(env.getProperty(eq("pnf.dmaap.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE); - when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID); - when(env.getProperty(eq("pnf.dmaap.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE); - when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP); + public void init() throws NoSuchFieldException, IllegalAccessException, IOException { + when(env.getProperty(eq("pnf.kafka.kafkaBootstrapServers"))).thenReturn(KAFKA_BOOTSTRAP_SERVERS); + when(env.getProperty(eq("pnf.kafka.pnfReadyTopicName"))).thenReturn(TOPIC_NAME); + when(env.getProperty(eq("pnf.kafka.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE); + when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID); + when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE); + when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP); when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class))) .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS); testedObject = new PnfEventReadyDmaapClient(env); testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread(); - httpClientMock = mock(HttpClient.class); + kafkaConsumerMock = mock(KafkaConsumerImpl.class); threadMockToNotifyCamundaFlow = mock(Runnable.class); executorMock = mock(ScheduledThreadPoolExecutor.class); setPrivateField(); @@ -116,17 +103,11 @@ public class PnfEventReadyDmaapClientTest { */ @Test public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException { - when(httpClientMock.execute(any(HttpGet.class))) - .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID))); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); testedObjectInnerClassThread.run(); - ArgumentCaptor captor1 = ArgumentCaptor.forClass(HttpGet.class); - verify(httpClientMock).execute(captor1.capture()); - assertEquals(captor1.getValue().getURI().getHost(), HOST); - assertEquals(captor1.getValue().getURI().getPort(), PORT); - assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL); - assertEquals(captor1.getValue().getURI().getPath(), - "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME_UPDATE + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID_UPDATE + ""); - + verify(kafkaConsumerMock).get(TOPIC_NAME_UPDATE, CONSUMER_GROUP, CONSUMER_ID_UPDATE); verify(threadMockToNotifyCamundaFlow).run(); verify(executorMock).shutdown(); } @@ -134,20 +115,12 @@ public class PnfEventReadyDmaapClientTest { @Test public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException { - ArgumentCaptor captor1 = ArgumentCaptor.forClass(HttpGet.class); - when(httpClientMock.execute(any(HttpGet.class))) - .thenReturn(createResponse_forReady( - String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID))) - .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID))); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Collections.emptyList()) + .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); testedObjectInnerClassThread.run(); - verify(httpClientMock, times(2)).execute(captor1.capture()); - assertEquals(captor1.getValue().getURI().getHost(), HOST); - assertEquals(captor1.getValue().getURI().getPort(), PORT); - assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL); - - assertEquals(captor1.getValue().getURI().getPath(), - "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + ""); - + verify(kafkaConsumerMock).get(TOPIC_NAME, CONSUMER_GROUP, CONSUMER_ID); verify(threadMockToNotifyCamundaFlow).run(); verify(executorMock).shutdown(); @@ -164,8 +137,9 @@ public class PnfEventReadyDmaapClientTest { */ @Test public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException { - when(httpClientMock.execute(any(HttpGet.class))).thenReturn(createResponse( - String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID_NOT_FOUND_IN_MAP))); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))).thenReturn(Arrays.asList( + String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID_NOT_FOUND_IN_MAP), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); testedObjectInnerClassThread.run(); verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } @@ -180,17 +154,22 @@ public class PnfEventReadyDmaapClientTest { */ @Test public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException { - when(httpClientMock.execute(any(HttpGet.class))) - .thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID)); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Arrays.asList(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID)); testedObjectInnerClassThread.run(); verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } private void setPrivateField() throws NoSuchFieldException, IllegalAccessException { - Field httpClientField = testedObject.getClass().getDeclaredField("httpClient"); - httpClientField.setAccessible(true); - httpClientField.set(testedObject, httpClientMock); - httpClientField.setAccessible(false); + Field consumerForPnfReadyField = testedObject.getClass().getDeclaredField("consumerForPnfReady"); + consumerForPnfReadyField.setAccessible(true); + consumerForPnfReadyField.set(testedObject, kafkaConsumerMock); + consumerForPnfReadyField.setAccessible(false); + + Field consumerForPnfUpdateField = testedObject.getClass().getDeclaredField("consumerForPnfUpdate"); + consumerForPnfUpdateField.setAccessible(true); + consumerForPnfUpdateField.set(testedObject, kafkaConsumerMock); + consumerForPnfUpdateField.setAccessible(false); Field executorField = testedObject.getClass().getDeclaredField("executor"); executorField.setAccessible(true); @@ -209,22 +188,4 @@ public class PnfEventReadyDmaapClientTest { threadRunFlag.setAccessible(false); } - private HttpResponse createResponse(String json) { - HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes())); - ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1); - HttpResponse response = new BasicHttpResponse(protocolVersion, 1, ""); - response.setEntity(entity); - response.setStatusCode(200); - return response; - } - - private HttpResponse createResponse_forReady(String json) { - HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes())); - ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1); - HttpResponse response = new BasicHttpResponse(protocolVersion, 1, ""); - response.setEntity(entity); - response.setStatusCode(500); - return response; - } - } diff --git a/common/pom.xml b/common/pom.xml index 9713d006e1..847c9464a0 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -308,6 +308,23 @@ org.springframework.cloud spring-cloud-starter-sleuth + + + org.apache.kafka + kafka-clients + 3.3.1 + + + uk.org.webcompere + system-stubs-jupiter + 1.1.0 + test + + + org.junit.jupiter + junit-jupiter-engine + 5.5.2 + diff --git a/common/src/main/java/org/onap/so/client/kafka/KafkaClient.java b/common/src/main/java/org/onap/so/client/kafka/KafkaClient.java new file mode 100644 index 0000000000..2c695255e0 --- /dev/null +++ b/common/src/main/java/org/onap/so/client/kafka/KafkaClient.java @@ -0,0 +1,21 @@ +package org.onap.so.client.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import java.io.IOException; +import java.util.Properties; + +public class KafkaClient { + protected static Logger logger = LoggerFactory.getLogger(KafkaClient.class); + protected final Properties properties; + + public KafkaClient(String filepath) throws IOException { + Resource resource = new ClassPathResource(filepath); + this.properties = new Properties(); + properties.load(resource.getInputStream()); + + } + +} diff --git a/common/src/main/java/org/onap/so/client/kafka/KafkaConsumerImpl.java b/common/src/main/java/org/onap/so/client/kafka/KafkaConsumerImpl.java new file mode 100644 index 0000000000..69dd16acf8 --- /dev/null +++ b/common/src/main/java/org/onap/so/client/kafka/KafkaConsumerImpl.java @@ -0,0 +1,104 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.client.kafka; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.scram.internals.ScramMechanism; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +public class KafkaConsumerImpl extends KafkaClient { + + protected static Logger logger = LoggerFactory.getLogger(KafkaConsumerImpl.class); + private static final String kafkaBootstrapServers = "kafkaBootstrapServers"; + private Consumer consumer; + + public KafkaConsumerImpl(String bootstrapServers) throws Exception { + super("kafka/default-consumer.properties"); + setProperties(bootstrapServers); + } + + + public List get(String topic, String consumerGroup, String consumerId) { + logger.info("consuming message from kafka topic : " + topic); + this.properties.put("group.id", consumerGroup); + this.properties.put("client.id", consumerId); + if (consumer == null) { + consumer = getKafkaConsumer(properties); + consumer.subscribe(Arrays.asList(topic)); + } + ArrayList msgs = new ArrayList<>(); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + for (ConsumerRecord rec : records) { + msgs.add(rec.value()); + } + logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<< logger.info("MESSAGE CONSUMED FROM KAFKA : <<<<<" + msg + ">>>>>")); + return msgs; + } + + private void setProperties(String bootstrapServers) throws Exception { + if (bootstrapServers == null) { + logger.error("Environment Variable " + kafkaBootstrapServers + " is missing"); + throw new Exception("Environment Variable " + kafkaBootstrapServers + " is missing"); + } else { + this.properties.put("bootstrap.servers", bootstrapServers); + } + + if (System.getenv("JAAS_CONFIG") == null) { + logger.info("Not using any authentication for kafka interaction"); + } else { + logger.info("Using {} authentication provided for kafka interaction", + ScramMechanism.SCRAM_SHA_512.mechanismName()); + this.properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); + this.properties.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()); + this.properties.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")); + } + } + + public static KafkaConsumer getKafkaConsumer(Properties properties) { + return new KafkaConsumer<>(properties); + } + + public void setConsumer(Consumer kafkaConsumer) { + this.consumer = kafkaConsumer; + } + + public void close() { + if (consumer != null) { + logger.info("Closing the Kafka Consumer"); + consumer.close(); + consumer = null; + } + } + +} diff --git a/common/src/main/resources/kafka/default-consumer.properties b/common/src/main/resources/kafka/default-consumer.properties new file mode 100644 index 0000000000..a7edf58b6b --- /dev/null +++ b/common/src/main/resources/kafka/default-consumer.properties @@ -0,0 +1,6 @@ +key.deserializer=org.apache.kafka.common.serialization.StringDeserializer +value.deserializer=org.apache.kafka.common.serialization.StringDeserializer +max.poll.interval.ms=300000 +heartbeat.interval.ms=60000 +session.timeout.ms=240000 +max.poll.records=1000 diff --git a/common/src/test/java/org/onap/so/client/kafka/KafkaConsumerImplTest.java b/common/src/test/java/org/onap/so/client/kafka/KafkaConsumerImplTest.java new file mode 100644 index 0000000000..d71e562b64 --- /dev/null +++ b/common/src/test/java/org/onap/so/client/kafka/KafkaConsumerImplTest.java @@ -0,0 +1,51 @@ +package org.onap.so.client.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import static org.assertj.core.api.Assertions.assertThat; + +@ExtendWith(SystemStubsExtension.class) +public class KafkaConsumerImplTest { + private KafkaConsumerImpl consumer; + private static MockConsumer mockConsumer; + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + @Before + public void setup() { + environmentVariables.set("JAAS_CONFIG", "jaas.config"); + mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + configureMockConsumer(); + } + + @Test + public void consumerShouldConsumeMessages() throws Exception { + consumer = new KafkaConsumerImpl("localhost:9092"); + consumer.setConsumer(mockConsumer); + List response = consumer.get("TOPIC", "CG1", "C1"); + assertThat(response).contains("I", "like", "pizza"); + } + + private void configureMockConsumer() { + mockConsumer.assign(Arrays.asList(new TopicPartition("TOPIC", 0))); + + HashMap beginningOffsets = new HashMap<>(); + beginningOffsets.put(new TopicPartition("TOPIC", 0), 0L); + mockConsumer.updateBeginningOffsets(beginningOffsets); + mockConsumer.addRecord(new ConsumerRecord("TOPIC", 0, 0L, "key", "I")); + mockConsumer.addRecord(new ConsumerRecord("TOPIC", 0, 1L, "key", "like")); + mockConsumer.addRecord(new ConsumerRecord("TOPIC", 0, 2L, "key", "pizza")); + + } +} -- cgit 1.2.3-korg