diff options
author | pwielebs <piotr.wielebski@nokia.com> | 2019-05-16 17:44:45 +0200 |
---|---|---|
committer | pwielebs <piotr.wielebski@nokia.com> | 2019-05-22 14:01:54 +0200 |
commit | 2cf649dda43c7fc7650b5d0047ccc57108918724 (patch) | |
tree | 03d07378786376e077f7d95a6a98c4f66ab85719 /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks | |
parent | a4f457e46a336a30ceea69a742e8b8aa8f2e720f (diff) |
Align PRH to El Alto SDK
Change-Id: I65c445d76092e11084fb60c68740e1321b35708c
Issue-ID: DCAEGEN2-1501
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks')
8 files changed, 120 insertions, 101 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java index 02691446..0b26890d 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java @@ -32,21 +32,14 @@ import org.onap.dcaegen2.services.prh.model.bbs.ImmutableRelationshipWrapper; import org.onap.dcaegen2.services.prh.model.bbs.RelationshipWrapper; import org.onap.dcaegen2.services.prh.model.utils.GsonSerializer; import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*; import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder; -import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; - - import java.util.Arrays; import java.util.List; import java.util.function.Function; @@ -66,7 +59,7 @@ public class BbsActionsTaskImpl implements BbsActionsTask { @Autowired BbsActionsTaskImpl(Config config) { - this(config, RxHttpClient.create(new SslFactory().createInsecureClientContext())); + this(config, RxHttpClientFactory.createInsecure()); } BbsActionsTaskImpl(Config config, RxHttpClient httpClient) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index 3a630a40..5fc41d93 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -20,20 +20,15 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; - import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; - import reactor.core.publisher.Flux; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ interface DmaapConsumerTask { - Flux<ConsumerDmaapModel> execute(String object) throws SSLException; - - DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index d3086cbe..f46e2cc9 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,15 +20,11 @@ package org.onap.dcaegen2.services.prh.tasks; -import com.google.gson.JsonElement; -import java.util.Optional; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -46,33 +42,26 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); private final Config config; private final DmaapConsumerJsonParser dmaapConsumerJsonParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; + @Autowired public DmaapConsumerTaskImpl(Config config) { - this(config, new DmaapConsumerJsonParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); + this(config, new DmaapConsumerJsonParser()); } - DmaapConsumerTaskImpl(Config prhAppConfig, - DmaapConsumerJsonParser dmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) { + DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) { this.config = prhAppConfig; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - this.httpClientFactory = httpClientFactory; } @Override - public Flux<ConsumerDmaapModel> execute(String object) throws SSLException { - DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); + public Flux<ConsumerDmaapModel> execute(String object) { + MessageRouterSubscriber messageRouterSubscriberClient = + new MessageRouterSubscriberResolver().resolveClient(); LOGGER.debug("Method called with arg {}", object); - Mono<JsonElement> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse( - Optional.empty()); + Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriberClient + .get(config.getMessageRouterSubscribeRequest()); return dmaapConsumerJsonParser.getJsonObject(response); } - @Override - public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(config.getDmaapConsumerConfiguration()); - } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 7fc596c1..f63f4d76 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -20,10 +20,10 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -31,16 +31,8 @@ import reactor.core.publisher.Mono; */ public interface DmaapPublisherTask { - /** - * - * Does not work reactive version with DMaaP MR - to be investigated why in future - * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); - * */ - @Deprecated - Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> - execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException; - Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; - DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;; + Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 55a8bb58..3a724884 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.prh.tasks; +import com.google.gson.JsonPrimitive; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; @@ -27,17 +28,14 @@ import org.apache.http.impl.client.DefaultHttpClient; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.uri.URI; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import javax.net.ssl.SSLException; -import java.util.Optional; import java.util.function.Supplier; /** @@ -46,70 +44,52 @@ import java.util.function.Supplier; public class DmaapPublisherTaskImpl implements DmaapPublisherTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private final Supplier<DmaapPublisherConfiguration> config; + + private final Supplier<MessageRouterPublishRequest> config; + private final MessageRouterPublisherResolver messageRouterPublisherClientResolver; private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl(); - private final PublisherReactiveHttpClientFactory httpClientFactory; - public DmaapPublisherTaskImpl(final Supplier<DmaapPublisherConfiguration> config) { - this(config, new PublisherReactiveHttpClientFactory( - new DmaaPRestTemplateFactory(), - new PnfReadyJsonBodyBuilderImpl())); - } - DmaapPublisherTaskImpl( - Supplier<DmaapPublisherConfiguration> config, - PublisherReactiveHttpClientFactory httpClientFactory) { + public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> config, MessageRouterPublisherResolver messageRouterPublisherClientResolver) { this.config = config; - this.httpClientFactory = httpClientFactory; + this.messageRouterPublisherClientResolver = messageRouterPublisherClientResolver; } @Override - public Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> - execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException, SSLException { + public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } - DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient(); + MessageRouterPublisher messageRouterPublisher = messageRouterPublisherClientResolver.resolveClient(); LOGGER.info("Method called with arg {}", consumerDmaapModel); - return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty()); - } - - @Override - public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(config.get()); - + String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); + return messageRouterPublisher.put( + config.get(), + Flux.just(json).map(JsonPrimitive::new)); } /** * * Does not work reactive version with DMaaP MR - to be investigated why in future - * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + * As WA please use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); * */ @Override public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) { String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); - DefaultHttpClient httpClient = new DefaultHttpClient(); - HttpPost postRequest = new HttpPost(getUrl()); - try { - StringEntity input = new StringEntity(json); - input.setContentType(config.get().dmaapContentType()); - postRequest.setEntity(input); - HttpResponse response = httpClient.execute(postRequest); - return Mono.just(response); - } catch (Exception e) { - LOGGER.warn("Publishing to DMaaP MR failed: {}", e); - return Mono.error(e); + try (DefaultHttpClient httpClient = new DefaultHttpClient()) { + HttpPost postRequest = new HttpPost(config.get().sinkDefinition().topicUrl()); + try { + StringEntity input = new StringEntity(json); + input.setContentType(config.get().contentType()); + postRequest.setEntity(input); + HttpResponse response = httpClient.execute(postRequest); + return Mono.just(response); + } catch (Exception e) { + LOGGER.warn("Publishing to DMaaP MR failed: {}", e); + return Mono.error(e); + } } } - private String getUrl() { - return (new URI.URIBuilder()).scheme(config.get().dmaapProtocol()) - .host(config.get().dmaapHostName()) - .port(config.get().dmaapPortNumber()).path(this.createRequestPath()).build() - .toString(); - } - private String createRequestPath() { - return "/" + config.get().dmaapTopicName(); - } }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java new file mode 100644 index 00000000..2f4e3867 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.springframework.stereotype.Component; + +@Component +public class MessageRouterPublisherResolver { + + public MessageRouterPublisher resolveClient() { + return DmaapClientFactory.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java new file mode 100644 index 00000000..63930ef7 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.springframework.stereotype.Component; + +@Component +public class MessageRouterSubscriberResolver { + + public MessageRouterSubscriber resolveClient() { + return DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 72ec0cac..4b3436e5 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.logging.MdcVariables; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -50,6 +51,7 @@ public class ScheduledTasks { private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class); private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapReadyProducerTask; private final DmaapPublisherTask dmaapUpdateProducerTask; @@ -208,7 +210,7 @@ public class ScheduledTasks { * Marked as deprecated due to problems with DMaaP MR, to be fixed in future */ @Deprecated - private Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> + private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) { try { if (state.ActivationStatus) { @@ -217,8 +219,8 @@ public class ScheduledTasks { } return dmaapReadyProducerTask.execute(state.DmaapModel); - } catch (PrhTaskException | SSLException e) { - return Mono.error(e); + } catch (PrhTaskException e) { + return Flux.error(e); } } |