aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org
diff options
context:
space:
mode:
authorgrabinsk <maciej.grabinski@nokia.com>2019-05-28 11:23:53 +0200
committergrabinsk <maciej.grabinski@nokia.com>2019-05-29 09:32:47 +0200
commit37444e2753f351cfe22b4651bcf777b833aeba92 (patch)
tree9280875024a947f1b644aa9f1c93bc81799d1201 /prh-app-server/src/main/java/org
parent37dd2cb2d2ad38ad1e6dd83f3ff5b40bc2c0f614 (diff)
SSL key loading for Dmaap client
Change-Id: I65b3d0bcd6735af655c9243f20f3596ce8f03aca Issue-ID: DCAEGEN2-1501 Signed-off-by: grabinsk <maciej.grabinski@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java33
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java34
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java11
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java31
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java2
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java23
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java16
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java34
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java34
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java10
12 files changed, 93 insertions, 144 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
index 889dae20..0cf07a0f 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
@@ -25,6 +25,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientC
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
@@ -50,6 +53,8 @@ import java.util.Optional;
public class CbsConfiguration extends PrhAppConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfiguration.class);
private AaiClientConfiguration aaiClientCBSConfiguration;
+ private MessageRouterPublisher messageRouterPublisher;
+ private MessageRouterSubscriber messageRouterSubscriber;
private MessageRouterPublishRequest messageRouterCBSPublishRequest;
private MessageRouterSubscribeRequest messageRouterCBSSubscribeRequest;
private MessageRouterPublishRequest messageRouterCBSUpdatePublishRequest;
@@ -82,9 +87,16 @@ public class CbsConfiguration extends PrhAppConfig {
private void parseCBSConfig(JsonObject jsonObject) {
LOGGER.info("Received application configuration: {}", jsonObject);
CbsContentParser consulConfigurationParser = new CbsContentParser(jsonObject);
+
+ aaiClientCBSConfiguration = consulConfigurationParser.getAaiClientConfig();
+
+ messageRouterPublisher = DmaapClientFactory.createMessageRouterPublisher(
+ consulConfigurationParser.getMessageRouterPublisherConfig());
messageRouterCBSPublishRequest = consulConfigurationParser.getMessageRouterPublishRequest();
messageRouterCBSUpdatePublishRequest = consulConfigurationParser.getMessageRouterUpdatePublishRequest();
- aaiClientCBSConfiguration = consulConfigurationParser.getAaiClientConfig();
+
+ messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ consulConfigurationParser.getMessageRouterSubscriberConfig());
messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest();
}
@@ -96,23 +108,34 @@ public class CbsConfiguration extends PrhAppConfig {
LOGGER.warn("Failed to gather configuration from ConfigBindingService/Consul", throwable);
}
+
+ @Override
+ public MessageRouterPublisher getMessageRouterPublisher() {
+ return Optional.ofNullable(messageRouterPublisher).orElseThrow(() -> new RuntimeException("CBS config missing"));
+ }
+
+ @Override
+ public MessageRouterSubscriber getMessageRouterSubscriber() {
+ return Optional.ofNullable(messageRouterSubscriber).orElseThrow(() -> new RuntimeException("CBS config missing"));
+ }
+
@Override
public MessageRouterPublishRequest getMessageRouterPublishRequest() {
- return Optional.ofNullable(messageRouterCBSPublishRequest).orElse(super.getMessageRouterPublishRequest());
+ return Optional.ofNullable(messageRouterCBSPublishRequest).orElseThrow(() -> new RuntimeException("CBS config missing"));
}
@Override
public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() {
- return Optional.ofNullable(messageRouterCBSUpdatePublishRequest).orElse(super.getMessageRouterUpdatePublishRequest());
+ return Optional.ofNullable(messageRouterCBSUpdatePublishRequest).orElseThrow(() -> new RuntimeException("CBS config missing"));
}
@Override
public AaiClientConfiguration getAaiClientConfiguration() {
- return Optional.ofNullable(aaiClientCBSConfiguration).orElse(super.getAaiClientConfiguration());
+ return Optional.ofNullable(aaiClientCBSConfiguration).orElseThrow(() -> new RuntimeException("CBS config missing"));
}
@Override
public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
- return Optional.ofNullable(messageRouterCBSSubscribeRequest).orElse(super.getMessageRouterSubscribeRequest());
+ return Optional.ofNullable(messageRouterCBSSubscribeRequest).orElseThrow(() -> new RuntimeException("CBS config missing"));
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java
index 63947fa9..a57a5393 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java
@@ -34,7 +34,16 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.Immutable
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
@@ -79,6 +88,31 @@ class CbsContentParser {
.build();
}
+ MessageRouterPublisherConfig getMessageRouterPublisherConfig() {
+ return ImmutableMessageRouterPublisherConfig.builder()
+ .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys(jsonObject) : null)
+ .build();
+ }
+
+ MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() {
+ return ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys(jsonObject) : null)
+ .build();
+ }
+
+ private SecurityKeys createSecurityKeys(JsonObject config) {
+ return ImmutableSecurityKeys.builder()
+ .keyStore(ImmutableSecurityKeysStore.of(Paths.get(config.get(SECURITY_KEY_STORE_PATH).getAsString())))
+ .keyStorePassword(Passwords.fromPath(Paths.get(config.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())))
+ .trustStore(ImmutableSecurityKeysStore.of(Paths.get(config.get(SECURITY_TRUST_STORE_PATH).getAsString())))
+ .trustStorePassword(Passwords.fromPath(Paths.get(config.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())))
+ .build();
+ }
+
+ private boolean isDmaapCertAuthEnabled(JsonObject config) {
+ return config.get("security.enableDmaapCertAuth").getAsBoolean();
+ }
+
AaiClientConfiguration getAaiClientConfig() {
return new ImmutableAaiClientConfiguration.Builder()
.pnfUrl(jsonObject.get("aai.aaiClientConfiguration.pnfUrl").getAsString())
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java
index f914a345..53ccdc05 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java
@@ -21,6 +21,8 @@
package org.onap.dcaegen2.services.prh.configuration;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.springframework.core.io.Resource;
@@ -40,4 +42,7 @@ public interface Config {
MessageRouterPublishRequest getMessageRouterUpdatePublishRequest();
+ MessageRouterPublisher getMessageRouterPublisher();
+
+ MessageRouterSubscriber getMessageRouterSubscriber();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java
index f18f1d90..e5eb1e66 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java
@@ -22,7 +22,6 @@ package org.onap.dcaegen2.services.prh.configuration;
import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTaskImpl;
-import org.onap.dcaegen2.services.prh.tasks.MessageRouterPublisherResolver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -31,13 +30,15 @@ import org.springframework.context.annotation.Configuration;
public class DmaapPublisherTaskConfig {
@Bean(name = "ReadyPublisherTask")
@Autowired
- public DmaapPublisherTask getReadyPublisherTask(final Config config) {
- return new DmaapPublisherTaskImpl(config::getMessageRouterPublishRequest, new MessageRouterPublisherResolver());
+ public DmaapPublisherTask getReadyPublisherTask(Config config) {
+ return new DmaapPublisherTaskImpl(
+ config::getMessageRouterPublishRequest, config::getMessageRouterPublisher);
}
@Bean(name = "UpdatePublisherTask")
@Autowired
- public DmaapPublisherTask getUpdatePublisherTask(final Config config) {
- return new DmaapPublisherTaskImpl(config::getMessageRouterUpdatePublishRequest, new MessageRouterPublisherResolver());
+ public DmaapPublisherTask getUpdatePublisherTask(Config config) {
+ return new DmaapPublisherTaskImpl(
+ config::getMessageRouterUpdatePublishRequest, config::getMessageRouterPublisher);
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
index 01ef2063..5ef00dd0 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
@@ -20,9 +20,6 @@
package org.onap.dcaegen2.services.prh.configuration;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@@ -45,14 +42,6 @@ import java.nio.charset.Charset;
public abstract class PrhAppConfig implements Config {
private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class);
- AaiClientConfiguration aaiClientConfiguration;
-
- MessageRouterSubscribeRequest messageRouterSubscribeRequest;
-
- MessageRouterPublishRequest messageRouterPublishRequest;
-
- MessageRouterPublishRequest messageRouterUpdatePublishRequest;
-
@Value("classpath:git_info.json")
private Resource gitInfo;
@@ -65,24 +54,4 @@ public abstract class PrhAppConfig implements Config {
public Resource getGitInfo() {
return gitInfo;
}
-
- @Override
- public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
- return messageRouterSubscribeRequest;
- }
-
- @Override
- public AaiClientConfiguration getAaiClientConfiguration() {
- return aaiClientConfiguration;
- }
-
- @Override
- public MessageRouterPublishRequest getMessageRouterPublishRequest() {
- return messageRouterPublishRequest;
- }
-
- @Override
- public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() {
- return messageRouterUpdatePublishRequest;
- }
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index b3d84562..c059d34b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -58,6 +59,7 @@ import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VE
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
*/
+@Component
public class DmaapConsumerJsonParser {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 5fc41d93..5efeae9a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -23,12 +23,10 @@ package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import reactor.core.publisher.Flux;
-import javax.net.ssl.SSLException;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
interface DmaapConsumerTask {
- Flux<ConsumerDmaapModel> execute(String object) throws SSLException;
+ Flux<ConsumerDmaapModel> execute();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index f46e2cc9..af5b2505 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -24,10 +24,8 @@ import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -39,28 +37,21 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
- private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private final DmaapConsumerJsonParser dmaapConsumerJsonParser;
- @Autowired
- public DmaapConsumerTaskImpl(Config config) {
- this(config, new DmaapConsumerJsonParser());
- }
-
- DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
+ public DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
this.config = prhAppConfig;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
}
@Override
- public Flux<ConsumerDmaapModel> execute(String object) {
- MessageRouterSubscriber messageRouterSubscriberClient =
- new MessageRouterSubscriberResolver().resolveClient();
- LOGGER.debug("Method called with arg {}", object);
- Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriberClient
- .get(config.getMessageRouterSubscribeRequest());
+ public Flux<ConsumerDmaapModel> execute() {
+ MessageRouterSubscriber messageRouterSubscriber = config.getMessageRouterSubscriber();
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest = config.getMessageRouterSubscribeRequest();
+ Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriber
+ .get(messageRouterSubscribeRequest);
return dmaapConsumerJsonParser.getJsonObject(response);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 1a528180..2890d195 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -39,14 +39,15 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private final Supplier<MessageRouterPublishRequest> config;
- private final MessageRouterPublisherResolver messageRouterPublisherClientResolver;
+ private final Supplier<MessageRouterPublishRequest> publishRequestSupplier;
+ private final Supplier<MessageRouterPublisher> publisherSupplier;
private final PnfReadyJsonBodyBuilder pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilder();
- public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> config, MessageRouterPublisherResolver messageRouterPublisherClientResolver) {
- this.config = config;
- this.messageRouterPublisherClientResolver = messageRouterPublisherClientResolver;
+ public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> publishRequestSupplier,
+ Supplier<MessageRouterPublisher> publisherSupplier) {
+ this.publishRequestSupplier = publishRequestSupplier;
+ this.publisherSupplier = publisherSupplier;
}
@Override
@@ -54,10 +55,11 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
- MessageRouterPublisher messageRouterPublisher = messageRouterPublisherClientResolver.resolveClient();
LOGGER.info("Method called with arg {}", consumerDmaapModel);
+ MessageRouterPublisher messageRouterPublisher = publisherSupplier.get();
+ MessageRouterPublishRequest messageRouterPublishRequest = publishRequestSupplier.get();
return messageRouterPublisher.put(
- config.get(),
+ messageRouterPublishRequest,
Flux.just(pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel)));
}
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java
deleted file mode 100644
index 2f4e3867..00000000
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.tasks;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
-import org.springframework.stereotype.Component;
-
-@Component
-public class MessageRouterPublisherResolver {
-
- public MessageRouterPublisher resolveClient() {
- return DmaapClientFactory.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
- }
-}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java
deleted file mode 100644
index 63930ef7..00000000
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.tasks;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
-import org.springframework.stereotype.Component;
-
-@Component
-public class MessageRouterSubscriberResolver {
-
- public MessageRouterSubscriber resolveClient() {
- return DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
- }
-}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 8aad3eed..ba18f7b2 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -157,18 +157,10 @@ public class ScheduledTasks {
MdcVariables.setMdcContextMap(mdcContextMap);
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
LOGGER.info(INVOKE, "Init configs");
- return consumeFromDMaaP();
+ return dmaapConsumerTask.execute();
});
}
- private Flux<ConsumerDmaapModel> consumeFromDMaaP() {
- try {
- return dmaapConsumerTask.execute("");
- } catch (SSLException e) {
- return Flux.error(e);
- }
- }
-
private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) {
return aaiQueryTask
.execute(monoDMaaPModel)