diff options
author | elinuxhenrik <henrik.b.andersson@est.tech> | 2018-08-17 12:34:58 +0200 |
---|---|---|
committer | elinuxhenrik <henrik.b.andersson@est.tech> | 2018-08-20 10:43:58 +0200 |
commit | d661dbcf431f0f02ecf98f748e3516ba0ab23dff (patch) | |
tree | eae40a0aa43e2f4c0b718ac181d7aec462f00697 /datafile-app-server/src/main/java | |
parent | 604024401a3f7b142880970b06d91888086feac7 (diff) |
Add seed code.
First version based on PRH micro service.
Change-Id: Iea1673a8a1961006b1ea98ef245e213e3652eb82
Issue-ID: DCAEGEN2-638
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Diffstat (limited to 'datafile-app-server/src/main/java')
23 files changed, 1694 insertions, 0 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java new file mode 100644 index 00000000..63fbccb0 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + */ +@SpringBootApplication +@Configuration +@ComponentScan +@EnableAutoConfiguration(exclude = {JacksonAutoConfiguration.class}) +@EnableScheduling +public class MainApp { + + public static void main(String[] args) { + SpringApplication.run(MainApp.class, args); + } + + @Bean + TaskScheduler taskScheduler() { + return new ConcurrentTaskScheduler(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java new file mode 100644 index 00000000..1fd50c94 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -0,0 +1,212 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import java.util.Optional; + +import java.util.function.Predicate; + +import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableAaiClientConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 + */ + +@Component +@Configuration +public class AppConfig extends DatafileAppConfig { + + private static Predicate<String> isEmpty = String::isEmpty; + @Value("${dmaap.dmaapConsumerConfiguration.dmaapHostName:}") + public String consumerDmaapHostName; + + @Value("${dmaap.dmaapConsumerConfiguration.dmaapPortNumber:}") + public Integer consumerDmaapPortNumber; + + @Value("${dmaap.dmaapConsumerConfiguration.dmaapTopicName:}") + public String consumerDmaapTopicName; + + @Value("${dmaap.dmaapConsumerConfiguration.dmaapProtocol:}") + public String consumerDmaapProtocol; + + @Value("${dmaap.dmaapConsumerConfiguration.dmaapUserName:}") + public String consumerDmaapUserName; + + @Value("${dmaap.dmaapConsumerConfiguration.dmaapUserPassword:}") + public String consumerDmaapUserPassword; + + @Value("${dmaap.dmaapConsumerConfiguration.dmaapContentType:}") + public String consumerDmaapContentType; + + @Value("${dmaap.dmaapConsumerConfiguration.consumerId:}") + public String consumerId; + + @Value("${dmaap.dmaapConsumerConfiguration.consumerGroup:}") + public String consumerGroup; + + @Value("${dmaap.dmaapConsumerConfiguration.timeoutMs:}") + public Integer consumerTimeoutMs; + + @Value("${dmaap.dmaapConsumerConfiguration.message-limit:}") + public Integer consumerMessageLimit; + + @Value("${dmaap.dmaapProducerConfiguration.dmaapHostName:}") + public String producerDmaapHostName; + + @Value("${dmaap.dmaapProducerConfiguration.dmaapPortNumber:}") + public Integer producerDmaapPortNumber; + + @Value("${dmaap.dmaapProducerConfiguration.dmaapTopicName:}") + public String producerDmaapTopicName; + + @Value("${dmaap.dmaapProducerConfiguration.dmaapProtocol:}") + public String producerDmaapProtocol; + + @Value("${dmaap.dmaapProducerConfiguration.dmaapUserName:}") + public String producerDmaapUserName; + + @Value("${dmaap.dmaapProducerConfiguration.dmaapUserPassword:}") + public String producerDmaapUserPassword; + + @Value("${dmaap.dmaapProducerConfiguration.dmaapContentType:}") + public String producerDmaapContentType; + + @Value("${aai.aaiClientConfiguration.aaiHost:}") + public String aaiHost; + + @Value("${aai.aaiClientConfiguration.aaiHostPortNumber:}") + public Integer aaiPort; + + @Value("${aai.aaiClientConfiguration.aaiProtocol:}") + public String aaiProtocol; + + @Value("${aai.aaiClientConfiguration.aaiUserName:}") + public String aaiUserName; + + @Value("${aai.aaiClientConfiguration.aaiUserPassword:}") + public String aaiUserPassword; + + @Value("${aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors:}") + public Boolean aaiIgnoreSslCertificateErrors; + + @Value("${aai.aaiClientConfiguration.aaiBasePath:}") + public String aaiBasePath; + + @Value("${aai.aaiClientConfiguration.aaiPnfPath:}") + public String aaiPnfPath; + + @Override + public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + return new ImmutableDmaapConsumerConfiguration.Builder() + .dmaapUserPassword( + Optional.ofNullable(consumerDmaapUserPassword).filter(isEmpty.negate()) + .orElse(dmaapConsumerConfiguration.dmaapUserPassword())) + .dmaapUserName( + Optional.ofNullable(consumerDmaapUserName).filter(isEmpty.negate()) + .orElse(dmaapConsumerConfiguration.dmaapUserName())) + .dmaapHostName( + Optional.ofNullable(consumerDmaapHostName).filter(isEmpty.negate()) + .orElse(dmaapConsumerConfiguration.dmaapHostName())) + .dmaapPortNumber( + Optional.ofNullable(consumerDmaapPortNumber).filter(p -> !p.toString().isEmpty()) + .orElse(dmaapConsumerConfiguration.dmaapPortNumber())) + .dmaapProtocol( + Optional.ofNullable(consumerDmaapProtocol).filter(isEmpty.negate()) + .orElse(dmaapConsumerConfiguration.dmaapProtocol())) + .dmaapContentType( + Optional.ofNullable(consumerDmaapContentType).filter(isEmpty.negate()) + .orElse(dmaapConsumerConfiguration.dmaapContentType())) + .dmaapTopicName( + Optional.ofNullable(consumerDmaapTopicName).filter(isEmpty.negate()) + .orElse(dmaapConsumerConfiguration.dmaapTopicName())) + .messageLimit( + Optional.ofNullable(consumerMessageLimit).filter(p -> !p.toString().isEmpty()) + .orElse(dmaapConsumerConfiguration.messageLimit())) + .timeoutMs(Optional.ofNullable(consumerTimeoutMs).filter(p -> !p.toString().isEmpty()) + .orElse(dmaapConsumerConfiguration.timeoutMs())) + .consumerGroup(Optional.ofNullable(consumerGroup).filter(isEmpty.negate()) + .orElse(dmaapConsumerConfiguration.consumerGroup())) + .consumerId(Optional.ofNullable(consumerId).filter(isEmpty.negate()) + .orElse(dmaapConsumerConfiguration.consumerId())) + .build(); + } + + @Override + public AaiClientConfiguration getAaiClientConfiguration() { + return new ImmutableAaiClientConfiguration.Builder() + .aaiHost(Optional.ofNullable(aaiHost).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiHost())) + .aaiPort( + Optional.ofNullable(aaiPort).filter(p -> !p.toString().isEmpty()) + .orElse(aaiClientConfiguration.aaiPort())) + .aaiIgnoreSslCertificateErrors( + Optional.ofNullable(aaiIgnoreSslCertificateErrors).filter(p -> !p.toString().isEmpty()) + .orElse(aaiClientConfiguration.aaiIgnoreSslCertificateErrors())) + .aaiProtocol( + Optional.ofNullable(aaiProtocol).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiProtocol())) + .aaiUserName( + Optional.ofNullable(aaiUserName).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiUserName())) + .aaiUserPassword(Optional.ofNullable(aaiUserPassword).filter(isEmpty.negate()) + .orElse(aaiClientConfiguration.aaiUserPassword())) + .aaiBasePath(Optional.ofNullable(aaiBasePath).filter(isEmpty.negate()) + .orElse(aaiClientConfiguration.aaiBasePath())) + .aaiPnfPath( + Optional.ofNullable(aaiPnfPath).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiPnfPath())) + .aaiHeaders(aaiClientConfiguration.aaiHeaders()) + .build(); + } + + @Override + public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + return new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapContentType( + Optional.ofNullable(producerDmaapContentType).filter(isEmpty.negate()) + .orElse(dmaapPublisherConfiguration.dmaapContentType())) + .dmaapHostName( + Optional.ofNullable(producerDmaapHostName).filter(isEmpty.negate()) + .orElse(dmaapPublisherConfiguration.dmaapHostName())) + .dmaapPortNumber( + Optional.ofNullable(producerDmaapPortNumber).filter(p -> !p.toString().isEmpty()) + .orElse(dmaapPublisherConfiguration.dmaapPortNumber())) + .dmaapProtocol( + Optional.ofNullable(producerDmaapProtocol).filter(isEmpty.negate()) + .orElse(dmaapPublisherConfiguration.dmaapProtocol())) + .dmaapTopicName( + Optional.ofNullable(producerDmaapTopicName).filter(isEmpty.negate()) + .orElse(dmaapPublisherConfiguration.dmaapTopicName())) + .dmaapUserName( + Optional.ofNullable(producerDmaapUserName).filter(isEmpty.negate()) + .orElse(dmaapPublisherConfiguration.dmaapUserName())) + .dmaapUserPassword( + Optional.ofNullable(producerDmaapUserPassword).filter(isEmpty.negate()) + .orElse(dmaapPublisherConfiguration.dmaapUserPassword())) + .build(); + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java new file mode 100644 index 00000000..5c6f1512 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/25/18 + */ +public interface Config { + + DmaapConsumerConfiguration getDmaapConsumerConfiguration(); + + AaiClientConfiguration getAaiClientConfiguration(); + + DmaapPublisherConfiguration getDmaapPublisherConfiguration(); + + void initFileStreamReader(); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java new file mode 100644 index 00000000..169bf8ed --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java @@ -0,0 +1,143 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; +import com.google.gson.TypeAdapterFactory; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import java.nio.charset.StandardCharsets; +import java.util.ServiceLoader; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; + +import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 + */ +@Configuration +@EnableConfigurationProperties +@ConfigurationProperties("app") +public abstract class DatafileAppConfig implements Config { + + private static final String CONFIG = "configs"; + private static final String AAI = "aai"; + private static final String DMAAP = "dmaap"; + private static final String AAI_CONFIG = "aaiClientConfiguration"; + private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; + private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + AaiClientConfiguration aaiClientConfiguration; + + DmaapConsumerConfiguration dmaapConsumerConfiguration; + + DmaapPublisherConfiguration dmaapPublisherConfiguration; + + @NotEmpty + private String filepath; + + + @Override + public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + return dmaapConsumerConfiguration; + } + + @Override + public AaiClientConfiguration getAaiClientConfiguration() { + return aaiClientConfiguration; + } + + @Override + public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + return dmaapPublisherConfiguration; + } + + @Override + public void initFileStreamReader() { + + GsonBuilder gsonBuilder = new GsonBuilder(); + ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); + JsonParser parser = new JsonParser(); + JsonObject jsonObject; + try (InputStream inputStream = getInputStream(filepath)) { + JsonElement rootElement = getJsonElement(parser, inputStream); + if (rootElement.isJsonObject()) { + jsonObject = rootElement.getAsJsonObject(); + aaiClientConfiguration = deserializeType(gsonBuilder, + jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(AAI).getAsJsonObject(AAI_CONFIG), + AaiClientConfiguration.class); + + dmaapConsumerConfiguration = deserializeType(gsonBuilder, + jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER), + DmaapConsumerConfiguration.class); + + dmaapPublisherConfiguration = deserializeType(gsonBuilder, + jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER), + DmaapPublisherConfiguration.class); + } + } catch (IOException e) { + logger.warn("Problem with file loading, file: {}", filepath, e); + } catch (JsonSyntaxException e) { + logger.warn("Problem with Json deserialization", e); + } + } + + JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { + return parser.parse(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); + } + + private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, + @NotNull Class<T> type) { + return gsonBuilder.create().fromJson(jsonObject, type); + } + + InputStream getInputStream(@NotNull String filepath) throws IOException { + return new BufferedInputStream(new FileInputStream(filepath)); + } + + String getFilepath() { + return this.filepath; + } + + public void setFilepath(String filepath) { + this.filepath = filepath; + } + +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java new file mode 100644 index 00000000..823fe732 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -0,0 +1,88 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import io.swagger.annotations.ApiOperation; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import javax.annotation.PostConstruct; + +import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18 + */ +@Configuration +@EnableScheduling +public class SchedulerConfig extends DatafileAppConfig { + + private static final int SCHEDULING_DELAY = 2000; + private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>(); + + private final TaskScheduler taskScheduler; + private final ScheduledTasks scheduledTask; + + @Autowired + public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask) { + this.taskScheduler = taskScheduler; + this.scheduledTask = scheduledTask; + } + + /** + * Function which have to stop tasks execution. + * + * @return response entity about status of cancellation operation + */ + @ApiOperation(value = "Get response on stopping task execution") + public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() { + scheduledFutureList.forEach(x -> x.cancel(false)); + scheduledFutureList.clear(); + return Mono.defer(() -> + Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)) + ); + } + + /** + * Function for starting scheduling Datafile workflow. + * + * @return status of operation execution: true - started, false - not started + */ + @PostConstruct + @ApiOperation(value = "Start task if possible") + public synchronized boolean tryToStartTask() { + if (scheduledFutureList.isEmpty()) { + scheduledFutureList.add(taskScheduler + .scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask, SCHEDULING_DELAY)); + return true; + } else { + return false; + } + + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java new file mode 100644 index 00000000..c45b136a --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java @@ -0,0 +1,82 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; +import springfox.documentation.builders.ApiInfoBuilder; +import springfox.documentation.builders.PathSelectors; +import springfox.documentation.builders.RequestHandlerSelectors; +import springfox.documentation.service.ApiInfo; +import springfox.documentation.spi.DocumentationType; +import springfox.documentation.spring.web.plugins.Docket; +import springfox.documentation.swagger2.annotations.EnableSwagger2; + + +@EnableSwagger2 +@Configuration +@Profile("prod") +public class SwaggerConfig extends WebMvcConfigurationSupport { + + private static final String PACKAGE_PATH = "org.onap.dcaegen2.collectors.datafile"; + private static final String API_TITLE = "Datafile app server"; + private static final String DESCRIPTION = "This page lists all the rest apis for Datafile app server."; + private static final String VERSION = "1.0"; + private static final String RESOURCES_PATH = "classpath:/META-INF/resources/"; + private static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/"; + private static final String SWAGGER_UI = "swagger-ui.html"; + private static final String WEBJARS = "/webjars/**"; + + /** + * Swagger configuration function for hosting it next to spring http website. + * @return Docket + */ + @Bean + public Docket api() { + return new Docket(DocumentationType.SWAGGER_2) + .apiInfo(apiInfo()) + .select() + .apis(RequestHandlerSelectors.basePackage(PACKAGE_PATH)) + .paths(PathSelectors.any()) + .build(); + } + + private ApiInfo apiInfo() { + return new ApiInfoBuilder() + .title(API_TITLE) + .description(DESCRIPTION) + .version(VERSION) + .build(); + } + + + @Override + protected void addResourceHandlers(ResourceHandlerRegistry registry) { + registry.addResourceHandler(SWAGGER_UI) + .addResourceLocations(RESOURCES_PATH); + + registry.addResourceHandler(WEBJARS) + .addResourceLocations(WEBJARS_PATH); + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java new file mode 100644 index 00000000..b6231418 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/TomcatHttpConfig.java @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import org.apache.catalina.connector.Connector; +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; +import org.springframework.boot.web.servlet.server.ServletWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/18/18 + */ +@Configuration +public class TomcatHttpConfig { + + /** + * Class for setting up hosting Datafile on http/https. + * + * @return ServletWebServerFactory + */ + @Bean + public ServletWebServerFactory servletContainer() { + TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory(); + tomcat.addAdditionalTomcatConnectors(getHttpConnector()); + return tomcat; + } + + private Connector getHttpConnector() { + Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL); + connector.setScheme("http"); + connector.setPort(8100); + connector.setSecure(false); + return connector; + } + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java new file mode 100644 index 00000000..070fe591 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/HeartbeatController.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.controllers; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18 + */ +@RestController +@Api(value = "HeartbeatController", description = "Check liveness of Datafile service") +public class HeartbeatController { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + /** + * Endpoint for checking that Datafile is alive. + * + * @return HTTP Status Code + */ + @RequestMapping(value = "heartbeat", method = RequestMethod.GET) + @ApiOperation(value = "Returns liveness of Datafile service") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Datafile sevice is living"), + @ApiResponse(code = 401, message = "You are not authorized to view the resource"), + @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), + @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") + } + ) + public Mono<ResponseEntity<String>> heartbeat() { + logger.trace("Receiving heartbeat request"); + return Mono.defer(() -> + Mono.just(new ResponseEntity<>("alive", HttpStatus.OK)) + ); + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java new file mode 100644 index 00000000..f3cf354f --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.controllers; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; + +import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/5/18 + */ +@RestController +@Api(value = "ScheduleController", description = "Schedule Controller") +public class ScheduleController { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final SchedulerConfig schedulerConfig; + + @Autowired + public ScheduleController(SchedulerConfig schedulerConfig) { + this.schedulerConfig = schedulerConfig; + } + + @RequestMapping(value = "start", method = RequestMethod.GET) + @ApiOperation(value = "Start scheduling worker request") + public Mono<ResponseEntity<String>> startTasks() { + logger.trace("Receiving start scheduling worker request"); + return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse); + } + + @RequestMapping(value = "stopDatafile", method = RequestMethod.GET) + @ApiOperation(value = "Receiving stop scheduling worker request") + public Mono<ResponseEntity<String>> stopTask() { + logger.trace("Receiving stop scheduling worker request"); + return schedulerConfig.getResponseFromCancellationOfTasks(); + } + + @ApiOperation(value = "Sends success or error response on starting task execution") + private ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) { + if (wasScheduled) { + return new ResponseEntity<>("Datafile Service has been started!", HttpStatus.CREATED); + } else { + return new ResponseEntity<>("Datafile Service is still running!", HttpStatus.NOT_ACCEPTABLE); + } + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/AaiNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/AaiNotFoundException.java new file mode 100644 index 00000000..a83b5bd6 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/AaiNotFoundException.java @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + */ +public class AaiNotFoundException extends DatafileTaskException { + + public AaiNotFoundException(String message) { + super(message); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java new file mode 100644 index 00000000..41f77332 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 + */ +public class DatafileTaskException extends Exception { + + public DatafileTaskException() { + super(); + } + + public DatafileTaskException(String message) { + super(message); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java new file mode 100644 index 00000000..d9f6f873 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18 + */ +public class DmaapEmptyResponseException extends DatafileTaskException { + + public DmaapEmptyResponseException() { + super(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java new file mode 100644 index 00000000..ebff8ae3 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 + */ +public class DmaapNotFoundException extends DatafileTaskException { + + public DmaapNotFoundException(String message) { + super(message); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java new file mode 100644 index 00000000..aeaf0da1 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -0,0 +1,130 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import java.util.Optional; +import java.util.stream.StreamSupport; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; +import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 + */ +public class DmaapConsumerJsonParser { + + private static final String EVENT = "event"; + private static final String OTHER_FIELDS = "otherFields"; + private static final String PNF_OAM_IPV_4_ADDRESS = "pnfOamIpv4Address"; + private static final String PNF_OAM_IPV_6_ADDRESS = "pnfOamIpv6Address"; + private static final String PNF_VENDOR_NAME = "pnfVendorName"; + private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber"; + + /** + * Extract info from string and create @see {@link org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel}. + * + * @param monoMessage - results from DMaaP + * @return reactive DMaaPModel + */ + public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { + return monoMessage + .flatMap(this::getJsonParserMessage) + .flatMap(this::createJsonConsumerModel); + } + + private Mono<JsonElement> getJsonParserMessage(String message) { + return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) + : Mono.fromSupplier(() -> new JsonParser().parse(message)); + } + + private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { + return jsonElement.isJsonObject() + ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) + : getConsumerDmaapModelFromJsonArray(jsonElement); + } + + private Mono<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { + return create( + Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() + .flatMap(this::getJsonObjectFromAnArray) + .orElseThrow(DmaapEmptyResponseException::new))); + } + + public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { + return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); + } + + private Mono<ConsumerDmaapModel> create(Mono<JsonObject> jsonObject) { + return jsonObject.flatMap(monoJsonP -> + !containsHeader(monoJsonP) ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + : transform(monoJsonP)); + } + + private Mono<ConsumerDmaapModel> transform(JsonObject monoJsonP) { + monoJsonP = monoJsonP.getAsJsonObject(EVENT).getAsJsonObject(OTHER_FIELDS); + String pnfVendorName = getValueFromJson(monoJsonP, PNF_VENDOR_NAME); + String pnfSerialNumber = getValueFromJson(monoJsonP, PNF_SERIAL_NUMBER); + String pnfOamIpv4Address = getValueFromJson(monoJsonP, PNF_OAM_IPV_4_ADDRESS); + String pnfOamIpv6Address = getValueFromJson(monoJsonP, PNF_OAM_IPV_6_ADDRESS); + return + (!vendorAndSerialNotEmpty(pnfSerialNumber, pnfVendorName) || !ipPropertiesNotEmpty(pnfOamIpv4Address, + pnfOamIpv6Address)) + ? Mono.error(new DmaapNotFoundException("Incorrect json, consumerDmaapModel can not be created: " + + printMessage(pnfVendorName, pnfSerialNumber, pnfOamIpv4Address, pnfOamIpv6Address))) : + Mono.just(ImmutableConsumerDmaapModel.builder() + .pnfName(pnfVendorName.substring(0, Math.min(pnfVendorName.length(), 3)).toUpperCase() + .concat(pnfSerialNumber)).ipv4(pnfOamIpv4Address) + .ipv6(pnfOamIpv6Address).build()); + } + + private String getValueFromJson(JsonObject jsonObject, String jsonKey) { + return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : ""; + } + + private boolean vendorAndSerialNotEmpty(String pnfSerialNumber, String pnfVendorName) { + return (!StringUtils.isEmpty(pnfSerialNumber) && !StringUtils.isEmpty(pnfVendorName)); + } + + private boolean ipPropertiesNotEmpty(String ipv4, String ipv6) { + return (!StringUtils.isEmpty(ipv4)) || !(StringUtils.isEmpty(ipv6)); + } + + private boolean containsHeader(JsonObject jsonObject) { + return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(OTHER_FIELDS); + } + + private String printMessage(String pnfVendorName, String pnfSerialNumber, String pnfOamIpv4Address, + String pnfOamIpv6Address) { + return String.format("%n{" + + "\"pnfVendorName\" : \"%s\"," + + "\"pnfSerialNumber\": \"%s\"," + + "\"pnfOamIpv4Address\": \"%s\"," + + "\"pnfOamIpv6Address\": \"%s\"" + + "%n}", pnfVendorName, pnfSerialNumber, pnfOamIpv4Address, pnfOamIpv6Address); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTask.java new file mode 100644 index 00000000..8083a255 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTask.java @@ -0,0 +1,36 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import java.util.Optional; + +import org.onap.dcaegen2.collectors.datafile.exceptions.AaiNotFoundException; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.AaiConsumerClient; + +public abstract class AaiConsumerTask { + + abstract Optional<String> consume(ConsumerDmaapModel message) throws AaiNotFoundException; + + abstract AaiConsumerClient resolveClient(); + + protected abstract String execute(ConsumerDmaapModel consumerDmaapModel) throws AaiNotFoundException; +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTaskImpl.java new file mode 100644 index 00000000..d487b6b2 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiConsumerTaskImpl.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import java.io.IOException; +import java.util.Optional; + +import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.exceptions.AaiNotFoundException; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.AaiConsumerClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class AaiConsumerTaskImpl extends AaiConsumerTask { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final Config datafileAppConfig; + private AaiConsumerClient aaiConsumerClient; + + @Autowired + public AaiConsumerTaskImpl(AppConfig datafileAppConfig) { + this.datafileAppConfig = datafileAppConfig; + } + + @Override + Optional<String> consume(ConsumerDmaapModel consumerDmaapModel) throws AaiNotFoundException { + logger.trace("Method called with arg {}", consumerDmaapModel); + try { + return aaiConsumerClient.getHttpResponse(consumerDmaapModel); + } catch (IOException e) { + logger.warn("Get request not successful", e); + throw new AaiNotFoundException("Get request not successful"); + } + } + + @Override + public String execute(ConsumerDmaapModel consumerDmaapModel) throws AaiNotFoundException { + consumerDmaapModel = Optional.ofNullable(consumerDmaapModel) + .orElseThrow(() -> new AaiNotFoundException("Invoked null object to AAI task")); + logger.trace("Method called with arg {}", consumerDmaapModel); + aaiConsumerClient = resolveClient(); + return consume(consumerDmaapModel).orElseThrow(() -> new AaiNotFoundException("Null response code")); + } + + protected AaiClientConfiguration resolveConfiguration() { + return datafileAppConfig.getAaiClientConfiguration(); + } + + @Override + AaiConsumerClient resolveClient() { + return Optional.ofNullable(aaiConsumerClient).orElseGet(() -> new AaiConsumerClient(resolveConfiguration())); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTask.java new file mode 100644 index 00000000..ca2d03da --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTask.java @@ -0,0 +1,49 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.AaiNotFoundException; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.AaiReactiveWebClient; +import org.onap.dcaegen2.collectors.datafile.service.producer.AaiProducerReactiveHttpClient; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 + */ +public abstract class AaiProducerTask { + + abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException; + + abstract AaiProducerReactiveHttpClient resolveClient(); + + protected abstract AaiClientConfiguration resolveConfiguration(); + + protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) + throws DatafileTaskException; + + WebClient buildWebClient() { + return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTaskImpl.java new file mode 100644 index 00000000..9d888aa7 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/AaiProducerTaskImpl.java @@ -0,0 +1,89 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import org.onap.dcaegen2.collectors.datafile.config.AaiClientConfiguration; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.exceptions.AaiNotFoundException; +import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.utils.HttpUtils; +import org.onap.dcaegen2.collectors.datafile.service.producer.AaiProducerReactiveHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 + */ +@Component +public class AaiProducerTaskImpl extends + AaiProducerTask { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final Config datafileAppConfig; + private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient; + + @Autowired + public AaiProducerTaskImpl(AppConfig datafileAppConfig) { + this.datafileAppConfig = datafileAppConfig; + } + + @Override + Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { + logger.info("Sending PNF model to AAI {}", consumerDmaapModel); + return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel) + .flatMap(response -> { + if (HttpUtils.isSuccessfulResponseCode(response)) { + return consumerDmaapModel; + } + return Mono + .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow")); + }); + } + + @Override + AaiProducerReactiveHttpClient resolveClient() { + return aaiProducerReactiveHttpClient == null ? new AaiProducerReactiveHttpClient(resolveConfiguration()) + .createAaiWebClient(buildWebClient()) : aaiProducerReactiveHttpClient; + } + + @Override + protected AaiClientConfiguration resolveConfiguration() { + return datafileAppConfig.getAaiClientConfiguration(); + } + + @Override + protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DatafileTaskException { + if (consumerDmaapModel == null) { + throw new DmaapNotFoundException("Invoked null object to DMaaP task"); + } + aaiProducerReactiveHttpClient = resolveClient(); + logger.trace("Method called with arg {}", consumerDmaapModel); + return publish(consumerDmaapModel); + + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java new file mode 100644 index 00000000..a5764704 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java @@ -0,0 +1,49 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.DMaaPReactiveWebClient; +import org.onap.dcaegen2.collectors.datafile.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 + */ +abstract class DmaapConsumerTask { + + abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws DatafileTaskException; + + abstract DMaaPConsumerReactiveHttpClient resolveClient(); + + abstract void initConfigs(); + + protected abstract DmaapConsumerConfiguration resolveConfiguration(); + + protected abstract Mono<ConsumerDmaapModel> execute(String object) throws DatafileTaskException; + + WebClient buildWebClient() { + return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java new file mode 100644 index 00000000..8d45a7fd --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java @@ -0,0 +1,86 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; +import org.onap.dcaegen2.collectors.datafile.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + */ +@Component +public class DmaapConsumerTaskImpl extends DmaapConsumerTask { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Config datafileAppConfig; + private DmaapConsumerJsonParser dmaapConsumerJsonParser; + private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; + + @Autowired + public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) { + this.datafileAppConfig = datafileAppConfig; + this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + } + + DmaapConsumerTaskImpl(AppConfig datafileAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) { + this.datafileAppConfig = datafileAppConfig; + this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; + } + + @Override + Mono<ConsumerDmaapModel> consume(Mono<String> message) { + logger.info("Consumed model from DMaaP: {}", message); + return dmaapConsumerJsonParser.getJsonObject(message); + } + + @Override + public Mono<ConsumerDmaapModel> execute(String object) { + dmaaPConsumerReactiveHttpClient = resolveClient(); + logger.trace("Method called with arg {}", object); + return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); + } + + @Override + void initConfigs() { + datafileAppConfig.initFileStreamReader(); + } + + @Override + protected DmaapConsumerConfiguration resolveConfiguration() { + return datafileAppConfig.getDmaapConsumerConfiguration(); + } + + @Override + DMaaPConsumerReactiveHttpClient resolveClient() { + return dmaaPConsumerReactiveHttpClient == null + ? new DMaaPConsumerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient()) + : dmaaPConsumerReactiveHttpClient; + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java new file mode 100644 index 00000000..467eee0b --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java @@ -0,0 +1,47 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.DMaaPReactiveWebClient; +import org.onap.dcaegen2.collectors.datafile.service.producer.DMaaPProducerReactiveHttpClient; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + */ +abstract class DmaapPublisherTask { + + abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DatafileTaskException; + + abstract DMaaPProducerReactiveHttpClient resolveClient(); + + protected abstract DmaapPublisherConfiguration resolveConfiguration(); + + protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DatafileTaskException; + + WebClient buildWebClient() { + return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java new file mode 100644 index 00000000..4d435a4f --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.Config; +import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.service.producer.DMaaPProducerReactiveHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 + */ +@Component +public class DmaapPublisherTaskImpl extends DmaapPublisherTask { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Config datafileAppConfig; + private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient; + + @Autowired + public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) { + this.datafileAppConfig = datafileAppConfig; + } + + @Override + Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { + logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(), + consumerDmaapModel); + return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); + } + + @Override + public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException { + if (consumerDmaapModel == null) { + throw new DmaapNotFoundException("Invoked null object to DMaaP task"); + } + dmaapProducerReactiveHttpClient = resolveClient(); + logger.trace("Method called with arg {}", consumerDmaapModel); + return publish(consumerDmaapModel); + } + + @Override + protected DmaapPublisherConfiguration resolveConfiguration() { + return datafileAppConfig.getDmaapPublisherConfiguration(); + } + + @Override + DMaaPProducerReactiveHttpClient resolveClient() { + return dmaapProducerReactiveHttpClient == null + ? new DMaaPProducerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient()) + : dmaapProducerReactiveHttpClient; + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java new file mode 100644 index 00000000..2600b563 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -0,0 +1,113 @@ +/* + * ============LICENSE_START======================================================= + * Datafile Collector Service + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import java.util.concurrent.Callable; + +import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 + */ +@Component +public class ScheduledTasks { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final DmaapConsumerTask dmaapConsumerTask; + private final DmaapPublisherTask dmaapProducerTask; + private final AaiProducerTask aaiProducerTask; + + /** + * Constructor for tasks registration in DatafileWorkflow. + * + * @param dmaapConsumerTask - fist task + * @param dmaapPublisherTask - third task + * @param aaiPublisherTask - second task + */ + @Autowired + public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask, + AaiProducerTask aaiPublisherTask) { + this.dmaapConsumerTask = dmaapConsumerTask; + this.dmaapProducerTask = dmaapPublisherTask; + this.aaiProducerTask = aaiPublisherTask; + } + + /** + * Main function for scheduling datafileWorkflow. + */ + public void scheduleMainDatafileEventTask() { + logger.trace("Execution of tasks was registered"); + + Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) + .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP")) + .map(this::publishToAaiConfiguration) + .flatMap(this::publishToDmaapConfiguration) + .subscribeOn(Schedulers.elastic()); + + dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + } + + private void onComplete() { + logger.info("Datafile tasks have been completed"); + } + + private void onSuccess(String responseCode) { + logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode); + } + + private void onError(Throwable throwable) { + if (!(throwable instanceof DmaapEmptyResponseException)) { + logger.warn("Chain of tasks have been aborted due to errors in Datafile workflow", throwable); + } + } + + private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() { + return () -> { + dmaapConsumerTask.initConfigs(); + return dmaapConsumerTask.execute(""); + }; + } + + private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) { + try { + return aaiProducerTask.execute(monoDMaaPModel); + } catch (DatafileTaskException e) { + return Mono.error(e); + } + } + + private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) { + try { + return dmaapProducerTask.execute(monoAaiModel); + } catch (DatafileTaskException e) { + return Mono.error(e); + } + } +} |