summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml2
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java28
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java81
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java8
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java22
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java67
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java11
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java9
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java31
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java16
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java78
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java34
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java34
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java8
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/TestAppConfiguration.java57
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/ConsulConfigurationParserTest.java47
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java46
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java12
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java150
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java20
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java26
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java155
23 files changed, 361 insertions, 586 deletions
diff --git a/pom.xml b/pom.xml
index 676b097b..f320360b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@
<spring-boot.version>2.1.2.RELEASE</spring-boot.version>
<springfox.version>2.9.2</springfox.version>
<immutables.version>2.7.5</immutables.version>
- <sdk.version>1.1.5-SNAPSHOT</sdk.version>
+ <sdk.version>1.2.0-SNAPSHOT</sdk.version>
</properties>
<modules>
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
index 3ff81e1f..889dae20 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
@@ -25,8 +25,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientC
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,9 +50,9 @@ import java.util.Optional;
public class CbsConfiguration extends PrhAppConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfiguration.class);
private AaiClientConfiguration aaiClientCBSConfiguration;
- private DmaapPublisherConfiguration dmaapPublisherCBSConfiguration;
- private DmaapConsumerConfiguration dmaapConsumerCBSConfiguration;
- private DmaapPublisherConfiguration dmaapUpdatePublisherCBSConfiguration;
+ private MessageRouterPublishRequest messageRouterCBSPublishRequest;
+ private MessageRouterSubscribeRequest messageRouterCBSSubscribeRequest;
+ private MessageRouterPublishRequest messageRouterCBSUpdatePublishRequest;
@Autowired
private ConsulConfigFileReader consulConfigFileReader;
@@ -82,10 +82,10 @@ public class CbsConfiguration extends PrhAppConfig {
private void parseCBSConfig(JsonObject jsonObject) {
LOGGER.info("Received application configuration: {}", jsonObject);
CbsContentParser consulConfigurationParser = new CbsContentParser(jsonObject);
- dmaapPublisherCBSConfiguration = consulConfigurationParser.getDmaapPublisherConfig();
- dmaapUpdatePublisherCBSConfiguration = consulConfigurationParser.getDmaapUpdatePublisherConfig();
+ messageRouterCBSPublishRequest = consulConfigurationParser.getMessageRouterPublishRequest();
+ messageRouterCBSUpdatePublishRequest = consulConfigurationParser.getMessageRouterUpdatePublishRequest();
aaiClientCBSConfiguration = consulConfigurationParser.getAaiClientConfig();
- dmaapConsumerCBSConfiguration = consulConfigurationParser.getDmaapConsumerConfig();
+ messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest();
}
private void parsingConfigError(Throwable throwable) {
@@ -97,13 +97,13 @@ public class CbsConfiguration extends PrhAppConfig {
}
@Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return Optional.ofNullable(dmaapPublisherCBSConfiguration).orElse(super.getDmaapPublisherConfiguration());
+ public MessageRouterPublishRequest getMessageRouterPublishRequest() {
+ return Optional.ofNullable(messageRouterCBSPublishRequest).orElse(super.getMessageRouterPublishRequest());
}
@Override
- public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() {
- return Optional.ofNullable(dmaapUpdatePublisherCBSConfiguration).orElse(super.getDmaapPublisherConfiguration());
+ public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() {
+ return Optional.ofNullable(messageRouterCBSUpdatePublishRequest).orElse(super.getMessageRouterUpdatePublishRequest());
}
@Override
@@ -112,7 +112,7 @@ public class CbsConfiguration extends PrhAppConfig {
}
@Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return Optional.ofNullable(dmaapConsumerCBSConfiguration).orElse(super.getDmaapConsumerConfiguration());
+ public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
+ return Optional.ofNullable(messageRouterCBSSubscribeRequest).orElse(super.getMessageRouterSubscribeRequest());
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java
index f19eb3e6..51d86399 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java
@@ -30,11 +30,12 @@ import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientC
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import java.time.Duration;
import java.util.Map;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
@@ -47,7 +48,6 @@ class CbsContentParser {
private static final String SECURITY_TRUST_STORE_PASS_PATH = "security.trustStorePasswordPath";
private static final String SECURITY_KEY_STORE_PATH = "security.keyStorePath";
private static final String SECURITY_KEY_STORE_PASS_PATH = "security.keyStorePasswordPath";
- private static final String SECURITY_ENABLE_DMAAP_CERT_AUTH = "security.enableDmaapCertAuth";
private static final String CONFIG = "config";
private static final String PNF_UPDATE = "pnf-update";
private static final String PNF_READY = "pnf-ready";
@@ -59,48 +59,24 @@ class CbsContentParser {
this.jsonObject = jsonObject.getAsJsonObject(CONFIG);
}
- DmaapPublisherConfiguration getDmaapPublisherConfig() {
+ MessageRouterPublishRequest getMessageRouterPublishRequest() {
RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_READY)).get();
MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .endpointUrl(parsedSink.topicUrl())
- .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
- .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
- .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
- .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean())
- .build();
+ return ImmutableMessageRouterPublishRequest.builder()
+ .contentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
+ .sinkDefinition(parsedSink)
+ .build();
}
- DmaapPublisherConfiguration getDmaapUpdatePublisherConfig() {
+ MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() {
RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_UPDATE)).get();
MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .endpointUrl(parsedSink.topicUrl())
- .dmaapTopicName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapTopicName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapProtocol").getAsString())
- .dmaapContentType(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapContentType").getAsString())
- .dmaapHostName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString())
- .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean())
- .build();
+ return ImmutableMessageRouterPublishRequest.builder()
+ .contentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
+ .sinkDefinition(parsedSink)
+ .build();
}
AaiClientConfiguration getAaiClientConfig() {
@@ -126,28 +102,15 @@ class CbsContentParser {
.build();
}
- DmaapConsumerConfiguration getDmaapConsumerConfig() {
+ MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
RawDataStream<JsonObject> source = DataStreams.namedSources(jsonObject).find(streamWithName(VES_REG_OUTPUT)).get();
MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .endpointUrl(parsedSource.topicUrl())
- .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt())
- .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
- .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
- .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
- .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
- .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
- .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean())
- .build();
+ return ImmutableMessageRouterSubscribeRequest.builder()
+ .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
+ .sourceDefinition(parsedSource)
+ .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
+ .timeout(Duration.ofMillis(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsLong()))
+ .build();
}
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java
index 6363356f..7b87415b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java
@@ -23,6 +23,8 @@ package org.onap.dcaegen2.services.prh.configuration;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.springframework.core.io.Resource;
/**
@@ -32,12 +34,12 @@ public interface Config {
Resource getGitInfo();
- DmaapConsumerConfiguration getDmaapConsumerConfiguration();
+ MessageRouterSubscribeRequest getMessageRouterSubscribeRequest();
AaiClientConfiguration getAaiClientConfiguration();
- DmaapPublisherConfiguration getDmaapPublisherConfiguration();
+ MessageRouterPublishRequest getMessageRouterPublishRequest();
- DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration();
+ MessageRouterPublishRequest getMessageRouterUpdatePublishRequest();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java
index 7355cf48..f18f1d90 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java
@@ -22,6 +22,7 @@ package org.onap.dcaegen2.services.prh.configuration;
import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTaskImpl;
+import org.onap.dcaegen2.services.prh.tasks.MessageRouterPublisherResolver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -31,12 +32,12 @@ public class DmaapPublisherTaskConfig {
@Bean(name = "ReadyPublisherTask")
@Autowired
public DmaapPublisherTask getReadyPublisherTask(final Config config) {
- return new DmaapPublisherTaskImpl(config::getDmaapPublisherConfiguration);
+ return new DmaapPublisherTaskImpl(config::getMessageRouterPublishRequest, new MessageRouterPublisherResolver());
}
@Bean(name = "UpdatePublisherTask")
@Autowired
public DmaapPublisherTask getUpdatePublisherTask(final Config config) {
- return new DmaapPublisherTaskImpl(config::getDmaapUpdatePublisherConfiguration);
+ return new DmaapPublisherTaskImpl(config::getMessageRouterUpdatePublishRequest, new MessageRouterPublisherResolver());
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
index 4b48fa30..01ef2063 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
@@ -21,8 +21,8 @@
package org.onap.dcaegen2.services.prh.configuration;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@@ -47,11 +47,11 @@ public abstract class PrhAppConfig implements Config {
AaiClientConfiguration aaiClientConfiguration;
- DmaapConsumerConfiguration dmaapConsumerConfiguration;
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest;
- DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ MessageRouterPublishRequest messageRouterPublishRequest;
- DmaapPublisherConfiguration dmaapUpdatePublisherConfiguration;
+ MessageRouterPublishRequest messageRouterUpdatePublishRequest;
@Value("classpath:git_info.json")
private Resource gitInfo;
@@ -67,8 +67,8 @@ public abstract class PrhAppConfig implements Config {
}
@Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return dmaapConsumerConfiguration;
+ public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
+ return messageRouterSubscribeRequest;
}
@Override
@@ -77,12 +77,12 @@ public abstract class PrhAppConfig implements Config {
}
@Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherConfiguration;
+ public MessageRouterPublishRequest getMessageRouterPublishRequest() {
+ return messageRouterPublishRequest;
}
@Override
- public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() {
- return dmaapUpdatePublisherConfiguration;
+ public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() {
+ return messageRouterUpdatePublishRequest;
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 43d6922a..b3d84562 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -20,6 +20,23 @@
package org.onap.dcaegen2.services.prh.service;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS;
import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER;
import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT;
@@ -37,21 +54,6 @@ import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIA
import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME;
import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.StringUtils;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
@@ -77,34 +79,21 @@ public class DmaapConsumerJsonParser {
* @param monoMessage - results from DMaaP
* @return reactive DMaaPModel
*/
- public Flux<ConsumerDmaapModel> getJsonObject(Mono<JsonElement> monoMessage) {
- return monoMessage
- .flatMapMany(this::getConsumerDmaapModelFromJsonArray);
+ public Flux<ConsumerDmaapModel> getJsonObject(Mono<MessageRouterSubscribeResponse> monoMessage) {
+ return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items()));
}
- private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
- LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", jsonElement);
-
- if (jsonElement instanceof JsonObject) {
- LOGGER.debug("Element is JsonObject");
- return create(Flux.just((JsonObject) jsonElement));
- }
+ private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray items) {
+ LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items);
- if (jsonElement instanceof JsonArray) {
- LOGGER.debug("Element is JsonArray");
- JsonArray jsonArray = (JsonArray) jsonElement;
- if (jsonArray.size() == 0) {
- LOGGER.debug("Nothing to consume from DMaaP");
- return Flux.empty();
- }
- return create(
- Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonArray.spliterator(), false)
- .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
- .orElseGet(JsonObject::new)))));
+ if (items.size() == 0) {
+ LOGGER.debug("Nothing to consume from DMaaP");
+ return Flux.empty();
}
-
- LOGGER.warn("Element is neither JSON Object or Array");
- return Flux.empty();
+ return create(
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
index 02691446..0b26890d 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
@@ -32,21 +32,14 @@ import org.onap.dcaegen2.services.prh.model.bbs.ImmutableRelationshipWrapper;
import org.onap.dcaegen2.services.prh.model.bbs.RelationshipWrapper;
import org.onap.dcaegen2.services.prh.model.utils.GsonSerializer;
import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*;
import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder;
-import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
-
-
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
@@ -66,7 +59,7 @@ public class BbsActionsTaskImpl implements BbsActionsTask {
@Autowired
BbsActionsTaskImpl(Config config) {
- this(config, RxHttpClient.create(new SslFactory().createInsecureClientContext()));
+ this(config, RxHttpClientFactory.createInsecure());
}
BbsActionsTaskImpl(Config config, RxHttpClient httpClient) {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 3a630a40..5fc41d93 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -20,20 +20,15 @@
package org.onap.dcaegen2.services.prh.tasks;
-import javax.net.ssl.SSLException;
-
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-
import reactor.core.publisher.Flux;
+import javax.net.ssl.SSLException;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
interface DmaapConsumerTask {
-
Flux<ConsumerDmaapModel> execute(String object) throws SSLException;
-
- DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index d3086cbe..f46e2cc9 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -20,15 +20,11 @@
package org.onap.dcaegen2.services.prh.tasks;
-import com.google.gson.JsonElement;
-import java.util.Optional;
-import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -46,33 +42,26 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private final DmaapConsumerJsonParser dmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+
@Autowired
public DmaapConsumerTaskImpl(Config config) {
- this(config, new DmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
+ this(config, new DmaapConsumerJsonParser());
}
- DmaapConsumerTaskImpl(Config prhAppConfig,
- DmaapConsumerJsonParser dmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) {
+ DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
this.config = prhAppConfig;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
}
@Override
- public Flux<ConsumerDmaapModel> execute(String object) throws SSLException {
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
+ public Flux<ConsumerDmaapModel> execute(String object) {
+ MessageRouterSubscriber messageRouterSubscriberClient =
+ new MessageRouterSubscriberResolver().resolveClient();
LOGGER.debug("Method called with arg {}", object);
- Mono<JsonElement> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(
- Optional.empty());
+ Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriberClient
+ .get(config.getMessageRouterSubscribeRequest());
return dmaapConsumerJsonParser.getJsonObject(response);
}
- @Override
- public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException {
- return httpClientFactory.create(config.getDmaapConsumerConfiguration());
- }
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index 7fc596c1..f63f4d76 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -20,10 +20,10 @@
package org.onap.dcaegen2.services.prh.tasks;
-import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -31,16 +31,8 @@ import reactor.core.publisher.Mono;
*/
public interface DmaapPublisherTask {
- /**
- *
- * Does not work reactive version with DMaaP MR - to be investigated why in future
- * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
- * */
- @Deprecated
- Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
- execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException;
- Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+ Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
- DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;;
+ Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 55a8bb58..3a724884 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.prh.tasks;
+import com.google.gson.JsonPrimitive;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
@@ -27,17 +28,14 @@ import org.apache.http.impl.client.DefaultHttpClient;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.uri.URI;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import javax.net.ssl.SSLException;
-import java.util.Optional;
import java.util.function.Supplier;
/**
@@ -46,70 +44,52 @@ import java.util.function.Supplier;
public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private final Supplier<DmaapPublisherConfiguration> config;
+
+ private final Supplier<MessageRouterPublishRequest> config;
+ private final MessageRouterPublisherResolver messageRouterPublisherClientResolver;
private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl();
- private final PublisherReactiveHttpClientFactory httpClientFactory;
- public DmaapPublisherTaskImpl(final Supplier<DmaapPublisherConfiguration> config) {
- this(config, new PublisherReactiveHttpClientFactory(
- new DmaaPRestTemplateFactory(),
- new PnfReadyJsonBodyBuilderImpl()));
- }
- DmaapPublisherTaskImpl(
- Supplier<DmaapPublisherConfiguration> config,
- PublisherReactiveHttpClientFactory httpClientFactory) {
+ public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> config, MessageRouterPublisherResolver messageRouterPublisherClientResolver) {
this.config = config;
- this.httpClientFactory = httpClientFactory;
+ this.messageRouterPublisherClientResolver = messageRouterPublisherClientResolver;
}
@Override
- public Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
- execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException, SSLException {
+ public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
- DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
+ MessageRouterPublisher messageRouterPublisher = messageRouterPublisherClientResolver.resolveClient();
LOGGER.info("Method called with arg {}", consumerDmaapModel);
- return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty());
- }
-
- @Override
- public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException {
- return httpClientFactory.create(config.get());
-
+ String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
+ return messageRouterPublisher.put(
+ config.get(),
+ Flux.just(json).map(JsonPrimitive::new));
}
/**
*
* Does not work reactive version with DMaaP MR - to be investigated why in future
- * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+ * As WA please use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
* */
@Override
public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) {
String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
- DefaultHttpClient httpClient = new DefaultHttpClient();
- HttpPost postRequest = new HttpPost(getUrl());
- try {
- StringEntity input = new StringEntity(json);
- input.setContentType(config.get().dmaapContentType());
- postRequest.setEntity(input);
- HttpResponse response = httpClient.execute(postRequest);
- return Mono.just(response);
- } catch (Exception e) {
- LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
- return Mono.error(e);
+ try (DefaultHttpClient httpClient = new DefaultHttpClient()) {
+ HttpPost postRequest = new HttpPost(config.get().sinkDefinition().topicUrl());
+ try {
+ StringEntity input = new StringEntity(json);
+ input.setContentType(config.get().contentType());
+ postRequest.setEntity(input);
+ HttpResponse response = httpClient.execute(postRequest);
+ return Mono.just(response);
+ } catch (Exception e) {
+ LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
+ return Mono.error(e);
+ }
}
}
- private String getUrl() {
- return (new URI.URIBuilder()).scheme(config.get().dmaapProtocol())
- .host(config.get().dmaapHostName())
- .port(config.get().dmaapPortNumber()).path(this.createRequestPath()).build()
- .toString();
- }
- private String createRequestPath() {
- return "/" + config.get().dmaapTopicName();
- }
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java
new file mode 100644
index 00000000..2f4e3867
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageRouterPublisherResolver {
+
+ public MessageRouterPublisher resolveClient() {
+ return DmaapClientFactory.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java
new file mode 100644
index 00000000..63930ef7
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageRouterSubscriberResolver {
+
+ public MessageRouterSubscriber resolveClient() {
+ return DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+ }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 72ec0cac..4b3436e5 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.logging.MdcVariables;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -50,6 +51,7 @@ public class ScheduledTasks {
private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class);
private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
private final DmaapConsumerTask dmaapConsumerTask;
private final DmaapPublisherTask dmaapReadyProducerTask;
private final DmaapPublisherTask dmaapUpdateProducerTask;
@@ -208,7 +210,7 @@ public class ScheduledTasks {
* Marked as deprecated due to problems with DMaaP MR, to be fixed in future
*/
@Deprecated
- private Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
+ private Flux<MessageRouterPublishResponse>
publishToDmaapConfiguration(final State state) {
try {
if (state.ActivationStatus) {
@@ -217,8 +219,8 @@ public class ScheduledTasks {
}
return dmaapReadyProducerTask.execute(state.DmaapModel);
- } catch (PrhTaskException | SSLException e) {
- return Mono.error(e);
+ } catch (PrhTaskException e) {
+ return Flux.error(e);
}
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/TestAppConfiguration.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/TestAppConfiguration.java
index 9dca398a..fb0b1b43 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/TestAppConfiguration.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/TestAppConfiguration.java
@@ -20,51 +20,38 @@
package org.onap.dcaegen2.services.prh;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+
+import java.time.Duration;
public class TestAppConfiguration {
- public static ImmutableDmaapConsumerConfiguration createDefaultDmaapConsumerConfiguration() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT")
+ public static ImmutableMessageRouterSubscribeRequest createDefaultMessageRouterSubscribeRequest() {
+ return ImmutableMessageRouterSubscribeRequest.builder()
.consumerGroup("OpenDCAE-c12")
+ .sourceDefinition(ImmutableMessageRouterSource.builder()
+ .name("the topic")
+ .topicUrl(String.format("http://%s:%d/events/TOPIC", "www", 1234))
+ .build())
.consumerId("c12")
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapPortNumber(3904)
- .dmaapProtocol("http")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .trustStorePath("/opt/app/prh/local/org.onap.prh.trust.jks")
- .trustStorePasswordPath("change_it")
- .keyStorePath("/opt/app/prh/local/org.onap.prh.p12")
- .keyStorePasswordPath("change_it")
- .enableDmaapCertAuth(false)
- .dmaapTopicName("/events/unauthenticated.SEC_OTHER_OUTPUT")
- .timeoutMs(-1)
- .messageLimit(-1)
+ .timeout(Duration.ofMillis(1))
.build();
}
- public static ImmutableDmaapPublisherConfiguration createDefaultDmaapPublisherConfiguration() {
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.PNF_READY")
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapPortNumber(3904)
- .dmaapProtocol("http")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .trustStorePath("/opt/app/prh/local/org.onap.prh.trust.jks")
- .trustStorePasswordPath("change_it")
- .keyStorePath("/opt/app/prh/local/org.onap.prh.p12")
- .keyStorePasswordPath("change_it")
- .enableDmaapCertAuth(false)
- .dmaapTopicName("/events/unauthenticated.PNF_READY")
+ public static ImmutableMessageRouterPublishRequest createDefaultMessageRouterPublishRequest() {
+ return ImmutableMessageRouterPublishRequest.builder()
+ .contentType("application/json")
+ .sinkDefinition(ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl(String.format("http://%s:%d/events/TOPIC", "www", 1234))
+ .build())
.build();
- }
+
+ }
public static ImmutableAaiClientConfiguration createDefaultAaiClientConfiguration() {
return new ImmutableAaiClientConfiguration.Builder()
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/ConsulConfigurationParserTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/ConsulConfigurationParserTest.java
index 8a2a498f..350cee68 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/ConsulConfigurationParserTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/ConsulConfigurationParserTest.java
@@ -20,21 +20,22 @@
package org.onap.dcaegen2.services.prh.configuration;
-import static java.lang.ClassLoader.getSystemResource;
-import static org.assertj.core.api.Assertions.assertThat;
-
import com.google.gson.Gson;
import com.google.gson.JsonObject;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.prh.TestAppConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+
+import static java.lang.ClassLoader.getSystemResource;
+import static org.assertj.core.api.Assertions.assertThat;
class ConsulConfigurationParserTest {
@@ -43,10 +44,8 @@ class ConsulConfigurationParserTest {
new String(Files.readAllBytes(Paths.get(getSystemResource("flattened_configuration.json").toURI())));
private final ImmutableAaiClientConfiguration correctAaiClientConfig =
TestAppConfiguration.createDefaultAaiClientConfiguration();
- private final ImmutableDmaapConsumerConfiguration correctDmaapConsumerConfig =
- TestAppConfiguration.createDefaultDmaapConsumerConfiguration();
- private final ImmutableDmaapPublisherConfiguration correctDmaapPublisherConfig =
- TestAppConfiguration.createDefaultDmaapPublisherConfiguration();
+ private final ImmutableMessageRouterPublishRequest correctDmaapPublisherConfig =
+ TestAppConfiguration.createDefaultMessageRouterPublishRequest();
private final CbsContentParser consulConfigurationParser = new CbsContentParser(
new Gson().fromJson(correctJson, JsonObject.class));
@@ -63,25 +62,25 @@ class ConsulConfigurationParserTest {
assertThat(aaiClientConfig).isEqualToComparingFieldByField(correctAaiClientConfig);
}
-
@Test
- void shouldCreateDmaapConsumerConfigurationCorrectly() {
- // when
- DmaapConsumerConfiguration dmaapConsumerConfig = consulConfigurationParser.getDmaapConsumerConfig();
+ void shouldCreateMessageRouterSubscribeRequestCorrectly() {
+ // given
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest();
// then
- assertThat(dmaapConsumerConfig).isNotNull();
- assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(correctDmaapConsumerConfig);
+ assertThat(messageRouterSubscribeRequest.sourceDefinition().topicUrl()).isEqualTo("http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT");
+ assertThat(messageRouterSubscribeRequest.consumerGroup()).isEqualTo("OpenDCAE-c12");
+ assertThat(messageRouterSubscribeRequest.consumerId()).isEqualTo("c12");
+ assertThat(messageRouterSubscribeRequest.timeout()).isEqualTo(Duration.ofMillis(-1));
}
-
@Test
- void shouldCreateDmaapPublisherConfigurationCorrectly() {
+ void shouldCreateMessageRouterPublishConfigurationCorrectly() {
// when
- DmaapPublisherConfiguration dmaapPublisherConfig = consulConfigurationParser.getDmaapPublisherConfig();
+ MessageRouterPublishRequest messageRouterPublishRequest = consulConfigurationParser.getMessageRouterPublishRequest();
// then
- assertThat(dmaapPublisherConfig).isNotNull();
- assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(correctDmaapPublisherConfig);
+ assertThat(messageRouterPublishRequest.contentType()).isEqualTo("application/json");
+ assertThat(messageRouterPublishRequest.sinkDefinition().topicUrl()).isEqualTo("http://dmaap-mr:2222/events/unauthenticated.PNF_READY");
}
} \ No newline at end of file
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
index cdcef07c..98b73142 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
@@ -20,21 +20,24 @@
package org.onap.dcaegen2.services.prh.service;
-import static org.mockito.Mockito.spy;
-
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.util.Optional;
+
+import static org.mockito.Mockito.spy;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
*/
@@ -101,6 +104,7 @@ class DmaapConsumerJsonParserTest {
.build();
JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+ MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
@@ -109,7 +113,7 @@ class DmaapConsumerJsonParserTest {
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
- .getJsonObject(Mono.just((messageAsJsonArray))).blockFirst();
+ .getJsonObject(Mono.just((response))).blockFirst();
//then
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -163,15 +167,15 @@ class DmaapConsumerJsonParserTest {
.nfRole("gNB")
.swVersion("v4.5.0.1")
.build();
- JsonArray mesageAsJsonArray = (JsonArray) jsonParser.parse(message);
-
+ JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+ MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = new JsonParser().parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
- .getJsonObject(Mono.just((mesageAsJsonArray))).blockFirst();
+ .getJsonObject(Mono.just((response))).blockFirst();
//then
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -230,6 +234,7 @@ class DmaapConsumerJsonParserTest {
.build();
JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+ MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
@@ -238,7 +243,7 @@ class DmaapConsumerJsonParserTest {
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
- .getJsonObject(Mono.just((messageAsJsonArray))).blockFirst();
+ .getJsonObject(Mono.just((response))).blockFirst();
//then
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -293,6 +298,7 @@ class DmaapConsumerJsonParserTest {
.build();
JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+ MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
@@ -301,7 +307,7 @@ class DmaapConsumerJsonParserTest {
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
- .getJsonObject(Mono.just((messageAsJsonArray))).blockFirst();
+ .getJsonObject(Mono.just((response))).blockFirst();
//then
Assertions.assertNotNull(consumerDmaapModel);
Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -349,8 +355,9 @@ class DmaapConsumerJsonParserTest {
+ "}}}]";
JsonArray incorrectMessageAsJsonArray = (JsonArray) jsonParser.parse(incorrectMessage);
+ MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(incorrectMessageAsJsonArray).build();
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageAsJsonArray)))
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response)))
.expectSubscription().thenRequest(1).verifyComplete();
}
@@ -394,8 +401,9 @@ class DmaapConsumerJsonParserTest {
+ "}}}]";
JsonArray jsonWithoutSourceNameAsJsonArray = (JsonArray) jsonParser.parse(jsonWithoutSourceName);
+ MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(jsonWithoutSourceNameAsJsonArray).build();
StepVerifier
- .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutSourceNameAsJsonArray)))
+ .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response)))
.expectSubscription().thenRequest(1)
.verifyComplete();
@@ -444,8 +452,9 @@ class DmaapConsumerJsonParserTest {
+ "}}}]";
JsonArray jsonWithoutIpInformationAsJsonArray = (JsonArray) jsonParser.parse(jsonWithoutIpInformation);
+ MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(jsonWithoutIpInformationAsJsonArray).build();
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIpInformationAsJsonArray)))
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response)))
.expectSubscription().thenRequest(1).verifyComplete();
}
@@ -485,15 +494,15 @@ class DmaapConsumerJsonParserTest {
+ "}}}";
JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
-
+ MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = jsonParser.parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- dmaapConsumerJsonParser.getJsonObject(Mono.just((messageAsJsonArray)));
- ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageAsJsonArray)))
+ dmaapConsumerJsonParser.getJsonObject(Mono.just((response)));
+ ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(response))
.blockFirst();
//then
ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder()
@@ -560,13 +569,14 @@ class DmaapConsumerJsonParserTest {
.build();
JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+ MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = jsonParser.parse(parsed);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageAsJsonArray)))
+ ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(response))
.blockFirst();
//then
@@ -625,12 +635,12 @@ class DmaapConsumerJsonParserTest {
.build();
JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+ MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
//then
- StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageAsJsonArray)))
- .expectSubscription().expectNext(expectedObject).expectNext(expectedObject).verifyComplete();
+ StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response))).expectSubscription().expectNext(expectedObject).expectNext(expectedObject).verifyComplete();
}
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
index 18e1a27a..04388fb7 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
@@ -20,12 +20,6 @@
package org.onap.dcaegen2.services.prh.tasks;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-import javax.net.ssl.SSLException;
-
import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
@@ -34,6 +28,10 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
/**
@@ -49,7 +47,7 @@ public class AaiPublisherTaskSpy {
*/
@Bean
@Primary
- public AaiProducerTask registerSimpleAaiPublisherTask() throws SSLException {
+ public AaiProducerTask registerSimpleAaiPublisherTask() {
CbsConfiguration cbsConfiguration = spy(CbsConfiguration.class);
ConsumerDmaapModel consumerDmaapModel = spy(ConsumerDmaapModel.class);
doReturn(mock(AaiClientConfiguration.class)).when(cbsConfiguration).getAaiClientConfiguration();
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
deleted file mode 100644
index 9afa7671..00000000
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.tasks;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.onap.dcaegen2.services.prh.TestAppConfiguration.createDefaultDmaapConsumerConfiguration;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import java.util.Optional;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
- */
-class DmaapConsumerTaskImplTest {
-
- private static ConsumerDmaapModel consumerDmaapModel;
- private static DmaapConsumerTaskImpl dmaapConsumerTask;
- private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
- private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
- private static String message;
- private static String messageContentEmpty;
- private static JsonArray jsonArray;
- private static JsonArray jsonArrayWrongContent;
-
- private static CbsConfiguration cbsConfiguration;
-
- @BeforeAll
- static void setUp() {
- dmaapConsumerConfiguration = createDefaultDmaapConsumerConfiguration();
-
- JsonObject jsonObject = new JsonParser().parse("{\n"
- + " \"attachmentPoint\": \"bla-bla-30-3\",\n"
- + " \"cvlan\": \"678\",\n"
- + " \"svlan\": \"1005\"\n"
- + " }").getAsJsonObject();
-
- consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .ipv4("10.16.123.234")
- .ipv6("0:0:0:0:0:FFFF:0A10:7BEA")
- .correlationId("NOKQTFCOC540002E")
- .serialNumber("QTFCOC540002E")
- .equipVendor("nokia")
- .equipModel("3310")
- .equipType("type")
- .nfRole("gNB")
- .swVersion("v4.5.0.1")
- .additionalFields(jsonObject)
- .build();
- cbsConfiguration = mock(CbsConfiguration.class);
-
- message = "[{\"event\": {"
- + "\"commonEventHeader\": { "
- + " \"sourceName\":\"NOKQTFCOC540002E\","
- + " \"nfNamingCode\":\"gNB\" "
- + "},"
- + "\"pnfRegistrationFields\": {"
- + " \"vendorName\": \"nokia\","
- + " \"serialNumber\": \"QTFCOC540002E\","
- + " \"pnfRegistrationFieldsVersion\": \"2.0\","
- + " \"modelNumber\": \"3310\","
- + " \"unitType\": \"type\",\n"
- + " \"unitFamily\": \"BBU\","
- + " \"oamV4IpAddress\": \"10.16.123.234\","
- + " \"softwareVersion\": \"v4.5.0.1\","
- + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\","
- + " \"additionalFields\": {\"attachmentPoint\": \"bla-bla-30-3\",\"cvlan\": \"678\",\"svlan\": \"1005\"}"
- + "}}}]";
-
- messageContentEmpty = "[]";
- JsonParser jsonParser = new JsonParser();
- jsonArray = (JsonArray) jsonParser.parse(message);
- jsonArrayWrongContent = (JsonArray) jsonParser.parse(messageContentEmpty);
-
- }
-
- @Test
- void whenPassedObjectDoesNotFit_DoesNotThrowPrhTaskException() throws Exception {
- //given
- prepareMocksForDmaapConsumer(Optional.of(jsonArrayWrongContent));
-
- //when
- Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
-
- //then
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
- assertNull(response.blockFirst());
- }
-
- @Test
- void whenPassedObjectFits_ReturnsCorrectResponse() throws Exception {
- //given
- prepareMocksForDmaapConsumer(Optional.of(jsonArray));
-
- //when
- Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
-
- //then
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
- assertEquals(consumerDmaapModel, response.blockFirst());
- }
-
-
-
- private void prepareMocksForDmaapConsumer(Optional<JsonArray> message) throws Exception {
- dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
- .thenReturn(Mono.just(message.get()));
- when(cbsConfiguration.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
- ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
- doReturn(dMaaPConsumerReactiveHttpClient).when(httpClientFactory).create(dmaapConsumerConfiguration);
- dmaapConsumerTask = new DmaapConsumerTaskImpl(cbsConfiguration, new DmaapConsumerJsonParser(), httpClientFactory);
- }
-} \ No newline at end of file
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java
index 594575e5..4c95c717 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java
@@ -20,19 +20,16 @@
package org.onap.dcaegen2.services.prh.tasks;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-import javax.net.ssl.SSLException;
-
import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
@@ -47,13 +44,10 @@ public class DmaapConsumerTaskSpy {
*/
@Bean
@Primary
- public DmaapConsumerTask registerSimpleDmaapConsumerTask() throws SSLException {
+ public DmaapConsumerTask registerSimpleDmaapConsumerTask() {
CbsConfiguration cbsConfiguration = spy(CbsConfiguration.class);
- doReturn(mock(DmaapConsumerConfiguration.class)).when(cbsConfiguration).getDmaapConsumerConfiguration();
+ doReturn(mock(MessageRouterPublishRequest.class)).when(cbsConfiguration).getMessageRouterPublishRequest();
DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(cbsConfiguration));
- DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient = mock(
- DMaaPConsumerReactiveHttpClient.class);
- doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
return dmaapConsumerTask;
}
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java
index 77028a34..7a68bc8c 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java
@@ -20,20 +20,18 @@
package org.onap.dcaegen2.services.prh.tasks;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import java.util.function.Supplier;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@@ -47,14 +45,10 @@ public class DmaapProducerTaskSpy {
*/
@Bean
@Primary
- public DmaapPublisherTask registerSimpleDmaapPublisherTask() throws SSLException {
- final CbsConfiguration cbsConfiguration = spy(CbsConfiguration.class);
- final Supplier<DmaapPublisherConfiguration> configSupplier = () -> cbsConfiguration.getDmaapPublisherConfiguration();
- doReturn(mock(DmaapPublisherConfiguration.class)).when(cbsConfiguration).getDmaapPublisherConfiguration();
- final DmaapPublisherTaskImpl dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(configSupplier));
- final DMaaPPublisherReactiveHttpClient extendedDmaapProducerHttpClient = mock(
- DMaaPPublisherReactiveHttpClient.class);
- doReturn(extendedDmaapProducerHttpClient).when(dmaapPublisherTask).resolveClient();
- return dmaapPublisherTask;
+ public DmaapPublisherTask registerSimpleDmaapPublisherTask() {
+ final CbsConfiguration cbsConfiguration = mock(CbsConfiguration.class);
+ final Supplier<MessageRouterPublishRequest> configSupplier = cbsConfiguration::getMessageRouterPublishRequest;
+ doReturn(mock(MessageRouterPublishRequest.class)).when(cbsConfiguration).getMessageRouterPublishRequest();
+ return spy(new DmaapPublisherTaskImpl(configSupplier, new MessageRouterPublisherResolver()));
}
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
index fb4a50ea..6347ad3d 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
@@ -20,136 +20,105 @@
package org.onap.dcaegen2.services.prh.tasks;
-import io.netty.handler.codec.http.HttpResponseStatus;
+import com.google.gson.JsonPrimitive;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.Executable;
-import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-import javax.net.ssl.SSLException;
-import java.util.Optional;
+import org.onap.dcaegen2.services.prh.integration.junit5.mockito.MockitoExtension;
+import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import reactor.core.publisher.Flux;
+
import java.util.function.Supplier;
-import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.*;
-import static org.onap.dcaegen2.services.prh.TestAppConfiguration.createDefaultDmaapPublisherConfiguration;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
*/
+@ExtendWith(MockitoExtension.class)
class DmaapPublisherTaskImplTest {
- private static ConsumerDmaapModel consumerDmaapModel;
private static DmaapPublisherTaskImpl dmaapPublisherTask;
- private static DMaaPPublisherReactiveHttpClient dMaaPPublisherReactiveHttpClient;
- private static CbsConfiguration cbsConfiguration;
- private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
- private Optional<RequestDiagnosticContext> requestDiagnosticContextOptionalMock;
- private DmaapModel dmaapModel;
- private PublisherReactiveHttpClientFactory publisherReactiveHttpClientFactory;
- private Supplier<DmaapPublisherConfiguration> configSupplier;
+
+ @Mock
+ private static MessageRouterPublisherResolver messageRouterPublisherClientResolver;
+ @Mock
+ private static MessageRouterPublisher messageRouterPublisher;
+
+ private Supplier<MessageRouterPublishRequest> configSupplier;
+
+
+ @Captor
+ private ArgumentCaptor<Flux<JsonPrimitive>> fluxCaptor;
@BeforeEach
- public void beforeEach() throws SSLException {
- dmaapPublisherConfiguration = createDefaultDmaapPublisherConfiguration();
- consumerDmaapModel = mock(ConsumerDmaapModel.class);
- cbsConfiguration = mock(CbsConfiguration.class);
- requestDiagnosticContextOptionalMock = Optional.empty();
- dmaapModel = mock(DmaapModel.class);
- dMaaPPublisherReactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class);
- publisherReactiveHttpClientFactory = mock(PublisherReactiveHttpClientFactory.class);
- when(cbsConfiguration.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
- when(publisherReactiveHttpClientFactory.create(dmaapPublisherConfiguration))
- .thenReturn(dMaaPPublisherReactiveHttpClient);
- configSupplier = () -> cbsConfiguration.getDmaapPublisherConfiguration();
+ void beforeEach() {
+ when(messageRouterPublisherClientResolver.resolveClient()).thenReturn(messageRouterPublisher);
+ MessageRouterPublishRequest mrRequest = createMRRequest();
+ configSupplier = () -> mrRequest;
}
@Test
void execute_whenPassedObjectDoesntFit_ThrowsPrhTaskException() {
//given
- dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier);
+ dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, messageRouterPublisherClientResolver);
//when
Executable executableFunction = () -> dmaapPublisherTask.execute(null);
//then
assertThrows(PrhTaskException.class, executableFunction, "The specified parameter is incorrect");
}
-
@Test
- void execute_whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException, SSLException {
+ void execute_whenPassedObjectFits_ReturnsCorrectStatus() throws DmaapNotFoundException {
//given
- HttpResponseStatus httpResponseStatus = HttpResponseStatus.OK;
- HttpResponse httpClientReponse = prepareMocksForTests(httpResponseStatus);
- dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory);
-
+ dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, messageRouterPublisherClientResolver);
//when
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription()
- .expectNext(httpClientReponse);
-
+ dmaapPublisherTask.execute(createConsumerDmaapModel());
//then
- verify(dMaaPPublisherReactiveHttpClient, times(1))
- .getDMaaPProducerResponse(consumerDmaapModel, requestDiagnosticContextOptionalMock);
-
- verifyNoMoreInteractions(dMaaPPublisherReactiveHttpClient);
+ verify(messageRouterPublisher).put(eq(configSupplier.get()), fluxCaptor.capture());
+ assertEquals(new JsonPrimitive("{\"correlationId\":\"NOKQTFCOC540002E\"}"), fluxCaptor.getValue().blockFirst());
}
- @Test
- void execute_whenPassedObjectFits_butIncorrectResponseReturns() throws DmaapNotFoundException, SSLException {
- //given
- HttpResponseStatus httpResponseStatus = HttpResponseStatus.UNAUTHORIZED;
- HttpResponse httpClientReponse = prepareMocksForTests(httpResponseStatus);
- dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory);
- //when
- StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription()
- .expectNext(httpClientReponse);
-
- //then
- verify(dMaaPPublisherReactiveHttpClient, times(1))
- .getDMaaPProducerResponse(consumerDmaapModel, requestDiagnosticContextOptionalMock);
- verifyNoMoreInteractions(dMaaPPublisherReactiveHttpClient);
+ private ImmutableConsumerDmaapModel createConsumerDmaapModel() {
+ return ImmutableConsumerDmaapModel.builder()
+ .ipv4("10.16.123.234")
+ .ipv6("0:0:0:0:0:FFFF:0A10:7BEA")
+ .correlationId("NOKQTFCOC540002E")
+ .serialNumber("QTFCOC540002E")
+ .equipVendor("nokia")
+ .equipModel("3310")
+ .equipType("type")
+ .nfRole("gNB")
+ .swVersion("v4.5.0.1")
+ .additionalFields(null)
+ .build();
}
- @Test()
- void execute_whenConsumerDmaapModelIsNull() {
- //given
- HttpResponseStatus httpResponseStatus = HttpResponseStatus.UNAUTHORIZED;
- HttpResponse httpClientReponse = prepareMocksForTests(httpResponseStatus);
- dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory);
- assertThrows(DmaapNotFoundException.class, () -> {
- dmaapPublisherTask.execute(null);
- });
- }
+ private MessageRouterPublishRequest createMRRequest() {
+ final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl("http://dmaap-mr:2222/events/unauthenticated.PNF_READY")
+ .build();
- @Test
- public void resolveClient() throws SSLException {
- //given
- dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory);
- //when
- DMaaPPublisherReactiveHttpClient dMaaPPublisherReactiveHttpClientResolved = dmaapPublisherTask.resolveClient();
- //then
- assertSame(dMaaPPublisherReactiveHttpClientResolved, dMaaPPublisherReactiveHttpClient);
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .contentType("application/json")
+ .build();
}
-
- private HttpResponse prepareMocksForTests(HttpResponseStatus httpResponseStatus) {
- HttpResponse httpClientResponse = mock(HttpResponse.class);
- when(httpClientResponse.statusCode()).thenReturn(httpResponseStatus.code());
- when(
- dMaaPPublisherReactiveHttpClient.getDMaaPProducerResponse(dmaapModel, requestDiagnosticContextOptionalMock))
- .thenReturn(Mono.just(httpClientResponse));
- return httpClientResponse;
- }
-
} \ No newline at end of file