diff options
Diffstat (limited to 'datafile-app-server/src/main')
37 files changed, 1447 insertions, 185 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java index d0443ecf..cc5d12b9 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -27,10 +27,12 @@ import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; /** + * The main app of DFC. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -@SpringBootApplication(exclude = {JacksonAutoConfiguration.class}) +@SpringBootApplication(exclude = { JacksonAutoConfiguration.class }) @EnableScheduling public class MainApp { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index a30d2826..a38eab8f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -22,17 +22,14 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; - import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ServiceLoader; - import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; - import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index 6c0c156d..6b7860c4 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START======================================================= * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * ================================================================================ @@ -26,11 +26,19 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl /** + * Parses the cloud configuration. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public class CloudConfigParser { + private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath"; + private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath"; + private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath"; + private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath"; + private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth"; + private final JsonObject jsonObject; CloudConfigParser(JsonObject jsonObject) { @@ -46,11 +54,11 @@ public class CloudConfigParser { .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) - .trustStorePath(jsonObject.get("dmaap.security.trustStorePath").getAsString()) - .trustStorePasswordPath(jsonObject.get("dmaap.security.trustStorePasswordPath").getAsString()) - .keyStorePath(jsonObject.get("dmaap.security.keyStorePath").getAsString()) - .keyStorePasswordPath(jsonObject.get("dmaap.security.keyStorePasswordPath").getAsString()) - .enableDmaapCertAuth(jsonObject.get("dmaap.security.enableDmaapCertAuth").getAsBoolean()) + .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString()) + .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString()) + .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString()) + .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString()) + .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // .build(); } @@ -67,20 +75,20 @@ public class CloudConfigParser { .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) - .trustStorePath(jsonObject.get("dmaap.security.trustStorePath").getAsString()) - .trustStorePasswordPath(jsonObject.get("dmaap.security.trustStorePasswordPath").getAsString()) - .keyStorePath(jsonObject.get("dmaap.security.keyStorePath").getAsString()) - .keyStorePasswordPath(jsonObject.get("dmaap.security.keyStorePasswordPath").getAsString()) - .enableDmaapCertAuth(jsonObject.get("dmaap.security.enableDmaapCertAuth").getAsBoolean()) + .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString()) + .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString()) + .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString()) + .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString()) + .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // .build(); } - public FtpesConfig getFtpesConfig() { - return new ImmutableFtpesConfig.Builder() + FtpesConfig getFtpesConfig() { + return new ImmutableFtpesConfig.Builder() // .keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString()) .keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString()) - .trustedCA(jsonObject.get("dmaap.ftpesConfig.trustedCA").getAsString()) - .trustedCAPassword(jsonObject.get("dmaap.ftpesConfig.trustedCAPassword").getAsString()) + .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString()) + .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) // .build(); } -}
\ No newline at end of file +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java index 0150d86c..597f525f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -17,10 +17,8 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonObject; - import java.util.Map; import java.util.Properties; - import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; @@ -34,11 +32,12 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.scheduling.annotation.EnableScheduling; - import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** + * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @@ -63,6 +62,9 @@ public class CloudConfiguration extends AppConfig { this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider; } + /** + * Reads the cloud configuration. + */ public void runTask() { Map<String,String> context = MappedDiagnosticContext.initializeTraceContext(); EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java index 969a7e05..71003f80 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START======================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================================= @@ -19,17 +19,17 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.util.Map; import java.util.Optional; import java.util.Properties; - import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; - import reactor.core.publisher.Mono; /** + * Handling the Consul connection. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 */ class EnvironmentProcessor { @@ -63,7 +63,8 @@ class EnvironmentProcessor { } private static Integer getConsultPort(Properties systemEnvironments) { - return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf) + return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) // + .map(Integer::valueOf) // .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java index 5ca8ecd5..3f029359 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/FtpesConfig.java @@ -17,10 +17,10 @@ * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ + package org.onap.dcaegen2.collectors.datafile.configuration; import java.io.Serializable; - import org.immutables.gson.Gson; import org.immutables.value.Value; import org.springframework.stereotype.Component; @@ -41,8 +41,8 @@ public abstract class FtpesConfig implements Serializable { public abstract String keyPassword(); @Value.Parameter - public abstract String trustedCA(); + public abstract String trustedCa(); @Value.Parameter - public abstract String trustedCAPassword(); -}
\ No newline at end of file + public abstract String trustedCaPassword(); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 58c77a11..b78e4ae5 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -16,15 +16,15 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import io.swagger.annotations.ApiOperation; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; - import javax.annotation.PostConstruct; - import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.slf4j.Logger; @@ -36,8 +36,6 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; - -import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; /** @@ -55,7 +53,7 @@ public class SchedulerConfig { private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1); private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); - private Map<String, String> contextMap; + private Map<String, String> contextMap = new HashMap<>(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; @@ -110,11 +108,13 @@ public class SchedulerConfig { scheduledFutureList .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); - return true; } else { return false; } + } + static void setScheduledFutureList(List<ScheduledFuture<?>> scheduledFutureList) { + SchedulerConfig.scheduledFutureList = scheduledFutureList; } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java index 967be5f6..7fb1ba72 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -23,7 +23,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; - import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.PathSelectors; import springfox.documentation.builders.RequestHandlerSelectors; @@ -36,42 +35,45 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2; @EnableSwagger2 @Configuration @Profile("prod") -public class SwaggerConfig extends WebMvcConfigurationSupport{ - - public static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile"; - public static final String API_TITLE = "DATAFILE app server"; - public static final String DESCRIPTION = "This page lists all the rest apis for DATAFILE app server."; - public static final String VERSION = "1.0"; - public static final String RESOURCES_PATH = "classpath:/META-INF/resources/"; - public static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/"; - public static final String SWAGGER_UI = "swagger-ui.html"; - public static final String WEBJARS = "/webjars/**"; +public class SwaggerConfig extends WebMvcConfigurationSupport { - @Bean - public Docket api() { - return new Docket(DocumentationType.SWAGGER_2) - .apiInfo(apiInfo()) - .select() - .apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH)) - .paths(PathSelectors.any()) - .build(); - } + public static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile"; + public static final String API_TITLE = "DATAFILE app server"; + public static final String DESCRIPTION = "This page lists all the rest apis for DATAFILE app server."; + public static final String VERSION = "1.0"; + public static final String RESOURCES_PATH = "classpath:/META-INF/resources/"; + public static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/"; + public static final String SWAGGER_UI = "swagger-ui.html"; + public static final String WEBJARS = "/webjars/**"; - private ApiInfo apiInfo() { - return new ApiInfoBuilder() - .title(API_TITLE) - .description(DESCRIPTION) - .version(VERSION) - .build(); - } + /** + * Gets the API info. + * + * @return the API info. + */ + @Bean + public Docket api() { + return new Docket(DocumentationType.SWAGGER_2) // + .apiInfo(apiInfo()) // + .select().apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH)) // + .paths(PathSelectors.any()) // + .build(); + } + private ApiInfo apiInfo() { + return new ApiInfoBuilder() // + .title(API_TITLE) // + .description(DESCRIPTION) // + .version(VERSION) // + .build(); + } - @Override - protected void addResourceHandlers(ResourceHandlerRegistry registry) { - registry.addResourceHandler(SWAGGER_UI) - .addResourceLocations(RESOURCES_PATH); + @Override + protected void addResourceHandlers(ResourceHandlerRegistry registry) { + registry.addResourceHandler(SWAGGER_UI) // + .addResourceLocations(RESOURCES_PATH); - registry.addResourceHandler(WEBJARS) - .addResourceLocations(WEBJARS_PATH); - } + registry.addResourceHandler(WEBJARS) // + .addResourceLocations(WEBJARS_PATH); + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java index 84f8c0f0..cbd67297 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. * =============================================================================================== @@ -23,11 +23,17 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** + * Configuration of Tomcat. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/18/18 */ @Configuration public class TomcatHttpConfig { - + /** + * Creates a Tomcat server factory. + * + * @return a Tomcat server factory. + */ @Bean public ServletWebServerFactory servletContainer() { TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java index 073c8462..b0e339ef 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -16,6 +16,10 @@ package org.onap.dcaegen2.collectors.datafile.controllers; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,14 +29,11 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RestController; - -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; import reactor.core.publisher.Mono; /** + * Controller to check the heartbeat of DFC. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @@ -42,6 +43,11 @@ public class HeartbeatController { private static final Logger logger = LoggerFactory.getLogger(HeartbeatController.class); + /** + * Checks the heartbeat of DFC. + * + * @return the heartbeat status of DFC. + */ @GetMapping("/heartbeat") @ApiOperation(value = "Returns liveness of DATAFILE service") @ApiResponses(value = { // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java index 42949f95..4716fa87 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -18,6 +18,8 @@ package org.onap.dcaegen2.collectors.datafile.controllers; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.slf4j.Logger; @@ -29,12 +31,11 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RestController; - -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; /** + * The HTTP api to start and stop DFC. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/5/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @@ -52,6 +53,12 @@ public class ScheduleController { this.schedulerConfig = schedulerConfig; } + /** + * Start the DFC. + * + * @param headers the request headers. + * @return the response. + */ @GetMapping("/start") @ApiOperation(value = "Start scheduling worker request") public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) { @@ -67,6 +74,11 @@ public class ScheduleController { .map(this::createStartTaskResponse); } + /** + * Stop the DFC. + * + * @return the response. + */ @GetMapping("/stopDatafile") @ApiOperation(value = "Receiving stop scheduling worker request") public Mono<ResponseEntity<String>> stopTask(@RequestHeader HttpHeaders headers) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java new file mode 100644 index 00000000..42308000 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -0,0 +1,32 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + +public class DatafileTaskException extends Exception { + + private static final long serialVersionUID = 1L; + + public DatafileTaskException(String message) { + super(message); + } + + public DatafileTaskException(String message, Exception originalException) { + super(message, originalException); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java index ebfe1902..d49a051f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START======================================================= * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. * ================================================================================ @@ -19,6 +19,8 @@ package org.onap.dcaegen2.collectors.datafile.exceptions; /** + * Exception thrown when there is a problem with the Consul environment. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 */ public class EnvironmentLoaderException extends Exception { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java new file mode 100644 index 00000000..c35a5a1d --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java @@ -0,0 +1,31 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import java.nio.file.Path; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; + +/** + * A closeable file client. + * + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public interface FileCollectClient extends AutoCloseable { + public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException; + + public void open() throws DatafileTaskException; +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java new file mode 100644 index 00000000..4c49dd8a --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java @@ -0,0 +1,37 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import java.util.Optional; +import org.immutables.value.Value; + +/** + * Data about the file server to collect a file from. + * + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * + */ +@Value.Immutable +public interface FileServerData { + public String serverAddress(); + + public String userId(); + + public String password(); + + public Optional<Integer> port(); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java new file mode 100644 index 00000000..b8488f34 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -0,0 +1,198 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Path; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.util.Optional; +import javax.net.ssl.KeyManager; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPReply; +import org.apache.commons.net.ftp.FTPSClient; +import org.apache.commons.net.util.KeyManagerUtils; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.FileSystemResource; + +/** + * Gets file from PNF with FTPS protocol. + * + * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a> + */ +public class FtpsClient implements FileCollectClient { + private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class); + + private static final int FTPS_DEFAULT_PORT = 21; + + FTPSClient realFtpsClient = new FTPSClient(); + private final FileServerData fileServerData; + private static TrustManager theTrustManager = null; + + private final String keyCertPath; + private final String keyCertPassword; + private final Path trustedCaPath; + private final String trustedCaPassword; + + /** + * Constructor. + * + * @param fileServerData info needed to connect to the PNF. + * @param keyCertPath path to DFC's key cert. + * @param keyCertPassword password for DFC's key cert. + * @param trustedCaPath path to the PNF's trusted keystore. + * @param trustedCaPassword password for the PNF's trusted keystore. + */ + public FtpsClient(FileServerData fileServerData, String keyCertPath, String keyCertPassword, Path trustedCaPath, + String trustedCaPassword) { + this.fileServerData = fileServerData; + this.keyCertPath = keyCertPath; + this.keyCertPassword = keyCertPassword; + this.trustedCaPath = trustedCaPath; + this.trustedCaPassword = trustedCaPassword; + } + + @Override + public void open() throws DatafileTaskException { + try { + realFtpsClient.setNeedClientAuth(true); + realFtpsClient.setKeyManager(createKeyManager(keyCertPath, keyCertPassword)); + realFtpsClient.setTrustManager(getTrustManager(trustedCaPath, trustedCaPassword)); + setUpConnection(); + } catch (DatafileTaskException e) { + throw e; + } catch (Exception e) { + throw new DatafileTaskException("Could not open connection: " + e, e); + } + } + + @Override + public void close() { + logger.trace("starting to closeDownConnection"); + if (realFtpsClient.isConnected()) { + try { + boolean logOut = realFtpsClient.logout(); + logger.trace("logOut: {}", logOut); + } catch (Exception e) { + logger.trace("Unable to logout connection.", e); + } + try { + realFtpsClient.disconnect(); + logger.trace("disconnected!"); + } catch (Exception e) { + logger.trace("Unable to disconnect connection.", e); + } + } + } + + @Override + public void collectFile(String remoteFileName, Path localFileName) throws DatafileTaskException { + logger.trace("collectFile called"); + + try (OutputStream output = createOutputStream(localFileName)) { + logger.trace("begin to retrieve from xNF."); + if (!realFtpsClient.retrieveFile(remoteFileName, output)) { + throw new DatafileTaskException("Could not retrieve file " + remoteFileName); + } + } catch (IOException e) { + throw new DatafileTaskException("Could not fetch file: " + e, e); + } + logger.trace("collectFile fetched: {}", localFileName); + } + + private int getPort(Optional<Integer> port) { + return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; + } + + private void setUpConnection() throws DatafileTaskException, IOException { + + realFtpsClient.connect(fileServerData.serverAddress(), getPort(fileServerData.port())); + logger.trace("after ftp connect"); + + if (!realFtpsClient.login(fileServerData.userId(), fileServerData.password())) { + throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress()); + } + + if (FTPReply.isPositiveCompletion(realFtpsClient.getReplyCode())) { + realFtpsClient.enterLocalPassiveMode(); + realFtpsClient.setFileType(FTP.BINARY_FILE_TYPE); + // Set protection buffer size + realFtpsClient.execPBSZ(0); + // Set data channel protection to private + realFtpsClient.execPROT("P"); + realFtpsClient.setBufferSize(1024 * 1024); + } else { + throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress() + + " xNF reply code: " + realFtpsClient.getReplyCode()); + } + + logger.trace("setUpConnection successfully!"); + } + + private TrustManager createTrustManager(Path trustedCaPath, String trustedCaPassword) + throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException { + logger.trace("Creating trust manager from file: {}", trustedCaPath); + try (InputStream fis = createInputStream(trustedCaPath)) { + KeyStore keyStore = KeyStore.getInstance("JKS"); + keyStore.load(fis, trustedCaPassword.toCharArray()); + TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509"); + factory.init(keyStore); + return factory.getTrustManagers()[0]; + } + } + + protected InputStream createInputStream(Path localFileName) throws IOException { + FileSystemResource realResource = new FileSystemResource(localFileName); + return realResource.getInputStream(); + } + + protected OutputStream createOutputStream(Path localFileName) throws IOException { + File localFile = localFileName.toFile(); + if (localFile.createNewFile()) { + logger.warn("Local file {} already created", localFileName); + } + OutputStream output = new FileOutputStream(localFile); + logger.debug("File {} opened xNF", localFileName); + return output; + } + + protected TrustManager getTrustManager(Path trustedCaPath, String trustedCaPassword) + throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException { + synchronized (FtpsClient.class) { + if (theTrustManager == null) { + theTrustManager = createTrustManager(trustedCaPath, trustedCaPassword); + } + return theTrustManager; + } + } + + protected KeyManager createKeyManager(String keyCertPath, String keyCertPassword) + throws IOException, GeneralSecurityException { + return KeyManagerUtils.createClientKeyManager(new File(keyCertPath), keyCertPassword); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java new file mode 100644 index 00000000..b98885b3 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; + +/** + * Enum specifying the schemes that DFC support for downloading files. + * + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * + */ +public enum Scheme { + FTPS, SFTP; + + /** + * Get a <code>Scheme</code> from a string. + * + * @param schemeString the string to convert to <code>Scheme</code>. + * @return The corresponding <code>Scheme</code> + * @throws Exception if the value of the string doesn't match any defined scheme. + */ + public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException { + Scheme result; + if ("FTPS".equalsIgnoreCase(schemeString) || "FTPES".equalsIgnoreCase(schemeString)) { + result = Scheme.FTPS; + } else if ("SFTP".equalsIgnoreCase(schemeString)) { + result = Scheme.SFTP; + } else { + throw new DatafileTaskException( + "DFC does not support protocol " + schemeString + ". Supported protocols are FTPES , FTPS, and SFTP"); + } + return result; + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java new file mode 100644 index 00000000..40068598 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.ftp; + +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import java.nio.file.Path; +import java.util.Optional; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Gets file from xNF with SFTP protocol. + * + * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a> + * + */ +public class SftpClient implements FileCollectClient { + private static final Logger logger = LoggerFactory.getLogger(SftpClient.class); + + private static final int FTPS_DEFAULT_PORT = 22; + + private final FileServerData fileServerData; + private Session session = null; + private ChannelSftp sftpChannel = null; + + public SftpClient(FileServerData fileServerData) { + this.fileServerData = fileServerData; + } + + @Override + public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException { + logger.trace("collectFile {}", localFile); + + try { + sftpChannel.get(remoteFile, localFile.toString()); + logger.debug("File {} Download Successfull from xNF", localFile.getFileName()); + } catch (Exception e) { + throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e); + } + + logger.trace("collectFile OK"); + } + + @Override + public void close() { + logger.trace("closing sftp session"); + if (sftpChannel != null) { + sftpChannel.exit(); + sftpChannel = null; + } + if (session != null) { + session.disconnect(); + session = null; + } + } + + @Override + public void open() throws DatafileTaskException { + try { + if (session == null) { + session = setUpSession(fileServerData); + sftpChannel = getChannel(session); + } + } catch (JSchException e) { + throw new DatafileTaskException("Could not open Sftp client" + e, e); + } + } + + private int getPort(Optional<Integer> port) { + return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT; + } + + private Session setUpSession(FileServerData fileServerData) throws JSchException { + JSch jsch = new JSch(); + + Session newSession = + jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port())); + newSession.setConfig("StrictHostKeyChecking", "no"); + newSession.setPassword(fileServerData.password()); + newSession.connect(); + return newSession; + } + + private ChannelSftp getChannel(Session session) throws JSchException { + Channel channel = session.openChannel("sftp"); + channel.connect(); + return (ChannelSftp) channel; + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java new file mode 100644 index 00000000..f6f93d49 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.http; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import org.apache.http.client.RedirectStrategy; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.http.impl.nio.client.HttpAsyncClients; + +public class HttpAsyncClientBuilderWrapper { + HttpAsyncClientBuilder builder = HttpAsyncClients.custom(); + + public HttpAsyncClientBuilderWrapper setRedirectStrategy(RedirectStrategy redirectStrategy) { + builder.setRedirectStrategy(redirectStrategy); + return this; + } + + public HttpAsyncClientBuilderWrapper setSslContext(SSLContext sslcontext) { + builder.setSSLContext(sslcontext); + return this; + } + + public HttpAsyncClientBuilderWrapper setSslHostnameVerifier(HostnameVerifier hostnameVerifier) { + builder.setSSLHostnameVerifier(hostnameVerifier); + return this; + } + + public HttpAsyncClientBuilderWrapper setDefaultRequestConfig(RequestConfig config) { + builder.setDefaultRequestConfig(config); + return this; + } + + public CloseableHttpAsyncClient build() { + return builder.build(); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java new file mode 100644 index 00000000..27df49f2 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import java.lang.reflect.Type; +import java.nio.file.Path; + +/** + * Helper class to serialize object. + */ +public class CommonFunctions { + + private static Gson gson = + new GsonBuilder().registerTypeHierarchyAdapter(Path.class, new PathConverter()).serializeNulls().create(); + + private CommonFunctions() { + } + + /** + * Serializes a <code>filePublishInformation</code>. + * + * @param filePublishInformation info to serialize. + * + * @return a string with the serialized model. + */ + public static String createJsonBody(FilePublishInformation filePublishInformation) { + return gson.toJson(filePublishInformation); + } + + /** + * Json serializer that handles Path serializations, since <code>Path</code> does not implement the + * <code>Serializable</code> interface. + */ + public static class PathConverter implements JsonSerializer<Path> { + @Override + public JsonElement serialize(Path path, Type type, JsonSerializationContext jsonSerializationContext) { + return new JsonPrimitive(path.toString()); + } + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 037bd0d3..96237e41 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -1,17 +1,21 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.model; @@ -37,16 +41,22 @@ public abstract class FileData { public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; /** + * Get the file name with no path. + * * @return the file name with no path */ public abstract String name(); /** + * Get the URL to use to fetch the file from the PNF. + * * @return the URL to use to fetch the file from the PNF */ public abstract String location(); /** + * Get the file transfer protocol to use for fetching the file. + * * @return the file transfer protocol to use for fetching the file */ public abstract Scheme scheme(); @@ -60,35 +70,58 @@ public abstract class FileData { public abstract MessageMetaData messageMetaData(); /** + * Get the name of the PNF, must be unique in the network. + * * @return the name of the PNF, must be unique in the network */ public String sourceName() { return messageMetaData().sourceName(); } + /** + * Get the path to file to get from the PNF. + * + * @return the path to the file on the PNF. + */ public String remoteFilePath() { return URI.create(location()).getPath(); } + /** + * Get the path to the locally stored file. + * + * @return the path to the locally stored file. + */ public Path getLocalFilePath() { - return Paths.get(DATAFILE_TMPDIR, name()); + return Paths.get(DATAFILE_TMPDIR, name()); } + /** + * Get the data about the file server where the file should be collected from. + * + * @return the data about the file server where the file should be collected from. + */ public FileServerData fileServerData() { URI uri = URI.create(location()); Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); - // @formatter:off - ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder() - .serverAddress(uri.getHost()) - .userId(userInfo.isPresent() ? userInfo.get()[0] : "") + ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder() // + .serverAddress(uri.getHost()) // + .userId(userInfo.isPresent() ? userInfo.get()[0] : "") // .password(userInfo.isPresent() ? userInfo.get()[1] : ""); if (uri.getPort() > 0) { builder.port(uri.getPort()); } return builder.build(); - // @formatter:on } + /** + * Extracts user name and password from the user info, if it they are given in the URI. + * + * @param userInfoString the user info string from the URI. + * + * @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty + * <code>Optional</code> if not given. + */ private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) { if (userInfoString != null) { String[] userAndPassword = userInfoString.split(":"); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java new file mode 100644 index 00000000..45302423 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java @@ -0,0 +1,71 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import com.google.gson.annotations.SerializedName; +import java.nio.file.Path; +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel; + +/** + * Information needed to publish a file to DataRouter. + * + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ + +@Value.Immutable +@Gson.TypeAdapters +public interface FilePublishInformation extends DmaapModel { + + @SerializedName("productName") + String getProductName(); + + @SerializedName("vendorName") + String getVendorName(); + + @SerializedName("lastEpochMicrosec") + String getLastEpochMicrosec(); + + @SerializedName("sourceName") + String getSourceName(); + + @SerializedName("startEpochMicrosec") + String getStartEpochMicrosec(); + + @SerializedName("timeZoneOffset") + String getTimeZoneOffset(); + + @SerializedName("name") + String getName(); + + @SerializedName("location") + String getLocation(); + + @SerializedName("internalLocation") + Path getInternalLocation(); + + @SerializedName("compression") + String getCompression(); + + @SerializedName("fileFormatType") + String getFileFormatType(); + + @SerializedName("fileFormatVersion") + String getFileFormatVersion(); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java index 9373a4f2..6cc6da6e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java @@ -21,15 +21,17 @@ package org.onap.dcaegen2.collectors.datafile.model; import java.util.List; - import org.immutables.gson.Gson; import org.immutables.value.Value; /** + * Contains all the info about a fileReady message. + * * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @Value.Immutable @Gson.TypeAdapters +@FunctionalInterface public interface FileReadyMessage { public List<FileData> files(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java new file mode 100644 index 00000000..92d67383 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; + +/** + * Meta data about a fileReady message. + * + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +@Value.Immutable +@Gson.TypeAdapters +public interface MessageMetaData { + public String productName(); + + public String vendorName(); + + public String lastEpochMicrosec(); + + public String sourceName(); + + public String startEpochMicrosec(); + + public String timeZoneOffset(); + + public String changeIdentifier(); + + public String changeType(); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java new file mode 100644 index 00000000..2643eea5 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.model.logging; + +import java.util.Map; +import java.util.UUID; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.methods.HttpRequestBase; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; +import org.springframework.http.HttpHeaders; + +/** + * Support functions for MDC. + */ +public final class MappedDiagnosticContext { + + public static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); + public static final Marker EXIT = MarkerFactory.getMarker("EXIT"); + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + + private static final Logger logger = LoggerFactory.getLogger(MappedDiagnosticContext.class); + + private MappedDiagnosticContext() {} + + /** + * Inserts the relevant trace information in the HTTP header. + * + * @param httpRequest a request + */ + public static void appendTraceInfo(HttpRequestBase httpRequest) { + String requestId = MDC.get(MdcVariables.REQUEST_ID); + httpRequest.addHeader(MdcVariables.X_ONAP_REQUEST_ID, requestId); + httpRequest.addHeader("X-RequestID", requestId); // deprecated + httpRequest.addHeader("X-TransactionID", requestId); // deprecated + + String invocationId = UUID.randomUUID().toString(); + httpRequest.addHeader(MdcVariables.X_INVOCATION_ID, invocationId); + logger.info(INVOKE, "Invoking request with invocation ID {}", invocationId); + } + + /** + * Initialize MDC from relevant information in a received HTTP header. + * + * @param headers a received HTPP header + */ + public static void initializeTraceContext(HttpHeaders headers) { + String requestId = headers.getFirst(MdcVariables.X_ONAP_REQUEST_ID); + if (StringUtils.isBlank(requestId)) { + requestId = UUID.randomUUID().toString(); + } + String invocationId = headers.getFirst(MdcVariables.X_INVOCATION_ID); + if (StringUtils.isBlank(invocationId)) { + invocationId = UUID.randomUUID().toString(); + } + MDC.put(MdcVariables.REQUEST_ID, requestId); + MDC.put(MdcVariables.INVOCATION_ID, invocationId); + } + + /** + * Initialize the MDC when a new context is started. + * @return a copy of the new trace context + */ + public static Map<String, String> initializeTraceContext() { + MDC.clear(); + MDC.put(MdcVariables.REQUEST_ID, UUID.randomUUID().toString()); + return MDC.getCopyOfContextMap(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java new file mode 100644 index 00000000..5f5ccddf --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java @@ -0,0 +1,95 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME; +import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; + +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClient.Builder; + +import reactor.core.publisher.Mono; + +/** + * Web client for the DMaaP MessageRouter. + * + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 + */ +public class DmaapWebClient { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private String contentType; + private String dmaapUserName; + private String dmaapUserPassword; + + /** + * Creating DmaapReactiveWebClient passing to them basic DmaapConfig. + * + * @param dmaapCustomConfig - configuration object + * @return DmaapReactiveWebClient + */ + public DmaapWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { + this.contentType = dmaapCustomConfig.dmaapContentType(); + return this; + } + + /** + * Construct Reactive WebClient with appropriate settings. + * + * @return WebClient + */ + public WebClient build() { + Builder webClientBuilder = WebClient.builder() // + .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) // + .filter(logRequest()) // + .filter(logResponse()); + if (dmaapUserName != null && !dmaapUserName.isEmpty() && dmaapUserPassword != null + && !dmaapUserPassword.isEmpty()) { + webClientBuilder.filter(basicAuthentication(dmaapUserName, dmaapUserPassword)); + } + return webClientBuilder.build(); + } + + private ExchangeFilterFunction logResponse() { + return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { + MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode())); + logger.trace("Response Status {}", clientResponse.statusCode()); + MDC.remove(RESPONSE_CODE); + return Mono.just(clientResponse); + }); + } + + private ExchangeFilterFunction logRequest() { + return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { + MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url())); + logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url()); + clientRequest.headers() + .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value))); + logger.trace("HTTP request headers: {}", clientRequest.headers()); + MDC.remove(SERVICE_NAME); + return Mono.just(clientRequest); + }); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java new file mode 100644 index 00000000..5371d485 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java @@ -0,0 +1,31 @@ +/*- + * ============LICENSE_START====================================================================== + * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import org.apache.http.HttpStatus; + +public final class HttpUtils implements HttpStatus { + + private HttpUtils() { + } + + public static boolean isSuccessfulResponseCode(Integer statusCode) { + return statusCode >= 200 && statusCode < 300; + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index a3595ecf..5bcf18c6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -22,13 +22,11 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; - import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.StreamSupport; - import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -39,7 +37,6 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -244,7 +241,7 @@ public class JsonMessageParser { .location(location) // .scheme(scheme) // .compression(getValueFromJson(data, COMPRESSION, missingValues)) // - .messageMetaData(messageMetaData) + .messageMetaData(messageMetaData) // .build(); if (missingValues.isEmpty()) { return Optional.of(fileData); @@ -254,9 +251,8 @@ public class JsonMessageParser { } /** - * Gets data from the event name. - * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example: - * Noti_RnNode-Ericsson_FileReady + * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, + * example: Noti_RnNode-Ericsson_FileReady * * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}. * @param eventName The event name to get the data from. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java index e2dca182..257c356c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2019 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -24,8 +24,8 @@ import java.util.Iterator; import java.util.Map; /** - * A cache of all files that already has been published. Key is the local file path and the value is - * a time stamp, when the key was last used. + * A cache of all files that already has been published. Key is the local file path and the value is a time stamp, when + * the key was last used. */ public class PublishedFileCache { private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>()); @@ -71,6 +71,4 @@ public class PublishedFileCache { final int timeToKeepInfoInSeconds = 60 * 60 * 24; return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds; } - - } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java new file mode 100644 index 00000000..c61b7a4d --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java @@ -0,0 +1,187 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service.producer; + +import java.nio.charset.StandardCharsets; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.Future; +import javax.net.ssl.SSLContext; +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpResponse; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.ssl.SSLContextBuilder; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper; +import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; +import org.springframework.web.util.DefaultUriBuilderFactory; +import org.springframework.web.util.UriBuilder; + +/** + * Client used to send requests to DataRouter. + * + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public class DmaapProducerHttpClient { + + private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(2); + private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN"); + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final DmaapPublisherConfiguration configuration; + + /** + * Constructor DmaapProducerReactiveHttpClient. + * + * @param dmaapPublisherConfiguration - DMaaP producer configuration object + */ + public DmaapProducerHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { + this.configuration = dmaapPublisherConfiguration; + } + + /** + * Executes the given request and handles redirects. + * + * @param request the request to execute. + * @param contextMap context for logging. + * + * @return the response from the request. + * + * @throws DatafileTaskException if anything goes wrong. + */ + public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap) + throws DatafileTaskException { + MDC.setContextMap(contextMap); + try (CloseableHttpAsyncClient webClient = createWebClient(true, DEFAULT_REQUEST_TIMEOUT)) { + webClient.start(); + + logger.trace(INVOKE, "Starting to produce to DR {}", request); + Future<HttpResponse> future = webClient.execute(request, null); + HttpResponse response = future.get(); + logger.trace(INVOKE_RETURN, "Response from DR {}", response); + return response; + } catch (Exception e) { + throw new DatafileTaskException("Unable to create web client.", e); + } + } + + /** + * Executes the given request using the given timeout time. + * + * @param request the request to execute. + * @param requestTimeout the timeout time for the request. + * @param contextMap context for logging. + * + * @return the response from the request. + * + * @throws DatafileTaskException if anything goes wrong. + */ + public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, Duration requestTimeout, + Map<String, String> contextMap) throws DatafileTaskException { + MDC.setContextMap(contextMap); + try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) { + webClient.start(); + + logger.trace(INVOKE, "Starting to produce to DR {}", request); + Future<HttpResponse> future = webClient.execute(request, null); + HttpResponse response = future.get(); + logger.trace(INVOKE_RETURN, "Response from DR {}", response); + return response; + } catch (Exception e) { + throw new DatafileTaskException("Unable to create web client.", e); + } + } + + /** + * Adds the user credentials needed to talk to DataRouter to the provided request. + * + * @param request the request to add credentials to. + */ + public void addUserCredentialsToHead(HttpUriRequest request) { + String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword(); + byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); + byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes); + String base64Creds = new String(base64CredsBytes); + logger.trace("base64Creds...: {}", base64Creds); + request.addHeader("Authorization", "Basic " + base64Creds); + } + + /** + * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be + * added to the URI by the user. + * + * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. + */ + public UriBuilder getBaseUri() { + return new DefaultUriBuilderFactory().builder() // + .scheme(configuration.dmaapProtocol()) // + .host(configuration.dmaapHostName()) // + .port(configuration.dmaapPortNumber()); + } + + private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout) + throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { + SSLContext sslContext = + new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); + + HttpAsyncClientBuilderWrapper clientBuilder = getHttpClientBuilder(); + clientBuilder.setSslContext(sslContext) // + .setSslHostnameVerifier(new NoopHostnameVerifier()); + + if (expectRedirect) { + clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE); + } + + if (requestTimeout.toMillis() > 0) { + int millis = (int) requestTimeout.toMillis(); + RequestConfig requestConfig = RequestConfig.custom() // + .setSocketTimeout(millis) // + .setConnectTimeout(millis) // + .setConnectionRequestTimeout(millis) // + .build(); + + clientBuilder.setDefaultRequestConfig(requestConfig); + } else { + logger.error("WEB client without timeout created {}", requestTimeout); + } + + return clientBuilder.build(); + } + + HttpAsyncClientBuilderWrapper getHttpClientBuilder() { + return new HttpAsyncClientBuilderWrapper(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index 49e2f01e..e50ef580 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -20,42 +20,47 @@ package org.onap.dcaegen2.collectors.datafile.tasks; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; -import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; +import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.reactive.function.client.WebClient; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** + * Component used to get messages from the MessageRouter. + * * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -public class DMaaPMessageConsumerTask { - private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class); +public class DMaaPMessageConsumer { + private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class); private final JsonMessageParser jsonMessageParser; private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) { + public DMaaPMessageConsumer(AppConfig datafileAppConfig) { this.jsonMessageParser = new JsonMessageParser(); this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig); } - protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, JsonMessageParser messageParser) { this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.jsonMessageParser = messageParser; } - public Flux<FileReadyMessage> execute() { - logger.trace("execute called"); + /** + * Gets the response from the MessageRouter and turns it into a stream of fileReady messages. + * + * @return a stream of fileReady messages. + */ + public Flux<FileReadyMessage> getMessageRouterResponse() { + logger.trace("getMessageRouterResponse called"); return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); } @@ -66,7 +71,7 @@ public class DMaaPMessageConsumerTask { private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) { DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration(); - WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build(); + WebClient client = new DmaapWebClient().fromConfiguration(config).build(); return new DMaaPConsumerReactiveHttpClient(config, client); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 3546a08f..ad03170d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -34,7 +34,7 @@ import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; @@ -69,7 +69,6 @@ public class DataRouterPublisher { this.datafileAppConfig = datafileAppConfig; } - /** * Publish one file. * @@ -77,27 +76,28 @@ public class DataRouterPublisher { * @param numRetries the maximal number of retries if the publishing fails * @param firstBackoff the time to delay the first retry * @param contextMap tracing context variables - * @return the (same) ConsumerDmaapModel + * @return the (same) filePublishInformation */ - public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff, - Map<String, String> contextMap) { + public Mono<FilePublishInformation> publishFile(FilePublishInformation model, long numRetries, + Duration firstBackoff, Map<String, String> contextMap) { MDC.setContextMap(contextMap); - logger.trace("Publish called with arg {}", model); + logger.trace("publishFile called with arg {}", model); dmaapProducerReactiveHttpClient = resolveClient(); - return Mono.just(model) - .cache() + return Mono.just(model) // + .cache() // .flatMap(m -> publishFile(m, contextMap)) // .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) // .retryBackoff(numRetries, firstBackoff); } - private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) { - logger.trace("Entering publishFile with {}", consumerDmaapModel); + private Mono<HttpStatus> publishFile(FilePublishInformation filePublishInformation, + Map<String, String> contextMap) { + logger.trace("Entering publishFile with {}", filePublishInformation); try { HttpPut put = new HttpPut(); - prepareHead(consumerDmaapModel, put); - prepareBody(consumerDmaapModel, put); + prepareHead(filePublishInformation, put); + prepareBody(filePublishInformation, put); dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put); HttpResponse response = @@ -105,12 +105,12 @@ public class DataRouterPublisher { logger.trace("{}", response); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { - logger.warn("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); + logger.warn("Unable to send file to DataRouter. Data: {}", filePublishInformation.getInternalLocation(), e); return Mono.error(e); } } - private void prepareHead(ConsumerDmaapModel model, HttpPut put) { + private void prepareHead(FilePublishInformation model, HttpPut put) { put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); @@ -120,7 +120,7 @@ public class DataRouterPublisher { MappedDiagnosticContext.appendTraceInfo(put); } - private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException { + private void prepareBody(FilePublishInformation model, HttpPut put) throws IOException { Path fileLocation = model.getInternalLocation(); try (InputStream fileInputStream = createInputStream(fileLocation)) { put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); @@ -134,7 +134,7 @@ public class DataRouterPublisher { .pathSegment(fileName).build(); } - private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model, + private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation model, Map<String, String> contextMap) { MDC.setContextMap(contextMap); if (HttpUtils.isSuccessfulResponseCode(response.value())) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 3e444af0..cb93df1e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -20,24 +20,24 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Map; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; - import reactor.core.publisher.Mono; /** + * Collects a file from a PNF. + * * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public class FileCollector { @@ -45,22 +45,37 @@ public class FileCollector { private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); private final AppConfig datafileAppConfig; + /** + * Constructor. + * + * @param datafileAppConfig application configuration + */ public FileCollector(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; } - public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout, + /** + * Collects a file from the PNF and stores it in the local file system. + * + * @param fileData data about the file to collect. + * @param numRetries the number of retries if the publishing fails + * @param firstBackoff the time to delay the first retry + * @param contextMap context for logging. + * + * @return the data needed to publish the file. + */ + public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff, Map<String, String> contextMap) { MDC.setContextMap(contextMap); - logger.trace("Entering execute with {}", fileData); + logger.trace("Entering collectFile with {}", fileData); return Mono.just(fileData) // .cache() // .flatMap(fd -> collectFile(fileData, contextMap)) // - .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); + .retryBackoff(numRetries, firstBackoff); } - private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) { + private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) { MDC.setContextMap(contextMap); logger.trace("starting to collectFile {}", fileData.name()); @@ -71,9 +86,10 @@ public class FileCollector { currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getConsumerDmaapModel(fileData, localFile)); + return Mono.just(getFilePublishInformation(fileData, localFile)); } catch (Exception throwable) { - logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); + logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), + throwable.toString()); return Mono.error(throwable); } } @@ -89,10 +105,10 @@ public class FileCollector { } } - private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) { + private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) { String location = fileData.location(); MessageMetaData metaData = fileData.messageMetaData(); - return ImmutableConsumerDmaapModel.builder() // + return ImmutableFilePublishInformation.builder() // .productName(metaData.productName()) // .vendorName(metaData.vendorName()) // .lastEpochMicrosec(metaData.lastEpochMicrosec()) // @@ -109,12 +125,12 @@ public class FileCollector { } protected SftpClient createSftpClient(FileData fileData) { - return new SftpClient(fileData.fileServerData()); + return new SftpClient(fileData.fileServerData()); } protected FtpsClient createFtpsClient(FileData fileData) { FtpesConfig config = datafileAppConfig.getFtpesConfiguration(); return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(), - Paths.get(config.trustedCA()), config.trustedCAPassword()); + Paths.get(config.trustedCa()), config.trustedCaPassword()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index 89fa259c..e18da248 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -24,7 +24,6 @@ import java.io.InputStream; import java.net.URI; import java.time.Duration; import java.util.Map; - import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; @@ -69,7 +68,7 @@ public class PublishedChecker { * * @return <code>true</code> if the file has been published before, <code>false</code> otherwise. */ - public boolean execute(String fileName, Map<String, String> contextMap) { + public boolean isFilePublished(String fileName, Map<String, String> contextMap) { MDC.setContextMap(contextMap); DmaapProducerHttpClient producerClient = resolveClient(); @@ -80,8 +79,8 @@ public class PublishedChecker { producerClient.addUserCredentialsToHead(getRequest); try { - HttpResponse response = - producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap); + HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, + WEB_CLIENT_TIMEOUT, contextMap); logger.trace("{}", response); int status = response.getStatusLine().getStatusCode(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 8b496ba2..037f495f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -23,8 +23,8 @@ import java.time.Instant; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; @@ -39,8 +39,8 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the - * message router, fetch new files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new + * files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { @@ -79,14 +79,14 @@ public class ScheduledTasks { applicationConfiguration.loadConfigurationFromFile(); createMainTask(context) // .subscribe(model -> onSuccess(model, context), // - thr -> onError(thr, context), // - () -> onComplete(context)); + thr -> onError(thr, context), // + () -> onComplete(context)); } catch (Exception e) { logger.error("Unexpected exception: ", e); } } - Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) { + Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) { return fetchMoreFileReadyMessages() // .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // @@ -115,8 +115,8 @@ public class ScheduledTasks { return currentNumberOfTasks.get(); } - protected DMaaPMessageConsumerTask createConsumerTask() { - return new DMaaPMessageConsumerTask(this.applicationConfiguration); + protected DMaaPMessageConsumer createConsumerTask() { + return new DMaaPMessageConsumer(this.applicationConfiguration); } protected FileCollector createFileCollector() { @@ -132,7 +132,7 @@ public class ScheduledTasks { logger.trace("Datafile tasks have been completed"); } - private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) { + private synchronized void onSuccess(FilePublishInformation model, Map<String, String> contextMap) { MDC.setContextMap(contextMap); logger.info("Datafile file published {}", model.getInternalLocation()); } @@ -146,19 +146,19 @@ public class ScheduledTasks { boolean result = false; Path localFilePath = fileData.getLocalFilePath(); if (alreadyPublishedFiles.put(localFilePath) == null) { - result = !createPublishedChecker().execute(fileData.name(), contextMap); + result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap); } return result; } - private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) { + private Mono<FilePublishInformation> fetchFile(FileData fileData, Map<String, String> contextMap) { MDC.setContextMap(contextMap); return createFileCollector() - .execute(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) + .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap)); } - private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) { + private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) { MDC.setContextMap(contextMap); Path localFilePath = fileData.getLocalFilePath(); logger.error("File fetching failed, fileData {}", fileData); @@ -168,15 +168,17 @@ public class ScheduledTasks { return Mono.empty(); } - private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) { + private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation model, + Map<String, String> contextMap) { MDC.setContextMap(contextMap); return createDataRouterPublisher() - .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) + .publishFile(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) .onErrorResume(exception -> handlePublishFailure(model, contextMap)); } - private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) { + private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation model, + Map<String, String> contextMap) { MDC.setContextMap(contextMap); logger.error("File publishing failed: {}", model); Path internalFileName = model.getInternalLocation(); @@ -198,7 +200,7 @@ public class ScheduledTasks { Map<String, String> contextMap = MDC.getCopyOfContextMap(); return createConsumerTask() // - .execute() // + .getMessageRouterResponse() // .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap)); } @@ -215,8 +217,7 @@ public class ScheduledTasks { try { Files.delete(localFile); } catch (Exception e) { - logger.trace("Could not delete file: {}", localFile); + logger.trace("Could not delete file: {}", localFile, e); } } - } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java new file mode 100644 index 00000000..de07461c --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/web/PublishRedirectStrategy.java @@ -0,0 +1,79 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.web; + +import java.net.URI; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.ProtocolException; +import org.apache.http.annotation.Contract; +import org.apache.http.annotation.ThreadingBehavior; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.methods.RequestBuilder; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.protocol.HttpContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PublishRedirectStrategy implementation + * that automatically redirects all HEAD, GET, POST, PUT, and DELETE requests. + * This strategy relaxes restrictions on automatic redirection of + * POST methods imposed by the HTTP specification. + * + */ +@Contract(threading = ThreadingBehavior.IMMUTABLE) +public class PublishRedirectStrategy extends DefaultRedirectStrategy { + + public static final PublishRedirectStrategy INSTANCE = new PublishRedirectStrategy(); + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + /** + * Redirectable methods. + */ + private static final String[] REDIRECT_METHODS = new String[] { // + HttpPut.METHOD_NAME, // + HttpGet.METHOD_NAME, // + HttpPost.METHOD_NAME, // + HttpHead.METHOD_NAME, // + HttpDelete.METHOD_NAME // + }; + + @Override + protected boolean isRedirectable(final String method) { + for (final String m : REDIRECT_METHODS) { + if (m.equalsIgnoreCase(method)) { + return true; + } + } + return false; + } + + @Override + public HttpUriRequest getRedirect(final HttpRequest request, final HttpResponse response, final HttpContext context) + throws ProtocolException { + final URI uri = getLocationURI(request, response, context); + logger.trace("getRedirect...: {}", request); + return RequestBuilder.copy(request).setUri(uri).build(); + } + +} diff --git a/datafile-app-server/src/main/resources/datafile_endpoints.json b/datafile-app-server/src/main/resources/datafile_endpoints.json index d864c11d..8d45bc84 100644 --- a/datafile-app-server/src/main/resources/datafile_endpoints.json +++ b/datafile-app-server/src/main/resources/datafile_endpoints.json @@ -28,8 +28,8 @@ "ftpesConfiguration": { "keyCert": "config/dfc.jks", "keyPassword": "secret", - "trustedCA": "config/ftp.jks", - "trustedCAPassword": "secret" + "trustedCa": "config/ftp.jks", + "trustedCaPassword": "secret" } } } |