aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--prh-app-server/config/application.yaml2
-rw-r--r--prh-app-server/pom.xml8
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java20
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java8
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java6
-rw-r--r--prh-app-server/src/main/resources/application.properties1
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java71
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java10
-rw-r--r--prh-dmaap-client/pom.xml10
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java38
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java156
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java96
13 files changed, 248 insertions, 182 deletions
diff --git a/prh-app-server/config/application.yaml b/prh-app-server/config/application.yaml
index 306c94f7..4c8c3ef1 100644
--- a/prh-app-server/config/application.yaml
+++ b/prh-app-server/config/application.yaml
@@ -1,6 +1,8 @@
spring:
profiles:
active: prod
+ main:
+ web-application-type: none
server:
port: 8433
ssl:
diff --git a/prh-app-server/pom.xml b/prh-app-server/pom.xml
index dbe3d1b7..ed24fdeb 100644
--- a/prh-app-server/pom.xml
+++ b/prh-app-server/pom.xml
@@ -275,14 +275,6 @@
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
</dependency>
-
- <dependency>
- <groupId>org.onap.dcaegen2.services.prh</groupId>
- <artifactId>prh-aai-client</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- </dependency>
-
-
</dependencies>
<dependencyManagement>
<dependencies>
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 22acf547..1d215c62 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
/**
@@ -46,21 +47,20 @@ public class DmaapConsumerJsonParser {
private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
- public Mono<Optional<ConsumerDmaapModel>> getJsonObject(Mono<Optional<String>> monoMessage) {
+ public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage.flatMap(message ->
{
- if (message.isPresent()) {
- JsonElement jsonElement = new JsonParser().parse(message.orElse(""));
- Optional<ConsumerDmaapModel> consumerDmaapModel;
+ if (!StringUtils.isEmpty(message)) {
+ JsonElement jsonElement = new JsonParser().parse(message);
+ ConsumerDmaapModel consumerDmaapModel;
try {
if (jsonElement.isJsonObject()) {
- consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject()));
+ consumerDmaapModel = create(jsonElement.getAsJsonObject());
} else {
- consumerDmaapModel = Optional
- .of(create(
- StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new)));
+ consumerDmaapModel = create(
+ StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+ .flatMap(this::getJsonObjectFromAnArray)
+ .orElseThrow(DmaapEmptyResponseException::new));
}
logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
return Mono.just(consumerDmaapModel);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 753d1f9c..5cd30f8b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -30,11 +30,11 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapConsumerTask {
- abstract Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException;
+ abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
abstract DmaapConsumerReactiveHttpClient resolveClient();
abstract void initConfigs();
- protected abstract Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException;
+ protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 3181c069..08008f0a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -40,7 +40,6 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
-
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
@@ -57,17 +56,16 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
}
-
@Override
- Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException {
+ Mono<ConsumerDmaapModel> consume(Mono<String> message) {
logger.info("Consumed model from DmaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message);
}
@Override
- public Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException {
+ public Mono<ConsumerDmaapModel> execute(String object) {
dmaapConsumerReactiveHttpClient = resolveClient();
-// dmaapConsumerReactiveHttpClient.initWebClient();
+ dmaapConsumerReactiveHttpClient.initWebClient();
logger.trace("Method called with arg {}", object);
return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()));
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 6fa986e4..e161e3c5 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -77,7 +77,7 @@ public class ScheduledTasks {
}
}
- private Callable<Mono<Optional<ConsumerDmaapModel>>> consumeFromDMaaPMessage() {
+ private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
return () ->
{
dmaapConsumerTask.initConfigs();
@@ -85,10 +85,10 @@ public class ScheduledTasks {
};
}
- private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<Optional<ConsumerDmaapModel>> monoDMaaPModel) {
+ private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
return monoDMaaPModel.flatMap(dmaapModel -> {
try {
- return Mono.just(aaiProducerTask.execute(dmaapModel.get()));
+ return Mono.just(aaiProducerTask.execute(dmaapModel));
} catch (PrhTaskException e) {
logger.warn("Exception in A&AIProducer task ", e);
return Mono.error(e);
diff --git a/prh-app-server/src/main/resources/application.properties b/prh-app-server/src/main/resources/application.properties
index 53fa9cde..2593b3d1 100644
--- a/prh-app-server/src/main/resources/application.properties
+++ b/prh-app-server/src/main/resources/application.properties
@@ -1,4 +1,5 @@
spring.profiles.active=prod
+spring.main.web-application-type=none
server.port=8433
server.ssl.key-store-type=PKCS12
server.ssl.key-store-password=nokiapnf
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
index f24ef413..9e7edc4d 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
@@ -38,34 +38,6 @@ import reactor.test.StepVerifier;
*/
class DmaapConsumerJsonParserTest {
- private String incorrectMessage =
- "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\""
- + ",\"eventName\":\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\"internalHeaderFields\":{},"
- + "\"lastEpochMicrosec\":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\":\"5DU\",\"priority\""
- + ":\"Normal\",\"reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\"<<SerialNumber>>\","
- + "\"sourceName\":\"5GRAN_DU\",\"startEpochMicrosec\":1519837825682,\"version\":3}}}]";
-
- private String jsonWithoutPnfVendorAndSerialNumber =
- "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":"
- + "\"<<SerialNumber>>-reg\",\"eventName\":\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\""
- + "internalHeaderFields\":{},\"lastEpochMicrosec\":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\""
- + ":\"5DU\",\"priority\":\"Normal\",reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\""
- + "<<SerialNumber>>\",\"sourceName\":\"5GRAN_DU\",startEpochMicrosec\":1519837825682,\"version\":3},"
- + "\"otherFields\":{\"otherFieldsVersion\":1,\"pnfFamily\":\"BBU\",\"pnfLastServiceDate\":1517206400,"
- + "\"pnfManufactureDate\":1516406400,\"pnfModelNumber\":\"AJ02\",\"pnfOamIpv4Address\":"
- + "\"10.16.123.234\",\"pnfOamIpv6Address\":\"0:0:0:0:0:FFFF:0A10:7BEA\",\"pnfSoftwareVersion\":\"v4.5.0.1\","
- + "\"pnfType\":\"AirScale\"}}}]";
-
- private String jsonWithoutIPInformation =
- "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\":"
- + "\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\"internalHeaderFields\":{},\"lastEpochMicrosec\""
- + ":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\":\"5DU\",\"priority\":\"Normal\","
- + "\"reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\"<<SerialNumber>>\",\"sourceName\":"
- + "\"5GRAN_DU\",\"startEpochMicrosec\":1519837825682,\"version\":3},\"otherFields\":{\"otherFieldsVersion\":1,"
- + "\"pnfFamily\":\"BBU\",\"pnfLastServiceDate\":1517206400,\"pnfManufactureDate\":1516406400,\"pnfModelNumber\":"
- + "\"AJ02\",\"pnfSerialNumber\":\"QTFCOC540002E\",\"pnfSoftwareVersion\":\"v4.5.0.1\",\"pnfType\":\"AirScale\","
- + "\"pnfVendorName\":\"Nokia\"}}}]";
-
@Test
void whenPassingCorrectJson_validationNotThrowingAnException() {
//given
@@ -99,7 +71,7 @@ class DmaapConsumerJsonParserTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
- .getJsonObject(Mono.just(Optional.of(message))).block().get();
+ .getJsonObject(Mono.just((message))).block();
//then
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -138,8 +110,8 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(message)))
- .block().get();
+ ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((message)))
+ .block();
//then
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -176,8 +148,8 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(message)))
- .block().get();
+ ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((message)))
+ .block();
//then
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -207,7 +179,7 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(message))))
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(message)))
.expectSubscription().expectError(DmaapNotFoundException.class);
}
@@ -223,7 +195,13 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(incorrectMessage))))
+ String incorrectMessage =
+ "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\""
+ + ",\"eventName\":\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\"internalHeaderFields\":{},"
+ + "\"lastEpochMicrosec\":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\":\"5DU\",\"priority\""
+ + ":\"Normal\",\"reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\"<<SerialNumber>>\","
+ + "\"sourceName\":\"5GRAN_DU\",\"startEpochMicrosec\":1519837825682,\"version\":3}}}]";
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessage)))
.expectSubscription().expectError(DmaapNotFoundException.class);
}
@@ -242,8 +220,18 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
+ String jsonWithoutPnfVendorAndSerialNumber =
+ "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":"
+ + "\"<<SerialNumber>>-reg\",\"eventName\":\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\""
+ + "internalHeaderFields\":{},\"lastEpochMicrosec\":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\""
+ + ":\"5DU\",\"priority\":\"Normal\",reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\""
+ + "<<SerialNumber>>\",\"sourceName\":\"5GRAN_DU\",startEpochMicrosec\":1519837825682,\"version\":3},"
+ + "\"otherFields\":{\"otherFieldsVersion\":1,\"pnfFamily\":\"BBU\",\"pnfLastServiceDate\":1517206400,"
+ + "\"pnfManufactureDate\":1516406400,\"pnfModelNumber\":\"AJ02\",\"pnfOamIpv4Address\":"
+ + "\"10.16.123.234\",\"pnfOamIpv6Address\":\"0:0:0:0:0:FFFF:0A10:7BEA\",\"pnfSoftwareVersion\":\"v4.5.0.1\","
+ + "\"pnfType\":\"AirScale\"}}}]";
StepVerifier
- .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(jsonWithoutPnfVendorAndSerialNumber))))
+ .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutPnfVendorAndSerialNumber)))
.expectSubscription().expectError(DmaapNotFoundException.class);
}
@@ -262,7 +250,16 @@ class DmaapConsumerJsonParserTest {
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(jsonWithoutIPInformation))))
+ String jsonWithoutIPInformation =
+ "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\":"
+ + "\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\"internalHeaderFields\":{},\"lastEpochMicrosec\""
+ + ":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\":\"5DU\",\"priority\":\"Normal\","
+ + "\"reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\"<<SerialNumber>>\",\"sourceName\":"
+ + "\"5GRAN_DU\",\"startEpochMicrosec\":1519837825682,\"version\":3},\"otherFields\":{\"otherFieldsVersion\":1,"
+ + "\"pnfFamily\":\"BBU\",\"pnfLastServiceDate\":1517206400,\"pnfManufactureDate\":1516406400,\"pnfModelNumber\":"
+ + "\"AJ02\",\"pnfSerialNumber\":\"QTFCOC540002E\",\"pnfSoftwareVersion\":\"v4.5.0.1\",\"pnfType\":\"AirScale\","
+ + "\"pnfVendorName\":\"Nokia\"}}}]";
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIPInformation)))
.expectSubscription().expectError(DmaapNotFoundException.class);
}
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
index f9d7c7f1..71e132c4 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
@@ -103,22 +103,20 @@ class DmaapConsumerTaskImplTest {
.expectError(DmaapEmptyResponseException.class);
verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
}
@Test
public void whenPassedObjectFits_ReturnsCorrectResponse() throws PrhTaskException {
//given
prepareMocksForDmaapConsumer(Optional.of(message));
- //when
- Mono<Optional<ConsumerDmaapModel>> response = dmaapConsumerTask.execute("Sample input");
+ //when
+ Mono<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
//then
verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
Assertions.assertNotNull(response);
- Assertions.assertEquals(consumerDmaapModel, response.block().get());
+ Assertions.assertEquals(consumerDmaapModel, response.block());
}
@@ -128,7 +126,7 @@ class DmaapConsumerTaskImplTest {
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class);
- when(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).thenReturn(Mono.just(message));
+ when(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).thenReturn(Mono.just(message.orElse("")));
when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerJsonParser));
when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
diff --git a/prh-dmaap-client/pom.xml b/prh-dmaap-client/pom.xml
index 9234518d..0633b46a 100644
--- a/prh-dmaap-client/pom.xml
+++ b/prh-dmaap-client/pom.xml
@@ -54,6 +54,11 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-reactor-netty</artifactId>
+ <version>2.0.4.RELEASE</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
@@ -109,5 +114,10 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
index a99833dc..cb7d5af2 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
@@ -17,14 +17,12 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-
package org.onap.dcaegen2.services.prh.service.consumer;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Optional;
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.slf4j.Logger;
@@ -49,6 +47,9 @@ public class DmaapConsumerReactiveHttpClient {
private final String dmaapTopicName;
private final String consumerGroup;
private final String consumerId;
+ private final String dmaapContentType;
+ private final String dmaapUserName;
+ private final String dmaapUserPassword;
public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
this.dmaapHostName = consumerConfiguration.dmaapHostName();
@@ -57,17 +58,21 @@ public class DmaapConsumerReactiveHttpClient {
this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
this.consumerGroup = consumerConfiguration.consumerGroup();
this.consumerId = consumerConfiguration.consumerId();
- String dmaapContentType = consumerConfiguration.dmaapContentType();
+ this.dmaapContentType = consumerConfiguration.dmaapContentType();
+ this.dmaapUserName = consumerConfiguration.dmaapUserName();
+ this.dmaapUserPassword = consumerConfiguration.dmaapUserPassword();
+ }
+
+ public void initWebClient() {
this.webClient = WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType)
- .filter(
- basicAuthentication(consumerConfiguration.dmaapUserName(), consumerConfiguration.dmaapUserPassword()))
+ .filter(basicAuthentication(dmaapUserName, dmaapUserPassword))
.filter(logRequest())
.filter(logResponse())
.build();
}
- public Mono<Optional<String>> getDmaaPConsumerResponse() {
+ public Mono<String> getDmaaPConsumerResponse() {
try {
return webClient
.get()
@@ -78,31 +83,34 @@ public class DmaapConsumerReactiveHttpClient {
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
Mono.error(new Exception("HTTP 500")))
- .bodyToMono(String.class)
- .map(Optional::of);
+ .bodyToMono(String.class);
} catch (URISyntaxException e) {
logger.warn("Exception while executing HTTP request: ", e);
return Mono.error(e);
}
}
- private URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(createRequestPath()).build();
- }
-
private String createRequestPath() {
return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
}
- private ExchangeFilterFunction logResponse() {
+ void initWebClient(WebClient webClient) {
+ this.webClient = webClient;
+ }
+
+ ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
logger.info("Response Status {}", clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
- private ExchangeFilterFunction logRequest() {
+ URI getUri() throws URISyntaxException {
+ return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
+ .setPath(createRequestPath()).build();
+ }
+
+ ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java
new file mode 100644
index 00000000..63966602
--- /dev/null
+++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java
@@ -0,0 +1,156 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.service.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec;
+import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/27/18
+ */
+public class DmaapConsumerReactiveHttpClientTest {
+
+ private static DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+
+ private static DmaapConsumerConfiguration consumerConfigurationMock = mock(DmaapConsumerConfiguration.class);
+ private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
+ private static Mono<String> expectedResult = Mono.empty();
+ private static WebClient webClient = mock(WebClient.class);
+ private static RequestHeadersUriSpec requestHeadersSpec;
+ private static ResponseSpec responseSpec;
+
+
+ @BeforeAll
+ public static void setUp() {
+ when(consumerConfigurationMock.dmaapHostName()).thenReturn("54.45.33.2");
+ when(consumerConfigurationMock.dmaapProtocol()).thenReturn("https");
+ when(consumerConfigurationMock.dmaapPortNumber()).thenReturn(1234);
+ when(consumerConfigurationMock.dmaapUserName()).thenReturn("PRH");
+ when(consumerConfigurationMock.dmaapUserPassword()).thenReturn("PRH");
+ when(consumerConfigurationMock.dmaapContentType()).thenReturn("application/json");
+ when(consumerConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.SEC_OTHER_OUTPUT");
+ when(consumerConfigurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
+ when(consumerConfigurationMock.consumerId()).thenReturn("c12");
+
+ dmaapConsumerReactiveHttpClient = new DmaapConsumerReactiveHttpClient(consumerConfigurationMock);
+ webClient = spy(WebClient.builder()
+ .defaultHeader(HttpHeaders.CONTENT_TYPE, consumerConfigurationMock.dmaapContentType())
+ .filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(),
+ consumerConfigurationMock.dmaapUserPassword()))
+ .filter(dmaapConsumerReactiveHttpClient.logRequest())
+ .filter(dmaapConsumerReactiveHttpClient.logResponse())
+ .build());
+ requestHeadersSpec = mock(RequestHeadersUriSpec.class);
+ responseSpec = mock(ResponseSpec.class);
+ }
+
+
+ @Test
+ public void getHttpResponse_Success() {
+ //given
+ expectedResult = Mono.just(JSON_MESSAGE);
+
+ //when
+ mockDependantObjects();
+ doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
+ dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+ Mono<String> response = dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse();
+
+ //then
+ StepVerifier.create(response).expectSubscription()
+ .expectNextMatches(results -> {
+ Assertions.assertEquals(results, expectedResult.block());
+ return true;
+ }).verifyComplete();
+ }
+
+
+ @Test
+ public void getHttpResponse_HttpResponse4xxClientError() {
+
+ //when
+ mockDependantObjects();
+ doAnswer(invocationOnMock -> Mono.error(new Exception("400")))
+ .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("400")));
+ dmaapConsumerReactiveHttpClient.initWebClient();
+ dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+
+ //then
+ StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+ .expectError(Exception.class);
+
+ }
+
+ @Test
+ public void getHttpResponse_HttpResponse5xxClientError() {
+
+ //when
+ mockDependantObjects();
+ doAnswer(invocationOnMock -> Mono.error(new Exception("500")))
+ .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("500")));
+ dmaapConsumerReactiveHttpClient.initWebClient();
+ dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+
+ //then
+ StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+ .expectError(Exception.class);
+ }
+
+ @Test
+ public void getHttpResponse_whenURISyntaxExceptionHasBeenThrown() throws URISyntaxException {
+ //given
+ dmaapConsumerReactiveHttpClient = spy(dmaapConsumerReactiveHttpClient);
+ //when
+ when(webClient.get()).thenReturn(requestHeadersSpec);
+ dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+ when(dmaapConsumerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
+
+ //then
+ StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+ .expectError(Exception.class);
+ }
+
+ private void mockDependantObjects() {
+ when(webClient.get()).thenReturn(requestHeadersSpec);
+ when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
+ when(requestHeadersSpec.retrieve()).thenReturn(responseSpec);
+ doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+ }
+
+} \ No newline at end of file
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java
deleted file mode 100644
index d2c0e77b..00000000
--- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-///*-
-// * ============LICENSE_START=======================================================
-// * PNF-REGISTRATION-HANDLER
-// * ================================================================================
-// * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
-// * ================================================================================
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// * ============LICENSE_END=========================================================
-// */
-//
-//package org.onap.dcaegen2.services.prh.service.consumer;
-//
-//import org.apache.http.client.ResponseHandler;
-//import org.apache.http.client.methods.HttpGet;
-//import org.apache.http.impl.client.CloseableHttpClient;
-//import org.junit.jupiter.api.Assertions;
-//import org.junit.jupiter.api.BeforeAll;
-//import org.junit.jupiter.api.Test;
-//import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-//
-//import java.io.IOException;
-//import java.lang.reflect.Field;
-//import java.util.Optional;
-//
-//import static org.mockito.ArgumentMatchers.any;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.when;
-//
-//
-//public class ExtendedDmaapConsumerHttpClientImplTest {
-//
-// private static ExtendedDmaapConsumerHttpClientImpl objectUnderTest;
-//
-// private static DmaapConsumerConfiguration configurationMock = mock(DmaapConsumerConfiguration.class);
-// private static CloseableHttpClient closeableHttpClientMock = mock(CloseableHttpClient.class);
-//
-// private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\" }";
-//
-// private static Optional<String> expectedResult = Optional.empty();
-//
-// @BeforeAll
-// public static void init() throws NoSuchFieldException, IllegalAccessException {
-//
-// when(configurationMock.dmaapHostName()).thenReturn("54.45.33.2");
-// when(configurationMock.dmaapProtocol()).thenReturn("https");
-// when(configurationMock.dmaapPortNumber()).thenReturn(1234);
-// when(configurationMock.dmaapUserName()).thenReturn("PRH");
-// when(configurationMock.dmaapUserPassword()).thenReturn("PRH");
-// when(configurationMock.dmaapContentType()).thenReturn("application/json");
-// when(configurationMock.dmaapTopicName()).thenReturn("unauthenticated.SEC_OTHER_OUTPUT");
-// when(configurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
-// when(configurationMock.consumerId()).thenReturn("c12");
-//
-// objectUnderTest = new ExtendedDmaapConsumerHttpClientImpl(configurationMock);
-//
-// setField();
-// }
-//
-//
-// @Test
-// public void getHttpResponseGet_success() throws IOException {
-// expectedResult = Optional.of(JSON_MESSAGE);
-//
-// when(closeableHttpClientMock.execute(any(HttpGet.class), any(ResponseHandler.class)))
-// .thenReturn(expectedResult);
-//
-// Optional<String> actualResult = objectUnderTest.getHttpConsumerResponse();
-//
-// Assertions.assertEquals(expectedResult.get(), actualResult.get());
-// }
-//
-// @Test
-// public void getExtendedDetails_returnsNull() throws IOException {
-// when(closeableHttpClientMock.execute(any(HttpGet.class), any(ResponseHandler.class))).
-// thenReturn(Optional.empty());
-// Optional<String> actualResult = objectUnderTest.getHttpConsumerResponse();
-// Assertions.assertEquals(Optional.empty(),actualResult);
-// }
-//
-//
-// private static void setField() throws NoSuchFieldException, IllegalAccessException {
-// Field field = objectUnderTest.getClass().getDeclaredField("closeableHttpClient");
-// field.setAccessible(true);
-// field.set(objectUnderTest, closeableHttpClientMock);
-// }
-//}