summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java13
-rw-r--r--prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java13
-rw-r--r--prh-app-server/config/prh_endpoints.json2
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java7
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java2
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java1
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java8
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java6
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java1
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java4
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java6
-rw-r--r--prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java2
13 files changed, 42 insertions, 28 deletions
diff --git a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java
index 2b156936..d79e245a 100644
--- a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java
+++ b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java
@@ -41,6 +41,7 @@ public class AaiReactiveWebClient {
/**
* Creating AaiReactiveWebClient.
+ *
* @param configuration - configuration object
* @return AaiReactiveWebClient
*/
@@ -58,18 +59,18 @@ public class AaiReactiveWebClient {
*/
public WebClient build() {
return WebClient.builder()
- .defaultHeaders(httpHeaders -> httpHeaders.setAll(aaiHeaders))
- .filter(basicAuthentication(aaiUserName, aaiUserPassword))
- .filter(logRequest())
- .filter(logResponse())
- .build();
+ .defaultHeaders(httpHeaders -> httpHeaders.setAll(aaiHeaders))
+ .filter(basicAuthentication(aaiUserName, aaiUserPassword))
+ .filter(logRequest())
+ .filter(logResponse())
+ .build();
}
private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
- .forEach((name, values) -> values.forEach(value -> logger.info("{}={}",name, value)));
+ .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
return Mono.just(clientRequest);
});
}
diff --git a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java
index b43c2164..ff5d4bb7 100644
--- a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java
+++ b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java
@@ -27,6 +27,8 @@ import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.prh.exceptions.AaiRequestException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
@@ -40,6 +42,7 @@ public class AaiProducerReactiveHttpClient {
private final String aaiProtocol;
private final Integer aaiHostPortNumber;
private final String aaiBasePath;
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
@@ -61,7 +64,9 @@ public class AaiProducerReactiveHttpClient {
* @return status code of operation
*/
public Mono<Integer> getAaiProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) {
- return consumerDmaapModelMono.flatMap(this::patchAaiRequest);
+ return consumerDmaapModelMono
+ .doOnNext(consumerDmaapModel -> logger.info("Sending PNF model to AAI {}", consumerDmaapModel))
+ .flatMap(this::patchAaiRequest);
}
public AaiProducerReactiveHttpClient createAaiWebClient(WebClient webClient) {
@@ -77,10 +82,12 @@ public class AaiProducerReactiveHttpClient {
.retrieve()
.onStatus(
HttpStatus::is4xxClientError,
- clientResponse -> Mono.error(new AaiRequestException("HTTP 400"))
+ clientResponse -> Mono
+ .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode()))
)
.onStatus(HttpStatus::is5xxServerError,
- clientResponse -> Mono.error(new AaiRequestException("HTTP 500")))
+ clientResponse -> Mono
+ .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode())))
.bodyToMono(Integer.class);
} catch (URISyntaxException e) {
return Mono.error(e);
diff --git a/prh-app-server/config/prh_endpoints.json b/prh-app-server/config/prh_endpoints.json
index 1e57e05d..e2dd51a3 100644
--- a/prh-app-server/config/prh_endpoints.json
+++ b/prh-app-server/config/prh_endpoints.json
@@ -4,7 +4,7 @@
"dmaapConsumerConfiguration": {
"dmaapHostName": "localhost",
"dmaapPortNumber": 2222,
- "dmaapTopicName": "/events/unauthenticated.SEC_OTHER_OUTPUT",
+ "dmaapTopicName": "/events/unauthenticated.VES_PNFREG_OUTPUT",
"dmaapProtocol": "http",
"dmaapUserName": "admin",
"dmaapUserPassword": "admin",
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
index 8782a180..c53d3333 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
@@ -44,8 +44,8 @@ import reactor.core.publisher.Mono;
@EnableScheduling
public class SchedulerConfig extends CloudConfiguration {
- private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 2000;
- private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 1;
+ private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 5;
+ private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>();
private final ConcurrentTaskScheduler taskScheduler;
@@ -86,7 +86,8 @@ public class SchedulerConfig extends CloudConfiguration {
.scheduleAtFixedRate(super::runTask, Instant.now(),
Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
scheduledPrhTaskFutureList.add(taskScheduler
- .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask, SCHEDULING_DELAY_FOR_PRH_TASKS));
+ .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask,
+ Duration.ofSeconds(SCHEDULING_DELAY_FOR_PRH_TASKS)));
return true;
} else {
return false;
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index d7bbfd2c..53c370f1 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -29,6 +29,8 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
@@ -43,6 +45,8 @@ public class DmaapConsumerJsonParser {
private static final String VENDOR_NAME = "vendorName";
private static final String SERIAL_NUMBER = "serialNumber";
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
/**
* Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}.
*
@@ -51,6 +55,7 @@ public class DmaapConsumerJsonParser {
*/
public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage
+ .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message))
.flatMap(this::getJsonParserMessage)
.flatMap(this::createJsonConsumerModel);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
index baccd3f3..976547e2 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
@@ -54,7 +54,7 @@ public class AaiProducerTaskImpl extends
@Override
Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
- logger.info("Sending PNF model to AAI {}", consumerDmaapModel);
+
return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel)
.flatMap(response -> {
if (HttpUtils.isSuccessfulResponseCode(response)) {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 180ad456..f8eccf11 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -57,7 +57,6 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
Mono<ConsumerDmaapModel> consume(Mono<String> message) {
- logger.info("Consumed model from DMaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 8188267e..13f1b162 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -49,9 +49,11 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
@Override
Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
- logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
- consumerDmaapModel);
- return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
+ return consumerDmaapModel.flatMap(dmaapModel -> {
+ logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
+ dmaapModel);
+ return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(dmaapModel);
+ });
}
@Override
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
index e0419e8f..662a3947 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
@@ -92,7 +92,7 @@ class DmaapPublisherTaskImplTest {
//then
verify(dMaaPProducerReactiveHttpClient, times(1))
- .getDMaaPProducerResponse(any(Mono.class));
+ .getDMaaPProducerResponse(any());
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
@@ -107,14 +107,14 @@ class DmaapPublisherTaskImplTest {
.expectNext(String.valueOf(HttpStatus.UNAUTHORIZED.value())).verifyComplete();
//then
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDMaaPProducerResponse(any(Mono.class));
+ verify(dMaaPProducerReactiveHttpClient, times(1)).getDMaaPProducerResponse(any());
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
private void prepareMocksForTests(Integer httpResponseCode) {
dMaaPProducerReactiveHttpClient = mock(DMaaPProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any(Mono.class)))
+ when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any()))
.thenReturn(Mono.just(httpResponseCode.toString()));
dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
index 4317da44..65834b54 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
@@ -62,7 +62,6 @@ public class DMaaPReactiveWebClient {
public WebClient build() {
return WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType)
- .filter(basicAuthentication(dmaaPUserName, dmaaPUserPassword))
.filter(logRequest())
.filter(logResponse())
.build();
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
index 46699275..e04c07cf 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
@@ -71,10 +71,10 @@ public class DMaaPConsumerReactiveHttpClient {
.uri(getUri())
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
+ Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode()))
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("HTTP 500")))
+ Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode())))
.bodyToMono(String.class);
} catch (URISyntaxException e) {
logger.warn("Exception while evaluating URI ");
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
index eb62d3cb..b06ebfdd 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
@@ -63,7 +63,7 @@ public class DMaaPProducerReactiveHttpClient {
* @param consumerDmaapModelMono - object which will be sent to DMaaP
* @return status code of operation
*/
- public Mono<String> getDMaaPProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) {
+ public Mono<String> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
try {
return webClient
.post()
@@ -71,10 +71,10 @@ public class DMaaPProducerReactiveHttpClient {
.body(BodyInserters.fromObject(consumerDmaapModelMono))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
+ Mono.error(new Exception("DmaapProducer HTTP" + clientResponse.statusCode()))
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("HTTP 500")))
+ Mono.error(new Exception("DmaapProducer HTTP " + clientResponse.statusCode())))
.bodyToMono(String.class);
} catch (URISyntaxException e) {
logger.warn("Exception while evaluating URI");
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
index b5e730a0..538385ce 100644
--- a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
+++ b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
@@ -89,7 +89,7 @@ class DMaaPProducerReactiveHttpClientTest {
mockWebClientDependantObject();
doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
- Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(Mono.just(consumerDmaapModel));
+ Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
//then
Assertions.assertEquals(response.block(), expectedResult.block());