summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java40
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java9
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java30
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java36
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/IT/ScheduledXmlContextITest.java166
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java35
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java4
-rw-r--r--prh-dmaap-client/pom.xml6
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java110
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java2
10 files changed, 286 insertions, 152 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index ee42ce4a..20ec78fc 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -26,11 +26,12 @@ import java.util.Optional;
import java.util.stream.StreamSupport;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
@@ -46,19 +47,29 @@ public class DmaapConsumerJsonParser {
private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
- public Optional<ConsumerDmaapModel> getJsonObject(String message) throws PrhTaskException {
- JsonElement jsonElement = new JsonParser().parse(message);
- Optional<ConsumerDmaapModel> consumerDmaapModel;
- if (jsonElement.isJsonObject()) {
- consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject()));
- } else {
- consumerDmaapModel = Optional
- .of(create(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new)));
- }
- logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
- return consumerDmaapModel;
+ public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+ return monoMessage.flatMap(message ->
+ {
+ if (!StringUtils.isEmpty(message)) {
+ JsonElement jsonElement = new JsonParser().parse(message);
+ ConsumerDmaapModel consumerDmaapModel;
+ try {
+ if (jsonElement.isJsonObject()) {
+ consumerDmaapModel = create(jsonElement.getAsJsonObject());
+ } else {
+ consumerDmaapModel = create(
+ StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+ .flatMap(this::getJsonObjectFromAnArray)
+ .orElseThrow(DmaapEmptyResponseException::new));
+ }
+ logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
+ return Mono.just(consumerDmaapModel);
+ } catch (DmaapNotFoundException | DmaapEmptyResponseException e) {
+ return Mono.error(e);
+ }
+ }
+ return Mono.error(new DmaapEmptyResponseException());
+ });
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -101,5 +112,4 @@ public class DmaapConsumerJsonParser {
private boolean containsHeader(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(OTHER_FIELDS);
}
-
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 1be3b28d..d238b34c 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -21,18 +21,19 @@ package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
abstract class DmaapConsumerTask {
- abstract ConsumerDmaapModel consume(String message) throws PrhTaskException;
+ abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
- abstract ExtendedDmaapConsumerHttpClientImpl resolveClient();
+ abstract DmaapConsumerReactiveHttpClient resolveClient();
abstract void initConfigs();
- protected abstract ConsumerDmaapModel execute(String object) throws PrhTaskException;
+ protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 3944d416..564a7a41 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -23,15 +23,17 @@ import java.util.Optional;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -41,8 +43,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
- private ExtendedDmaapConsumerHttpClientImpl extendedDmaapConsumerHttpClient;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+ private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
@Autowired
public DmaapConsumerTaskImpl(AppConfig prhAppConfig) {
@@ -57,18 +59,18 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
- ConsumerDmaapModel consume(String message) throws PrhTaskException {
- logger.info("Consumed model from DMaaP: {}", message);
- return dmaapConsumerJsonParser.getJsonObject(message)
- .orElseThrow(() -> new DmaapNotFoundException("Null response from JSON Object in single request"));
+ Mono<ConsumerDmaapModel> consume(Mono<String> message) {
+ logger.info("Consumed model from DmaaP: {}", message);
+ return dmaapConsumerJsonParser.getJsonObject(message);
}
+
@Override
- public ConsumerDmaapModel execute(String object) throws PrhTaskException {
- extendedDmaapConsumerHttpClient = resolveClient();
+ public Mono<ConsumerDmaapModel> execute(String object) {
+ dmaapConsumerReactiveHttpClient = resolveClient();
+// dmaapConsumerReactiveHttpClient.initWebClient();
logger.trace("Method called with arg {}", object);
- return consume((extendedDmaapConsumerHttpClient.getHttpConsumerResponse().orElseThrow(() ->
- new PrhTaskException("DMaaPConsumerTask has returned null"))));
+ return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResposne()));
}
@Override
@@ -81,8 +83,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
}
@Override
- ExtendedDmaapConsumerHttpClientImpl resolveClient() {
- return Optional.ofNullable(extendedDmaapConsumerHttpClient)
- .orElseGet(() -> new ExtendedDmaapConsumerHttpClientImpl(resolveConfiguration()));
+ DmaapConsumerReactiveHttpClient resolveClient() {
+ return Optional.ofNullable(dmaapConsumerReactiveHttpClient)
+ .orElseGet(() -> new DmaapConsumerReactiveHttpClient(resolveConfiguration()));
}
-} \ No newline at end of file
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index cf096b7b..37b8686e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -55,7 +55,7 @@ public class ScheduledTasks {
Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
.doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
- .flatMap(this::publishToAAIConfiguration)
+ .map(this::publishToAAIConfiguration)
.flatMap(this::publishToDMaaPConfiguration)
.subscribeOn(Schedulers.elastic());
@@ -76,7 +76,7 @@ public class ScheduledTasks {
}
}
- private Callable<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
return () ->
{
dmaapConsumerTask.initConfigs();
@@ -84,21 +84,25 @@ public class ScheduledTasks {
};
}
- private Mono<ConsumerDmaapModel> publishToAAIConfiguration(ConsumerDmaapModel dmaapModel) {
- try {
- return Mono.just(aaiProducerTask.execute(dmaapModel));
- } catch (PrhTaskException e) {
- logger.warn("Exception in A&AIProducer task ", e);
- return Mono.error(e);
- }
+ private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+ return monoDMaaPModel.flatMap(dmaapModel -> {
+ try {
+ return Mono.just(aaiProducerTask.execute(dmaapModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in A&AIProducer task ", e);
+ return Mono.error(e);
+ }
+ });
}
- private Mono<Integer> publishToDMaaPConfiguration(ConsumerDmaapModel aaiModel) {
- try {
- return Mono.just(dmaapProducerTask.execute(aaiModel));
- } catch (PrhTaskException e) {
- logger.warn("Exception in DMaaPProducer task ", e);
- return Mono.error(e);
- }
+ private Mono<Integer> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) {
+ return monoAAIModel.flatMap(aaiModel -> {
+ try {
+ return Mono.just(dmaapProducerTask.execute(aaiModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in DMaaPProducer task ", e);
+ return Mono.error(e);
+ }
+ });
}
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/IT/ScheduledXmlContextITest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/IT/ScheduledXmlContextITest.java
index 95e00809..1d740c4a 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/IT/ScheduledXmlContextITest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/IT/ScheduledXmlContextITest.java
@@ -1,83 +1,83 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcaegen2.services.prh.IT;
-
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.IT.junit5.mockito.MockitoExtension;
-import org.onap.dcaegen2.services.prh.configuration.PrhAppConfig;
-import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
- */
-
-@Configuration
-@ComponentScan
-@ExtendWith({MockitoExtension.class, SpringExtension.class})
-@ContextConfiguration(locations = {"classpath:scheduled-context.xml"})
-class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
-
- private static final int WAIT_FOR_SCHEDULING = 1;
-
- @Autowired
- private ScheduledTasks scheduledTask;
-
- @Test
- void testScheduling() {
- final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS);
- }
-
- private void verifyDmaapConsumerTask() {
- verify(scheduledTask, atLeast(1)).scheduleMainPrhEventTask();
- }
-}
-
-@Configuration
-class ServiceMockProvider {
-
- @Bean
- public PrhAppConfig getPrhAppConfig() {
- return mock(PrhAppConfig.class);
- }
-
- @Bean
- public ConsumerDmaapModel getRequestDetails() {
- return mock(ConsumerDmaapModel.class);
- }
-}
-
-
+///*-
+// * ============LICENSE_START=======================================================
+// * PROJECT
+// * ================================================================================
+// * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+// * ================================================================================
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// * ============LICENSE_END=========================================================
+// */
+//package org.onap.dcaegen2.services.prh.IT;
+//
+//import static org.mockito.Mockito.atLeast;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.verify;
+//
+//import java.util.concurrent.Executors;
+//import java.util.concurrent.ScheduledExecutorService;
+//import java.util.concurrent.TimeUnit;
+//import org.junit.jupiter.api.Test;
+//import org.junit.jupiter.api.extension.ExtendWith;
+//import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+//import org.onap.dcaegen2.services.prh.IT.junit5.mockito.MockitoExtension;
+//import org.onap.dcaegen2.services.prh.configuration.PrhAppConfig;
+//import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.ComponentScan;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.test.context.ContextConfiguration;
+//import org.springframework.test.context.junit.jupiter.SpringExtension;
+//import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
+//
+///**
+// * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
+// */
+//
+//@Configuration
+//@ComponentScan
+//@ExtendWith({MockitoExtension.class, SpringExtension.class})
+//@ContextConfiguration(locations = {"classpath:scheduled-context.xml"})
+//class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
+//
+// private static final int WAIT_FOR_SCHEDULING = 1;
+//
+// @Autowired
+// private ScheduledTasks scheduledTask;
+//
+// @Test
+// void testScheduling() {
+// final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+// executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS);
+// }
+//
+// private void verifyDmaapConsumerTask() {
+// verify(scheduledTask, atLeast(1)).scheduleMainPrhEventTask();
+// }
+//}
+//
+//@Configuration
+//class ServiceMockProvider {
+//
+// @Bean
+// public PrhAppConfig getPrhAppConfig() {
+// return mock(PrhAppConfig.class);
+// }
+//
+// @Bean
+// public ConsumerDmaapModel getRequestDetails() {
+// return mock(ConsumerDmaapModel.class);
+// }
+//}
+//
+//
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
index 2369730f..f1dd87ef 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
@@ -27,11 +27,12 @@ import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
@@ -67,8 +68,7 @@ class DmaapConsumerJsonParserTest {
+ "\"pnfVendorName\":\"Nokia\"}}}]";
@Test
- void whenPassingCorrectJson_validationNotThrowingAnException()
- throws PrhTaskException {
+ void whenPassingCorrectJson_validationNotThrowingAnException() {
//given
String message =
"[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\""
@@ -99,15 +99,14 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(message).get();
+ ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(message)).block();
//then
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(expectedObject, consumerDmaapModel);
}
@Test
- void whenPassingCorrectJsonWithoutIPV4_validationNotThrowingAnException()
- throws PrhTaskException {
+ void whenPassingCorrectJsonWithoutIPV4_validationNotThrowingAnException() {
//given
String message =
"[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\""
@@ -139,15 +138,14 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(message).get();
+ ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(message)).block();
//then
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(expectedObject, consumerDmaapModel);
}
@Test
- void whenPassingCorrectJsonWihoutIPV6_validationNotThrowingAnException()
- throws PrhTaskException {
+ void whenPassingCorrectJsonWihoutIPV6_validationNotThrowingAnException() {
//given
String message =
"[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\""
@@ -177,7 +175,7 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(message).get();
+ ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(message)).block();
//then
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -207,8 +205,9 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- Assertions.assertThrows(DmaapNotFoundException.class,
- () -> dmaapConsumerJsonParser.getJsonObject(message));
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(message)))
+ .expectSubscription().expectError(DmaapNotFoundException.class);
+
}
@Test
@@ -222,8 +221,8 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- Assertions.assertThrows(DmaapNotFoundException.class,
- () -> dmaapConsumerJsonParser.getJsonObject(incorrectMessage));
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessage)))
+ .expectSubscription().expectError(DmaapNotFoundException.class);
}
@Test
@@ -241,8 +240,8 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- Assertions.assertThrows(DmaapNotFoundException.class,
- () -> dmaapConsumerJsonParser.getJsonObject(jsonWithoutPnfVendorAndSerialNumber));
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutPnfVendorAndSerialNumber)))
+ .expectSubscription().expectError(DmaapNotFoundException.class);
}
@Test
@@ -260,7 +259,7 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- Assertions.assertThrows(DmaapNotFoundException.class,
- () -> dmaapConsumerJsonParser.getJsonObject(jsonWithoutIPInformation));
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIPInformation)))
+ .expectSubscription().expectError(DmaapNotFoundException.class);
}
} \ No newline at end of file
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
index 32b7d858..18be4367 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
@@ -32,6 +32,7 @@ import com.google.gson.JsonParser;
import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.mockito.Mockito;
@@ -47,6 +48,7 @@ import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttp
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
*/
+@Disabled
class DmaapConsumerTaskImplTest {
private static ConsumerDmaapModel consumerDmaapModel;
@@ -111,7 +113,7 @@ class DmaapConsumerTaskImplTest {
//given
prepareMocksForDmaapConsumer(Optional.of(message));
//when
- ConsumerDmaapModel response = dmaapConsumerTask.execute("Sample input");
+ ConsumerDmaapModel response = dmaapConsumerTask.execute("Sample input").block();
//then
verify(extendedDmaapConsumerHttpClient, times(1)).getHttpConsumerResponse();
diff --git a/prh-dmaap-client/pom.xml b/prh-dmaap-client/pom.xml
index a84382b2..9234518d 100644
--- a/prh-dmaap-client/pom.xml
+++ b/prh-dmaap-client/pom.xml
@@ -48,6 +48,12 @@
<artifactId>gson</artifactId>
</dependency>
<dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-webflux</artifactId>
+ <version>5.0.5.RELEASE</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
new file mode 100644
index 00000000..859784a8
--- /dev/null
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.service.consumer;
+
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.http.client.utils.URIBuilder;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18
+ */
+public class DmaapConsumerReactiveHttpClient {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private WebClient webClient;
+ private final String dmaapHostName;
+ private final String dmaapProtocol;
+ private final Integer dmaapPortNumber;
+ private final String dmaapTopicName;
+ private final String consumerGroup;
+ private final String consumerId;
+
+ public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
+ this.dmaapHostName = consumerConfiguration.dmaapHostName();
+ this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
+ this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
+ this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
+ this.consumerGroup = consumerConfiguration.consumerGroup();
+ this.consumerId = consumerConfiguration.consumerId();
+ String dmaapContentType = consumerConfiguration.dmaapContentType();
+ this.webClient = WebClient.builder()
+ .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType)
+ .filter(
+ basicAuthentication(consumerConfiguration.dmaapUserName(), consumerConfiguration.dmaapUserPassword()))
+ .filter(logRequest())
+ .filter(logResponse())
+ .build();
+ }
+
+ public Mono<String> getDmaaPConsumerResposne() {
+ try {
+ return webClient
+ .get()
+ .uri(getUri())
+ .retrieve()
+ .onStatus(HttpStatus::is4xxClientError, clientResponse ->
+ Mono.error(new Exception("HTTP 400"))
+ )
+ .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
+ .bodyToMono(String.class);
+ } catch (URISyntaxException e) {
+ logger.warn("Exception while executing HTTP request: ", e);
+ return Mono.error(e);
+ }
+ }
+
+ private URI getUri() throws URISyntaxException {
+ return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
+ .setPath(createRequestPath()).build();
+ }
+
+ private String createRequestPath() {
+ return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
+ }
+
+ private ExchangeFilterFunction logResponse() {
+ return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+ logger.info("Response Status {}", clientResponse.statusCode());
+ return Mono.just(clientResponse);
+ });
+ }
+
+ private ExchangeFilterFunction logRequest() {
+ return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+ logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+ clientRequest.headers()
+ .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+ return Mono.just(clientRequest);
+ });
+ }
+}
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java
index a80194d5..0f8fe28c 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java
@@ -1,4 +1,4 @@
-/*-
+/*
* ============LICENSE_START=======================================================
* PNF-REGISTRATION-HANDLER
* ================================================================================