diff options
author | elinuxhenrik <henrik.b.andersson@est.tech> | 2018-09-14 15:49:10 +0200 |
---|---|---|
committer | elinuxhenrik <henrik.b.andersson@est.tech> | 2018-09-17 08:06:07 +0200 |
commit | f394594ec70aaf1eefa4f23b80226c3426dbc17a (patch) | |
tree | 89e280889259b11c2651b06fb55d679cf5faf410 /datafile-app-server/src/main/java | |
parent | 7ceca5db40ff6cbd88c95fe335a6a6e582189066 (diff) |
Deliver first version of Datafile
Change-Id: Iadd1455d7fe45b4c022dd7fde2f8a506d1b7cd57
Issue-ID: DCAEGEN2-640
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Diffstat (limited to 'datafile-app-server/src/main/java')
27 files changed, 778 insertions, 765 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java index 63fbccb0..d0443ecf 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,29 +13,24 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile; import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -@SpringBootApplication -@Configuration -@ComponentScan -@EnableAutoConfiguration(exclude = {JacksonAutoConfiguration.class}) +@SpringBootApplication(exclude = {JacksonAutoConfiguration.class}) @EnableScheduling public class MainApp { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index 1fd50c94..245e0959 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,35 +13,30 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.configuration; import java.util.Optional; -import java.util.function.Predicate; - -import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.collectors.datafile.config.ImmutableAaiClientConfiguration; import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; - /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @Component @Configuration public class AppConfig extends DatafileAppConfig { - private static Predicate<String> isEmpty = String::isEmpty; @Value("${dmaap.dmaapConsumerConfiguration.dmaapHostName:}") public String consumerDmaapHostName; @@ -71,8 +64,8 @@ public class AppConfig extends DatafileAppConfig { @Value("${dmaap.dmaapConsumerConfiguration.consumerGroup:}") public String consumerGroup; - @Value("${dmaap.dmaapConsumerConfiguration.timeoutMs:}") - public Integer consumerTimeoutMs; + @Value("${dmaap.dmaapConsumerConfiguration.timeoutMS:}") + public Integer consumerTimeoutMS; @Value("${dmaap.dmaapConsumerConfiguration.message-limit:}") public Integer consumerMessageLimit; @@ -98,113 +91,65 @@ public class AppConfig extends DatafileAppConfig { @Value("${dmaap.dmaapProducerConfiguration.dmaapContentType:}") public String producerDmaapContentType; - @Value("${aai.aaiClientConfiguration.aaiHost:}") - public String aaiHost; - - @Value("${aai.aaiClientConfiguration.aaiHostPortNumber:}") - public Integer aaiPort; - - @Value("${aai.aaiClientConfiguration.aaiProtocol:}") - public String aaiProtocol; - - @Value("${aai.aaiClientConfiguration.aaiUserName:}") - public String aaiUserName; - - @Value("${aai.aaiClientConfiguration.aaiUserPassword:}") - public String aaiUserPassword; - - @Value("${aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors:}") - public Boolean aaiIgnoreSslCertificateErrors; - - @Value("${aai.aaiClientConfiguration.aaiBasePath:}") - public String aaiBasePath; - - @Value("${aai.aaiClientConfiguration.aaiPnfPath:}") - public String aaiPnfPath; - @Override public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { return new ImmutableDmaapConsumerConfiguration.Builder() .dmaapUserPassword( - Optional.ofNullable(consumerDmaapUserPassword).filter(isEmpty.negate()) + Optional.ofNullable(consumerDmaapUserPassword).filter(p -> !p.isEmpty()) .orElse(dmaapConsumerConfiguration.dmaapUserPassword())) .dmaapUserName( - Optional.ofNullable(consumerDmaapUserName).filter(isEmpty.negate()) + Optional.ofNullable(consumerDmaapUserName).filter(p -> !p.isEmpty()) .orElse(dmaapConsumerConfiguration.dmaapUserName())) .dmaapHostName( - Optional.ofNullable(consumerDmaapHostName).filter(isEmpty.negate()) + Optional.ofNullable(consumerDmaapHostName).filter(p -> !p.isEmpty()) .orElse(dmaapConsumerConfiguration.dmaapHostName())) .dmaapPortNumber( Optional.ofNullable(consumerDmaapPortNumber).filter(p -> !p.toString().isEmpty()) .orElse(dmaapConsumerConfiguration.dmaapPortNumber())) .dmaapProtocol( - Optional.ofNullable(consumerDmaapProtocol).filter(isEmpty.negate()) + Optional.ofNullable(consumerDmaapProtocol).filter(p -> !p.isEmpty()) .orElse(dmaapConsumerConfiguration.dmaapProtocol())) .dmaapContentType( - Optional.ofNullable(consumerDmaapContentType).filter(isEmpty.negate()) + Optional.ofNullable(consumerDmaapContentType).filter(p -> !p.isEmpty()) .orElse(dmaapConsumerConfiguration.dmaapContentType())) .dmaapTopicName( - Optional.ofNullable(consumerDmaapTopicName).filter(isEmpty.negate()) + Optional.ofNullable(consumerDmaapTopicName).filter(p -> !p.isEmpty()) .orElse(dmaapConsumerConfiguration.dmaapTopicName())) .messageLimit( Optional.ofNullable(consumerMessageLimit).filter(p -> !p.toString().isEmpty()) .orElse(dmaapConsumerConfiguration.messageLimit())) - .timeoutMs(Optional.ofNullable(consumerTimeoutMs).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapConsumerConfiguration.timeoutMs())) - .consumerGroup(Optional.ofNullable(consumerGroup).filter(isEmpty.negate()) + .timeoutMS(Optional.ofNullable(consumerTimeoutMS).filter(p -> !p.toString().isEmpty()) + .orElse(dmaapConsumerConfiguration.timeoutMS())) + .consumerGroup(Optional.ofNullable(consumerGroup).filter(p -> !p.isEmpty()) .orElse(dmaapConsumerConfiguration.consumerGroup())) - .consumerId(Optional.ofNullable(consumerId).filter(isEmpty.negate()) + .consumerId(Optional.ofNullable(consumerId).filter(p -> !p.isEmpty()) .orElse(dmaapConsumerConfiguration.consumerId())) .build(); } @Override - public AaiClientConfiguration getAaiClientConfiguration() { - return new ImmutableAaiClientConfiguration.Builder() - .aaiHost(Optional.ofNullable(aaiHost).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiHost())) - .aaiPort( - Optional.ofNullable(aaiPort).filter(p -> !p.toString().isEmpty()) - .orElse(aaiClientConfiguration.aaiPort())) - .aaiIgnoreSslCertificateErrors( - Optional.ofNullable(aaiIgnoreSslCertificateErrors).filter(p -> !p.toString().isEmpty()) - .orElse(aaiClientConfiguration.aaiIgnoreSslCertificateErrors())) - .aaiProtocol( - Optional.ofNullable(aaiProtocol).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiProtocol())) - .aaiUserName( - Optional.ofNullable(aaiUserName).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiUserName())) - .aaiUserPassword(Optional.ofNullable(aaiUserPassword).filter(isEmpty.negate()) - .orElse(aaiClientConfiguration.aaiUserPassword())) - .aaiBasePath(Optional.ofNullable(aaiBasePath).filter(isEmpty.negate()) - .orElse(aaiClientConfiguration.aaiBasePath())) - .aaiPnfPath( - Optional.ofNullable(aaiPnfPath).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiPnfPath())) - .aaiHeaders(aaiClientConfiguration.aaiHeaders()) - .build(); - } - - @Override public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { return new ImmutableDmaapPublisherConfiguration.Builder() .dmaapContentType( - Optional.ofNullable(producerDmaapContentType).filter(isEmpty.negate()) + Optional.ofNullable(producerDmaapContentType).filter(p -> !p.isEmpty()) .orElse(dmaapPublisherConfiguration.dmaapContentType())) .dmaapHostName( - Optional.ofNullable(producerDmaapHostName).filter(isEmpty.negate()) + Optional.ofNullable(producerDmaapHostName).filter(p -> !p.isEmpty()) .orElse(dmaapPublisherConfiguration.dmaapHostName())) .dmaapPortNumber( Optional.ofNullable(producerDmaapPortNumber).filter(p -> !p.toString().isEmpty()) .orElse(dmaapPublisherConfiguration.dmaapPortNumber())) .dmaapProtocol( - Optional.ofNullable(producerDmaapProtocol).filter(isEmpty.negate()) + Optional.ofNullable(producerDmaapProtocol).filter(p -> !p.isEmpty()) .orElse(dmaapPublisherConfiguration.dmaapProtocol())) .dmaapTopicName( - Optional.ofNullable(producerDmaapTopicName).filter(isEmpty.negate()) + Optional.ofNullable(producerDmaapTopicName).filter(p -> !p.isEmpty()) .orElse(dmaapPublisherConfiguration.dmaapTopicName())) .dmaapUserName( - Optional.ofNullable(producerDmaapUserName).filter(isEmpty.negate()) + Optional.ofNullable(producerDmaapUserName).filter(p -> !p.isEmpty()) .orElse(dmaapPublisherConfiguration.dmaapUserName())) .dmaapUserPassword( - Optional.ofNullable(producerDmaapUserPassword).filter(isEmpty.negate()) + Optional.ofNullable(producerDmaapUserPassword).filter(p -> !p.isEmpty()) .orElse(dmaapPublisherConfiguration.dmaapUserPassword())) .build(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java index 5c6f1512..5ab4358a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,24 +13,22 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.configuration; -import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/25/18 + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public interface Config { DmaapConsumerConfiguration getDmaapConsumerConfiguration(); - AaiClientConfiguration getAaiClientConfiguration(); - DmaapPublisherConfiguration getDmaapPublisherConfiguration(); void initFileStreamReader(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java index 169bf8ed..b6525f0f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,30 +13,21 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.configuration; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; -import com.google.gson.TypeAdapterFactory; - import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; - -import java.nio.charset.StandardCharsets; import java.util.ServiceLoader; + import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; -import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.slf4j.Logger; @@ -47,8 +36,16 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; +import com.google.gson.TypeAdapterFactory; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @Configuration @EnableConfigurationProperties @@ -56,15 +53,11 @@ import org.springframework.context.annotation.Configuration; public abstract class DatafileAppConfig implements Config { private static final String CONFIG = "configs"; - private static final String AAI = "aai"; private static final String DMAAP = "dmaap"; - private static final String AAI_CONFIG = "aaiClientConfiguration"; private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - AaiClientConfiguration aaiClientConfiguration; + private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class); DmaapConsumerConfiguration dmaapConsumerConfiguration; @@ -80,11 +73,6 @@ public abstract class DatafileAppConfig implements Config { } @Override - public AaiClientConfiguration getAaiClientConfiguration() { - return aaiClientConfiguration; - } - - @Override public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { return dmaapPublisherConfiguration; } @@ -100,10 +88,6 @@ public abstract class DatafileAppConfig implements Config { JsonElement rootElement = getJsonElement(parser, inputStream); if (rootElement.isJsonObject()) { jsonObject = rootElement.getAsJsonObject(); - aaiClientConfiguration = deserializeType(gsonBuilder, - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(AAI).getAsJsonObject(AAI_CONFIG), - AaiClientConfiguration.class); - dmaapConsumerConfiguration = deserializeType(gsonBuilder, jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER), DmaapConsumerConfiguration.class); @@ -113,14 +97,14 @@ public abstract class DatafileAppConfig implements Config { DmaapPublisherConfiguration.class); } } catch (IOException e) { - logger.warn("Problem with file loading, file: {}", filepath, e); + logger.error("Problem with file loading, file: {}", filepath, e); } catch (JsonSyntaxException e) { - logger.warn("Problem with Json deserialization", e); + logger.error("Problem with Json deserialization", e); } } JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { - return parser.parse(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); + return parser.parse(new InputStreamReader(inputStream)); } private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 823fe732..1d0a192f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -1,29 +1,25 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.configuration; -import io.swagger.annotations.ApiOperation; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; + import javax.annotation.PostConstruct; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; @@ -33,6 +29,8 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; + +import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; /** @@ -43,7 +41,7 @@ import reactor.core.publisher.Mono; public class SchedulerConfig extends DatafileAppConfig { private static final int SCHEDULING_DELAY = 2000; - private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>(); + private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<ScheduledFuture>(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; @@ -63,9 +61,8 @@ public class SchedulerConfig extends DatafileAppConfig { public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() { scheduledFutureList.forEach(x -> x.cancel(false)); scheduledFutureList.clear(); - return Mono.defer(() -> - Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)) - ); + return Mono.defer(() -> Mono + .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); } /** @@ -77,8 +74,8 @@ public class SchedulerConfig extends DatafileAppConfig { @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler - .scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask, SCHEDULING_DELAY)); + scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask, + SCHEDULING_DELAY)); return true; } else { return false; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java index c45b136a..967be5f6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.configuration; @@ -25,6 +23,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; + import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.PathSelectors; import springfox.documentation.builders.RequestHandlerSelectors; @@ -37,46 +36,42 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2; @EnableSwagger2 @Configuration @Profile("prod") -public class SwaggerConfig extends WebMvcConfigurationSupport { +public class SwaggerConfig extends WebMvcConfigurationSupport{ - private static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile"; - private static final String API_TITLE = "Datafile app server"; - private static final String DESCRIPTION = "This page lists all the rest apis for Datafile app server."; - private static final String VERSION = "1.0"; - private static final String RESOURCES_PATH = "classpath:/META-INF/resources/"; - private static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/"; - private static final String SWAGGER_UI = "swagger-ui.html"; - private static final String WEBJARS = "/webjars/**"; + public static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile"; + public static final String API_TITLE = "DATAFILE app server"; + public static final String DESCRIPTION = "This page lists all the rest apis for DATAFILE app server."; + public static final String VERSION = "1.0"; + public static final String RESOURCES_PATH = "classpath:/META-INF/resources/"; + public static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/"; + public static final String SWAGGER_UI = "swagger-ui.html"; + public static final String WEBJARS = "/webjars/**"; - /** - * Swagger configuration function for hosting it next to spring http website. - * @return Docket - */ - @Bean - public Docket api() { - return new Docket(DocumentationType.SWAGGER_2) - .apiInfo(apiInfo()) - .select() - .apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH)) - .paths(PathSelectors.any()) - .build(); - } + @Bean + public Docket api() { + return new Docket(DocumentationType.SWAGGER_2) + .apiInfo(apiInfo()) + .select() + .apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH)) + .paths(PathSelectors.any()) + .build(); + } - private ApiInfo apiInfo() { - return new ApiInfoBuilder() - .title(API_TITLE) - .description(DESCRIPTION) - .version(VERSION) - .build(); - } + private ApiInfo apiInfo() { + return new ApiInfoBuilder() + .title(API_TITLE) + .description(DESCRIPTION) + .version(VERSION) + .build(); + } - @Override - protected void addResourceHandlers(ResourceHandlerRegistry registry) { - registry.addResourceHandler(SWAGGER_UI) - .addResourceLocations(RESOURCES_PATH); + @Override + protected void addResourceHandlers(ResourceHandlerRegistry registry) { + registry.addResourceHandler(SWAGGER_UI) + .addResourceLocations(RESOURCES_PATH); - registry.addResourceHandler(WEBJARS) - .addResourceLocations(WEBJARS_PATH); - } -}
\ No newline at end of file + registry.addResourceHandler(WEBJARS) + .addResourceLocations(WEBJARS_PATH); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java deleted file mode 100644 index b6231418..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - -import org.apache.catalina.connector.Connector; -import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; -import org.springframework.boot.web.servlet.server.ServletWebServerFactory; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/18/18 - */ -@Configuration -public class TomcatHttpConfig { - - /** - * Class for setting up hosting Datafile on http/https. - * - * @return ServletWebServerFactory - */ - @Bean - public ServletWebServerFactory servletContainer() { - TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory(); - tomcat.addAdditionalTomcatConnectors(getHttpConnector()); - return tomcat; - } - - private Connector getHttpConnector() { - Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL); - connector.setScheme("http"); - connector.setPort(8100); - connector.setSecure(false); - return connector; - } - -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java index 070fe591..98dfdedc 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,15 +13,11 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.controllers; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; @@ -31,35 +25,36 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @RestController -@Api(value = "HeartbeatController", description = "Check liveness of Datafile service") +@Api(value = "HeartbeatController", description = "Check liveness of DATAFILE service") public class HeartbeatController { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class); - /** - * Endpoint for checking that Datafile is alive. - * - * @return HTTP Status Code - */ @RequestMapping(value = "heartbeat", method = RequestMethod.GET) - @ApiOperation(value = "Returns liveness of Datafile service") + @ApiOperation(value = "Returns liveness of DATAFILE service") @ApiResponses(value = { - @ApiResponse(code = 200, message = "Datafile sevice is living"), - @ApiResponse(code = 401, message = "You are not authorized to view the resource"), - @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), - @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") - } + @ApiResponse(code = 200, message = "DATAFILE service is living"), + @ApiResponse(code = 401, message = "You are not authorized to view the resource"), + @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), + @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") + } ) public Mono<ResponseEntity<String>> heartbeat() { logger.trace("Receiving heartbeat request"); return Mono.defer(() -> - Mono.just(new ResponseEntity<>("alive", HttpStatus.OK)) + Mono.just(new ResponseEntity<>("I'm living", HttpStatus.OK)) ); } -}
\ No newline at end of file +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index f3cf354f..5765b31c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,14 +13,11 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.controllers; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; - import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,16 +27,21 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/5/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ + @RestController @Api(value = "ScheduleController", description = "Schedule Controller") public class ScheduleController { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(ScheduleController.class); private final SchedulerConfig schedulerConfig; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java index 41f77332..2d62871c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,13 +13,13 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.exceptions; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public class DatafileTaskException extends Exception { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java index d9f6f873..cf387296 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.exceptions; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java index ebff8ae3..e27a2036 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.exceptions; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java new file mode 100644 index 00000000..0f03b1a4 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java @@ -0,0 +1,117 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Ericsson. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FilenameUtils; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * + */ +@Component +public class FileCollector { // TODO: Should be final, but that means adding PowerMock or Mockito + // 2.x for testing so it is left for later improvement. + private static final String FTPES = "ftpes"; + private static final String FTPS = "ftps"; + private static final String SFTP = "sftp"; + + private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); + + private final FtpsClient ftpsClient; + private final SftpClient sftpClient; + + @Autowired + protected FileCollector(FtpsClient ftpsCleint, SftpClient sftpClient) { + this.ftpsClient = ftpsCleint; + this.sftpClient = sftpClient; + } + + public Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) { + List<ConsumerDmaapModel> consumerModels = new ArrayList<ConsumerDmaapModel>(); + for (FileData fileData : listOfFileData) { + String localFile = collectFile(fileData); + + if (localFile != null) { + ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); + consumerModels.add(consumerDmaapModel); + } + } + return Mono.just(consumerModels); + } + + private String collectFile(FileData fileData) { + String location = fileData.location(); + URI uri = URI.create(location); + String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); + FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(uri.getHost()) + .userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "") + .port(uri.getPort()).build(); + String remoteFile = uri.getPath(); + String localFile = "target/" + FilenameUtils.getName(remoteFile); + String scheme = uri.getScheme(); + + boolean fileDownloaded = false; + if (FTPES.equals(scheme) || FTPS.equals(scheme)) { + fileDownloaded = ftpsClient.collectFile(fileServerData, remoteFile, localFile); + } else if (SFTP.equals(scheme)) { + fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile); + } else { + + logger.error("DFC does not support protocol {}. Supported protocols are " + FTPES + ", " + FTPS + ", and " + + SFTP + ". " + fileData); + localFile = null; + } + if (!fileDownloaded) { + localFile = null; + } + return localFile; + } + + private String[] getUserNameAndPasswordIfGiven(String userInfoString) { + String[] userInfo = null; + if (userInfoString != null && !userInfoString.isEmpty()) { + userInfo = userInfoString.split(":"); + } + return userInfo; + } + + private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) { + String compression = fileData.compression(); + String fileFormatType = fileData.fileFormatType(); + String fileFormatVersion = fileData.fileFormatVersion(); + + return ImmutableConsumerDmaapModel.builder().location(localFile).compression(compression) + .fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java new file mode 100644 index 00000000..d4eca4d7 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import org.immutables.value.Value; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * + */ +@Value.Immutable +public interface FileServerData { + public String serverAddress(); + public String userId(); + public String password(); + public int port(); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java new file mode 100644 index 00000000..a3f75826 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -0,0 +1,117 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.commons.net.ftp.FTPReply; +import org.apache.commons.net.ftp.FTPSClient; +import org.apache.commons.net.util.TrustManagerUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Gets file from xNF with FTPS protocol. + * + * TODO: Refactor for better test. + * + * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a> + * + */ +@Component +public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockito 2.x to be able to + // mock then, so this will be done as an improvement after first version + // committed. + private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class); + + public boolean collectFile(FileServerData fileServerData, String remoteFile, String localFile) { + boolean result = true; + try { + FTPSClient ftps = new FTPSClient("TLS"); + + result = setUpConnection(fileServerData, ftps); + + if (result) { + getFile(remoteFile, localFile, ftps); + + closeDownConnection(ftps); + } + } catch (IOException ex) { + logger.error("Unable to collect file from xNF. " + fileServerData, ex); + result = false; + } + return result; + } + + private boolean setUpConnection(FileServerData fileServerData, FTPSClient ftps) { + boolean success = true; + ftps.setTrustManager(TrustManagerUtils.getAcceptAllTrustManager()); + + try { + ftps.connect(fileServerData.serverAddress(), fileServerData.port()); + + if (!ftps.login(fileServerData.userId(), fileServerData.password())) { + ftps.logout(); + logger.error("Unable to log in to xNF. " + fileServerData); + success = false; + } + + if (success) { + int reply = ftps.getReplyCode(); + if (!FTPReply.isPositiveCompletion(reply)) { + ftps.disconnect(); + logger.error("Unable to connect in to xNF. " + fileServerData); + success = false; + } + ftps.enterLocalPassiveMode(); + } + } catch (Exception ex) { + logger.error("Unable to connect to xNF." + fileServerData, ex); + success = false; + } + + return success; + } + + private void getFile(String remoteFile, String localFile, FTPSClient ftps) + throws IOException, FileNotFoundException { + OutputStream output; + File outfile = new File(localFile); + outfile.createNewFile(); + + output = new FileOutputStream(outfile); + + ftps.retrieveFile(remoteFile, output); + + output.close(); + logger.debug("File " + outfile.getName() + " Download Successfull from xNF"); + } + + private void closeDownConnection(FTPSClient ftps) { + try { + ftps.logout(); + ftps.disconnect(); + } catch (Exception e) { + // Do nothing, file has been collected. + } + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java new file mode 100644 index 00000000..e7c7c09b --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import com.jcraft.jsch.SftpException; + +import org.apache.commons.io.FilenameUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Gets file from xNF with SFTP protocol. + * + * TODO: Refactor for better test. + * + * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a> + * + */ +@Component +public class SftpClient { // TODO: Should be final but needs PowerMock to be able to mock then, so + // this will be done as an improvement after first version committed. + private static final Logger logger = LoggerFactory.getLogger(SftpClient.class); + + public boolean collectFile(FileServerData fileServerData, String remoteFile, String localFile) { + boolean result = true; + Session session = setUpSession(fileServerData); + + if (session != null) { + ChannelSftp sftpChannel = getChannel(session, fileServerData); + if (sftpChannel != null) { + try { + sftpChannel.get(remoteFile, localFile); + logger.debug("File " + FilenameUtils.getName(localFile) + " Download Successfull from xNF"); + } catch (SftpException e) { + logger.error("Unable to get file from xNF. " + fileServerData, e); + result = false; + } + + sftpChannel.exit(); + } else { + result = false; + } + session.disconnect(); + } else { + result = false; + } + return result; + } + + private Session setUpSession(FileServerData fileServerData) { + JSch jsch = new JSch(); // TODO: Might be changed to use Spring as an improvement after + // first version committed. + + Session session = null; + try { + session = jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), fileServerData.port()); + session.setConfig("StrictHostKeyChecking", "no"); + session.setPassword(fileServerData.password()); + session.connect(); + } catch (JSchException e) { + logger.error("Unable to set up SFTP connection to xNF. " + fileServerData, e); + } + return session; + } + + private ChannelSftp getChannel(Session session, FileServerData fileServerData) { + ChannelSftp sftpChannel = null; + try { + Channel channel; + channel = session.openChannel("sftp"); + channel.connect(); + sftpChannel = (ChannelSftp) channel; + } catch (JSchException e) { + logger.error("Unable to get sftp channel to xNF. " + fileServerData, e); + } + return sftpChannel; + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index aeaf0da1..98f3a72a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -1,130 +1,171 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.service; +import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; + +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.springframework.util.StringUtils; + import reactor.core.publisher.Mono; /** + * Parses the fileReady event and creates an array of FileData containing the information. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public class DmaapConsumerJsonParser { private static final String EVENT = "event"; - private static final String OTHER_FIELDS = "otherFields"; - private static final String PNF_OAM_IPV_4_ADDRESS = "pnfOamIpv4Address"; - private static final String PNF_OAM_IPV_6_ADDRESS = "pnfOamIpv6Address"; - private static final String PNF_VENDOR_NAME = "pnfVendorName"; - private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber"; + private static final String NOTIFICATION_FIELDS = "notificationFields"; + private static final String CHANGE_IDENTIFIER = "changeIdentifier"; + private static final String CHANGE_TYPE = "changeType"; + private static final String NOTIFICATION_FIELDS_VERSION = "notificationFieldsVersion"; + + private static final String LOCATION = "location"; + private static final String COMPRESSION = "compression"; + private static final String FILE_FORMAT_TYPE = "fileFormatType"; + private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; /** - * Extract info from string and create @see {@link org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel}. + * Extract info from string and create @see + * {@link org.onap.dcaegen2.collectors.datafile.service.FileData}. * * @param monoMessage - results from DMaaP - * @return reactive DMaaPModel + * @return reactive Mono with an array of FileData */ - public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { - return monoMessage - .flatMap(this::getJsonParserMessage) - .flatMap(this::createJsonConsumerModel); + public Mono<List<FileData>> getJsonObject(Mono<String> monoMessage) { + return monoMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); } private Mono<JsonElement> getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + : Mono.fromSupplier(() -> new JsonParser().parse(message)); } - private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() - ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) - : getConsumerDmaapModelFromJsonArray(jsonElement); + private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) { + return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) + : getFileDataFromJsonArray(jsonElement); } - private Mono<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { - return create( - Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() - .flatMap(this::getJsonObjectFromAnArray) - .orElseThrow(DmaapEmptyResponseException::new))); + private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) { + return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); } public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); } - private Mono<ConsumerDmaapModel> create(Mono<JsonObject> jsonObject) { - return jsonObject.flatMap(monoJsonP -> - !containsHeader(monoJsonP) ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) { + return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) + ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) : transform(monoJsonP)); } - private Mono<ConsumerDmaapModel> transform(JsonObject monoJsonP) { - monoJsonP = monoJsonP.getAsJsonObject(EVENT).getAsJsonObject(OTHER_FIELDS); - String pnfVendorName = getValueFromJson(monoJsonP, PNF_VENDOR_NAME); - String pnfSerialNumber = getValueFromJson(monoJsonP, PNF_SERIAL_NUMBER); - String pnfOamIpv4Address = getValueFromJson(monoJsonP, PNF_OAM_IPV_4_ADDRESS); - String pnfOamIpv6Address = getValueFromJson(monoJsonP, PNF_OAM_IPV_6_ADDRESS); - return - (!vendorAndSerialNotEmpty(pnfSerialNumber, pnfVendorName) || !ipPropertiesNotEmpty(pnfOamIpv4Address, - pnfOamIpv6Address)) - ? Mono.error(new DmaapNotFoundException("Incorrect json, consumerDmaapModel can not be created: " - + printMessage(pnfVendorName, pnfSerialNumber, pnfOamIpv4Address, pnfOamIpv6Address))) : - Mono.just(ImmutableConsumerDmaapModel.builder() - .pnfName(pnfVendorName.substring(0, Math.min(pnfVendorName.length(), 3)).toUpperCase() - .concat(pnfSerialNumber)).ipv4(pnfOamIpv4Address) - .ipv6(pnfOamIpv6Address).build()); + private Mono<List<FileData>> transform(JsonObject jsonObject) { + if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) { + JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); + String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER); + String changeType = getValueFromJson(notificationFields, CHANGE_TYPE); + String notificationFieldsVersion = getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION); + JsonArray arrayOfAdditionalFields = notificationFields.getAsJsonArray("arrayOfAdditionalFields"); + + if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) + && arrayOfAdditionalFields != null) { + Mono<List<FileData>> res = + getFileDataFromJson(changeIdentifier, changeType, arrayOfAdditionalFields); + return res; + } + + if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { + return Mono.error( + new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); + } else if (arrayOfAdditionalFields != null) { + return Mono.error(new DmaapNotFoundException( + "FileReady event arrayOfAdditionalFields is missing. " + jsonObject)); + } + return Mono.error( + new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); + } + return Mono.error( + new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); + + } + + private Mono<List<FileData>> getFileDataFromJson(String changeIdentifier, String changeType, + JsonArray arrayOfAdditionalFields) { + List<FileData> res = new ArrayList<>(); + for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { + if (arrayOfAdditionalFields.get(i) != null) { + JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); + String fileFormatType = getValueFromJson(fileInfo, FILE_FORMAT_TYPE); + String fileFormatVersion = getValueFromJson(fileInfo, FILE_FORMAT_VERSION); + String location = getValueFromJson(fileInfo, LOCATION); + String compression = getValueFromJson(fileInfo, COMPRESSION); + if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) + && isLocationAndCompressionNotEmpty(location, compression)) { + res.add(ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType) + .location(location).compression(compression).fileFormatType(fileFormatType) + .fileFormatVersion(fileFormatVersion).build()); + } else { + return Mono.error(new DmaapNotFoundException( + "FileReady event does not contain correct file format information. " + fileInfo)); + } + } + } + return Mono.just(res); } private String getValueFromJson(JsonObject jsonObject, String jsonKey) { return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : ""; } - private boolean vendorAndSerialNotEmpty(String pnfSerialNumber, String pnfVendorName) { - return (!StringUtils.isEmpty(pnfSerialNumber) && !StringUtils.isEmpty(pnfVendorName)); + private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType, + String notificationFieldsVersion) { + return ((changeIdentifier != null && !changeIdentifier.isEmpty()) + && (changeType != null && !changeType.isEmpty()) + && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty())); + } + + private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) { + return ((fileFormatVersion != null && !fileFormatVersion.isEmpty()) + && (fileFormatType != null && !fileFormatType.isEmpty())); } - private boolean ipPropertiesNotEmpty(String ipv4, String ipv6) { - return (!StringUtils.isEmpty(ipv4)) || !(StringUtils.isEmpty(ipv6)); + private boolean isLocationAndCompressionNotEmpty(String location, String compression) { + return (location != null && !location.isEmpty()) && (compression != null && !compression.isEmpty()); } private boolean containsHeader(JsonObject jsonObject) { - return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(OTHER_FIELDS); + return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS); } - private String printMessage(String pnfVendorName, String pnfSerialNumber, String pnfOamIpv4Address, - String pnfOamIpv6Address) { - return String.format("%n{" - + "\"pnfVendorName\" : \"%s\"," - + "\"pnfSerialNumber\": \"%s\"," - + "\"pnfOamIpv4Address\": \"%s\"," - + "\"pnfOamIpv6Address\": \"%s\"" - + "%n}", pnfVendorName, pnfSerialNumber, pnfOamIpv4Address, pnfOamIpv6Address); + private boolean containsHeader(JsonObject jsonObject, String topHeader, String header) { + return jsonObject.has(topHeader) && jsonObject.getAsJsonObject(topHeader).has(header); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/AaiNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java index a83b5bd6..948976b6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/AaiNotFoundException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,17 +13,25 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ -package org.onap.dcaegen2.collectors.datafile.exceptions; +package org.onap.dcaegen2.collectors.datafile.service; + +import org.immutables.value.Value; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + * Contains data, from the fileReady event, about the file to collect from the xNF. + * + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * */ -public class AaiNotFoundException extends DatafileTaskException { - - public AaiNotFoundException(String message) { - super(message); - } +@Value.Immutable +public interface FileData { + public String changeIdentifier(); + public String changeType(); + public String location(); + public String compression(); + public String fileFormatType(); + public String fileFormatVersion(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTask.java deleted file mode 100644 index 8083a255..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTask.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import java.util.Optional; - -import org.onap.dcaegen2.collectors.datafile.exceptions.AaiNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.AaiConsumerClient; - -public abstract class AaiConsumerTask { - - abstract Optional<String> consume(ConsumerDmaapModel message) throws AaiNotFoundException; - - abstract AaiConsumerClient resolveClient(); - - protected abstract String execute(ConsumerDmaapModel consumerDmaapModel) throws AaiNotFoundException; -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTaskImpl.java deleted file mode 100644 index d487b6b2..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTaskImpl.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import java.io.IOException; -import java.util.Optional; - -import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.exceptions.AaiNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.AaiConsumerClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class AaiConsumerTaskImpl extends AaiConsumerTask { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private final Config datafileAppConfig; - private AaiConsumerClient aaiConsumerClient; - - @Autowired - public AaiConsumerTaskImpl(AppConfig datafileAppConfig) { - this.datafileAppConfig = datafileAppConfig; - } - - @Override - Optional<String> consume(ConsumerDmaapModel consumerDmaapModel) throws AaiNotFoundException { - logger.trace("Method called with arg {}", consumerDmaapModel); - try { - return aaiConsumerClient.getHttpResponse(consumerDmaapModel); - } catch (IOException e) { - logger.warn("Get request not successful", e); - throw new AaiNotFoundException("Get request not successful"); - } - } - - @Override - public String execute(ConsumerDmaapModel consumerDmaapModel) throws AaiNotFoundException { - consumerDmaapModel = Optional.ofNullable(consumerDmaapModel) - .orElseThrow(() -> new AaiNotFoundException("Invoked null object to AAI task")); - logger.trace("Method called with arg {}", consumerDmaapModel); - aaiConsumerClient = resolveClient(); - return consume(consumerDmaapModel).orElseThrow(() -> new AaiNotFoundException("Null response code")); - } - - protected AaiClientConfiguration resolveConfiguration() { - return datafileAppConfig.getAaiClientConfiguration(); - } - - @Override - AaiConsumerClient resolveClient() { - return Optional.ofNullable(aaiConsumerClient).orElseGet(() -> new AaiConsumerClient(resolveConfiguration())); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTask.java deleted file mode 100644 index ca2d03da..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTask.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; -import org.onap.dcaegen2.collectors.datafile.exceptions.AaiNotFoundException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.AaiReactiveWebClient; -import org.onap.dcaegen2.collectors.datafile.service.producer.AaiProducerReactiveHttpClient; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 - */ -public abstract class AaiProducerTask { - - abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException; - - abstract AaiProducerReactiveHttpClient resolveClient(); - - protected abstract AaiClientConfiguration resolveConfiguration(); - - protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) - throws DatafileTaskException; - - WebClient buildWebClient() { - return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTaskImpl.java deleted file mode 100644 index 9d888aa7..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTaskImpl.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.tasks; - -import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; -import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.exceptions.AaiNotFoundException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.utils.HttpUtils; -import org.onap.dcaegen2.collectors.datafile.service.producer.AaiProducerReactiveHttpClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 - */ -@Component -public class AaiProducerTaskImpl extends - AaiProducerTask { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private final Config datafileAppConfig; - private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient; - - @Autowired - public AaiProducerTaskImpl(AppConfig datafileAppConfig) { - this.datafileAppConfig = datafileAppConfig; - } - - @Override - Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { - logger.info("Sending PNF model to AAI {}", consumerDmaapModel); - return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel) - .flatMap(response -> { - if (HttpUtils.isSuccessfulResponseCode(response)) { - return consumerDmaapModel; - } - return Mono - .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow")); - }); - } - - @Override - AaiProducerReactiveHttpClient resolveClient() { - return aaiProducerReactiveHttpClient == null ? new AaiProducerReactiveHttpClient(resolveConfiguration()) - .createAaiWebClient(buildWebClient()) : aaiProducerReactiveHttpClient; - } - - @Override - protected AaiClientConfiguration resolveConfiguration() { - return datafileAppConfig.getAaiClientConfiguration(); - } - - @Override - protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DatafileTaskException { - if (consumerDmaapModel == null) { - throw new DmaapNotFoundException("Invoked null object to DMaaP task"); - } - aaiProducerReactiveHttpClient = resolveClient(); - logger.trace("Method called with arg {}", consumerDmaapModel); - return publish(consumerDmaapModel); - - } -}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java index a5764704..30bf536e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,35 +13,41 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.tasks; +import java.util.List; + import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.DMaaPReactiveWebClient; -import org.onap.dcaegen2.collectors.datafile.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; +import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; + import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ abstract class DmaapConsumerTask { - abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws DatafileTaskException; + abstract Mono<List<FileData>> consume(Mono<String> message) throws DmaapNotFoundException; - abstract DMaaPConsumerReactiveHttpClient resolveClient(); + abstract DmaapConsumerReactiveHttpClient resolveClient(); abstract void initConfigs(); protected abstract DmaapConsumerConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(String object) throws DatafileTaskException; + protected abstract Mono<List<ConsumerDmaapModel>> execute(String object) throws DatafileTaskException; WebClient buildWebClient() { - return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); + return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java index 8d45a7fd..fdd1bb49 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java @@ -1,70 +1,86 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.tasks; +import java.util.List; + import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.collectors.datafile.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @Component public class DmaapConsumerTaskImpl extends DmaapConsumerTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final Config datafileAppConfig; + private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); + + private Config datafileAppConfig; private DmaapConsumerJsonParser dmaapConsumerJsonParser; - private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; + private DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; + FileCollector fileCollector; @Autowired - public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) { + public DmaapConsumerTaskImpl(AppConfig datafileAppConfig, FileCollector fileCollector) { this.datafileAppConfig = datafileAppConfig; this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + this.fileCollector = fileCollector; } - DmaapConsumerTaskImpl(AppConfig datafileAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) { + protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, + DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) { this.datafileAppConfig = datafileAppConfig; + this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; + this.fileCollector = fileCollector; } @Override - Mono<ConsumerDmaapModel> consume(Mono<String> message) { - logger.info("Consumed model from DMaaP: {}", message); + Mono<List<FileData>> consume(Mono<String> message) { + logger.trace("Method called with arg {}", message); return dmaapConsumerJsonParser.getJsonObject(message); } + private Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) { + Mono<List<ConsumerDmaapModel>> filesFromSender = fileCollector.getFilesFromSender(listOfFileData); + return filesFromSender; + } + @Override - public Mono<ConsumerDmaapModel> execute(String object) { + protected Mono<List<ConsumerDmaapModel>> execute(String object) { dmaaPConsumerReactiveHttpClient = resolveClient(); logger.trace("Method called with arg {}", object); - return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); + Mono<List<FileData>> consumerResult = + consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); + return consumerResult.flatMap(this::getFilesFromSender); } @Override @@ -78,9 +94,9 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { } @Override - DMaaPConsumerReactiveHttpClient resolveClient() { + protected DmaapConsumerReactiveHttpClient resolveClient() { return dmaaPConsumerReactiveHttpClient == null - ? new DMaaPConsumerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient()) - : dmaaPConsumerReactiveHttpClient; + ? new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()) + : dmaaPConsumerReactiveHttpClient; } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java index 467eee0b..716b52c1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java @@ -1,47 +1,48 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.tasks; +import java.util.List; + import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.DMaaPReactiveWebClient; -import org.onap.dcaegen2.collectors.datafile.service.producer.DMaaPProducerReactiveHttpClient; +import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; +import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; + import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ abstract class DmaapPublisherTask { - abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DatafileTaskException; + abstract Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModel) throws DatafileTaskException; - abstract DMaaPProducerReactiveHttpClient resolveClient(); + abstract DmaapProducerReactiveHttpClient resolveClient(); protected abstract DmaapPublisherConfiguration resolveConfiguration(); - protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DatafileTaskException; + protected abstract Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModel) + throws DatafileTaskException; WebClient buildWebClient() { - return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); + return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java index 4d435a4f..8c4d7072 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java @@ -1,46 +1,47 @@ /* - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.tasks; +import java.util.List; + import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.producer.DMaaPProducerReactiveHttpClient; +import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @Component public class DmaapPublisherTaskImpl extends DmaapPublisherTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); private final Config datafileAppConfig; - private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient; + private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient; @Autowired public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) { @@ -48,20 +49,21 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { - logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(), - consumerDmaapModel); - return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); + public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) + throws DatafileTaskException { + logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels); + return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels); } @Override - public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException { - if (consumerDmaapModel == null) { + public Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) + throws DatafileTaskException { + if (consumerDmaapModels == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } dmaapProducerReactiveHttpClient = resolveClient(); - logger.trace("Method called with arg {}", consumerDmaapModel); - return publish(consumerDmaapModel); + logger.trace("Method called with arg {}", consumerDmaapModels); + return publish(consumerDmaapModels); } @Override @@ -70,9 +72,9 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - DMaaPProducerReactiveHttpClient resolveClient() { + DmaapProducerReactiveHttpClient resolveClient() { return dmaapProducerReactiveHttpClient == null - ? new DMaaPProducerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient()) - : dmaapProducerReactiveHttpClient; + ? new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()) + : dmaapProducerReactiveHttpClient; } -}
\ No newline at end of file +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 2600b563..14085bb8 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -1,9 +1,7 @@ /* - * ============LICENSE_START======================================================= - * Datafile Collector Service - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,59 +13,57 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * ============LICENSE_END========================================================= + * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.tasks; +import java.util.List; import java.util.concurrent.Callable; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @Component public class ScheduledTasks { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapProducerTask; - private final AaiProducerTask aaiProducerTask; /** - * Constructor for tasks registration in DatafileWorkflow. + * Constructor for task registration in Datafile Workflow. * * @param dmaapConsumerTask - fist task - * @param dmaapPublisherTask - third task - * @param aaiPublisherTask - second task + * @param dmaapPublisherTask - second task */ @Autowired - public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask, - AaiProducerTask aaiPublisherTask) { + public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask) { this.dmaapConsumerTask = dmaapConsumerTask; this.dmaapProducerTask = dmaapPublisherTask; - this.aaiProducerTask = aaiPublisherTask; } /** - * Main function for scheduling datafileWorkflow. + * Main function for scheduling Datafile Workflow. */ public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); - Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) - .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP")) - .map(this::publishToAaiConfiguration) + Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDmaapMessage()) + .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP")) .flatMap(this::publishToDmaapConfiguration) .subscribeOn(Schedulers.elastic()); @@ -84,28 +80,20 @@ public class ScheduledTasks { private void onError(Throwable throwable) { if (!(throwable instanceof DmaapEmptyResponseException)) { - logger.warn("Chain of tasks have been aborted due to errors in Datafile workflow", throwable); + logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable); } } - private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() { + private Callable<Mono<List<ConsumerDmaapModel>>> consumeFromDmaapMessage() { return () -> { dmaapConsumerTask.initConfigs(); return dmaapConsumerTask.execute(""); }; } - private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) { - try { - return aaiProducerTask.execute(monoDMaaPModel); - } catch (DatafileTaskException e) { - return Mono.error(e); - } - } - - private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) { + private Mono<String> publishToDmaapConfiguration(Mono<List<ConsumerDmaapModel>> monoModel) { try { - return dmaapProducerTask.execute(monoAaiModel); + return dmaapProducerTask.execute(monoModel); } catch (DatafileTaskException e) { return Mono.error(e); } |