diff options
17 files changed, 442 insertions, 140 deletions
diff --git a/datafile-app-server/config/application.yaml b/datafile-app-server/config/application.yaml index ca3160c7..cef185c6 100644 --- a/datafile-app-server/config/application.yaml +++ b/datafile-app-server/config/application.yaml @@ -14,7 +14,7 @@ logging: ROOT: ERROR org.springframework: ERROR org.springframework.data: ERROR - org.onap.dcaegen2.collectors.datafile: INFO + org.onap.dcaegen2.collectors.datafile: ERROR file: opt/log/application.log app: filepath: config/datafile_endpoints.json diff --git a/datafile-app-server/config/datafile_endpoints.json b/datafile-app-server/config/datafile_endpoints.json index 102537b1..5664bdeb 100644 --- a/datafile-app-server/config/datafile_endpoints.json +++ b/datafile-app-server/config/datafile_endpoints.json @@ -3,24 +3,24 @@ "dmaap": { "dmaapConsumerConfiguration": { "dmaapHostName": "localhost", - "dmaapPortNumber": 3904, + "dmaapPortNumber": 2222, "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT", "dmaapProtocol": "http", "dmaapUserName": "admin", "dmaapUserPassword": "admin", "dmaapContentType": "application/json", - "consumerId": "c12", + "consumerId": "C12", "consumerGroup": "OpenDcae-c12", "timeoutMS": -1, "messageLimit": 1 }, "dmaapProducerConfiguration": { "dmaapHostName": "localhost", - "dmaapPortNumber": 3905, + "dmaapPortNumber": 3907, "dmaapTopicName": "publish", - "dmaapProtocol": "http", - "dmaapUserName": "admin", - "dmaapUserPassword": "admin", + "dmaapProtocol": "https", + "dmaapUserName": "dradmin", + "dmaapUserPassword": "dradmin", "dmaapContentType": "application/octet-stream" } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java index 0b81df5b..8508cd12 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java @@ -18,9 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; -import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; @@ -30,15 +28,9 @@ import reactor.core.publisher.Flux; */ abstract class DmaapPublisherTask { - abstract Flux<String> publish(ConsumerDmaapModel consumerDmaapModel); - - abstract DmaapProducerReactiveHttpClient resolveClient(); - protected abstract DmaapPublisherConfiguration resolveConfiguration(); - protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel); + protected abstract DmaapProducerReactiveHttpClient resolveClient(); - WebClient buildWebClient() { - return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } + protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java index b4ee3a9d..201b33d1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java @@ -37,7 +37,6 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); private final Config datafileAppConfig; - private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient; @Autowired public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) { @@ -45,16 +44,10 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - public Flux<String> publish(ConsumerDmaapModel consumerDmaapModel) { - logger.trace("Publishing on DMaaP DataRouter {}", consumerDmaapModel); - return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel); - } - - @Override public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) { - dmaapProducerReactiveHttpClient = resolveClient(); logger.trace("Method called with arg {}", consumerDmaapModel); - return publish(consumerDmaapModel); + DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); + return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel); } @Override @@ -63,8 +56,8 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - DmaapProducerReactiveHttpClient resolveClient() { - return new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()); + protected DmaapProducerReactiveHttpClient resolveClient() { + return new DmaapProducerReactiveHttpClient(resolveConfiguration()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index c263c95c..171dd024 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -61,7 +61,7 @@ public class ScheduledTasks { logger.trace("Execution of tasks was registered"); consumeFromDmaapMessage() - .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP")) + .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP")) .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration) .subscribe(this::onSuccess, this::onError, this::onComplete); } diff --git a/datafile-app-server/src/main/resources/datafile_endpoints.json b/datafile-app-server/src/main/resources/datafile_endpoints.json index 188129e8..dff77d2f 100644 --- a/datafile-app-server/src/main/resources/datafile_endpoints.json +++ b/datafile-app-server/src/main/resources/datafile_endpoints.json @@ -3,10 +3,10 @@ "dmaap": { "dmaapConsumerConfiguration": { "consumerGroup": "notification", - "consumerId": "1", + "consumerId": "C12", "dmaapContentType": "application/json", "dmaapHostName": "localhost", - "dmaapPortNumber": 3904, + "dmaapPortNumber": 2222, "dmaapProtocol": "http", "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT", "dmaapUserName": "admin", @@ -17,11 +17,11 @@ "dmaapProducerConfiguration": { "dmaapContentType": "application/octet-stream", "dmaapHostName": "localhost", - "dmaapPortNumber": 3905, - "dmaapProtocol": "http", + "dmaapPortNumber": 3907, + "dmaapProtocol": "https", "dmaapTopicName": "publish", - "dmaapUserName": "admin", - "dmaapUserPassword": "admin" + "dmaapUserName": "dradmin", + "dmaapUserPassword": "dradmin" } } } diff --git a/datafile-app-server/src/test/resources/datafile_endpoints.json b/datafile-app-server/src/test/resources/datafile_endpoints.json index 188129e8..dff77d2f 100644 --- a/datafile-app-server/src/test/resources/datafile_endpoints.json +++ b/datafile-app-server/src/test/resources/datafile_endpoints.json @@ -3,10 +3,10 @@ "dmaap": { "dmaapConsumerConfiguration": { "consumerGroup": "notification", - "consumerId": "1", + "consumerId": "C12", "dmaapContentType": "application/json", "dmaapHostName": "localhost", - "dmaapPortNumber": 3904, + "dmaapPortNumber": 2222, "dmaapProtocol": "http", "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT", "dmaapUserName": "admin", @@ -17,11 +17,11 @@ "dmaapProducerConfiguration": { "dmaapContentType": "application/octet-stream", "dmaapHostName": "localhost", - "dmaapPortNumber": 3905, - "dmaapProtocol": "http", + "dmaapPortNumber": 3907, + "dmaapProtocol": "https", "dmaapTopicName": "publish", - "dmaapUserName": "admin", - "dmaapUserPassword": "admin" + "dmaapUserName": "dradmin", + "dmaapUserPassword": "dradmin" } } } diff --git a/datafile-dmaap-client/pom.xml b/datafile-dmaap-client/pom.xml index 96e0988c..9f60f8da 100644 --- a/datafile-dmaap-client/pom.xml +++ b/datafile-dmaap-client/pom.xml @@ -68,6 +68,10 @@ <artifactId>spring-boot-starter-reactor-netty</artifactId> </dependency> <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <scope>test</scope> diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java index d5878b0d..095ba8c9 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java @@ -18,8 +18,6 @@ package org.onap.dcaegen2.collectors.datafile.service; -import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; - import org.onap.dcaegen2.collectors.datafile.config.DmaapCustomConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,8 +35,6 @@ public class DmaapReactiveWebClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private String dmaaPContentType; - private String dmaaPUserName; - private String dmaaPUserPassword; /** * Creating DmaapReactiveWebClient passing to them basic DmaapConfig. @@ -47,8 +43,6 @@ public class DmaapReactiveWebClient { * @return DmaapReactiveWebClient */ public DmaapReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { - this.dmaaPUserName = dmaapCustomConfig.dmaapUserName(); - this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword(); this.dmaaPContentType = dmaapCustomConfig.dmaapContentType(); return this; } @@ -61,7 +55,6 @@ public class DmaapReactiveWebClient { public WebClient build() { return WebClient.builder() .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType) - .filter(basicAuthentication(dmaaPUserName, dmaaPUserPassword)) .filter(logRequest()) .filter(logResponse()) .build(); diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index 36050ff7..fae86a82 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -19,26 +19,29 @@ package org.onap.dcaegen2.collectors.datafile.service.producer; import com.google.gson.JsonElement; import com.google.gson.JsonParser; -import java.io.File; +import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; -import org.apache.http.HttpHeaders; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.io.IOUtils; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.core.io.FileSystemResource; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.ClientResponse; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec; -import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 @@ -49,16 +52,21 @@ public class DmaapProducerReactiveHttpClient { private static final String X_ATT_DR_META = "X-ATT-DR-META"; private static final String NAME_JSON_TAG = "name"; private static final String LOCATION_JSON_TAG = "location"; + private static final String URI_SEPARATOR = "/"; private static final String DEFAULT_FEED_ID = "1"; private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private WebClient webClient; private final String dmaapHostName; private final Integer dmaapPortNumber; private final String dmaapTopicName; private final String dmaapProtocol; private final String dmaapContentType; + private final String user; + private final String pwd; + + private IFileSystemResource fileResource; + private IRestTemplate restTemplate; /** * Constructor DmaapProducerReactiveHttpClient. @@ -72,65 +80,86 @@ public class DmaapProducerReactiveHttpClient { this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType(); + this.user = dmaapPublisherConfiguration.dmaapUserName(); + this.pwd = dmaapPublisherConfiguration.dmaapUserPassword(); } /** - * Function for calling DMaaP HTTP producer - post request to DMaaP. + * Function for calling DMaaP HTTP producer - post request to DMaaP DataRouter. * - * @param consumerDmaapModel - object which will be sent to DMaaP + * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter * @return status code of operation */ public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) { logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel); + try { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.parseMediaType(dmaapContentType)); + addMetaDataToHead(consumerDmaapModel, headers); - RequestBodyUriSpec post = webClient.post(); + addUserCredentialsToHead(headers); - prepareHead(consumerDmaapModel, post); + HttpEntity<byte[]> request = addFileToRequest(consumerDmaapModel, headers); - prepareBody(consumerDmaapModel, post); - ResponseSpec responseSpec = post.retrieve(); - responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse)); - responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse)); - Flux<String> response = responseSpec.bodyToFlux(String.class); + logger.trace("Starting to publish to DR"); + ResponseEntity<String> responseEntity = getRestTemplate().exchange(getUri(consumerDmaapModel.getName()), + HttpMethod.PUT, request, String.class); - logger.trace("Exiting getDmaapProducerResponse with {}", response); - return response; + return Flux.just(responseEntity.getStatusCode().toString()); + } catch (Exception e) { + logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e); + return Flux.empty(); + } } - public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) { - this.webClient = webClient; - return this; + private void addUserCredentialsToHead(HttpHeaders headers) { + String plainCreds = user + ":" + pwd; + byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); + byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes); + String base64Creds = new String(base64CredsBytes); + logger.trace("base64Creds...: {}", base64Creds); + headers.add("Authorization", "Basic " + base64Creds); } - private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) { - post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType); - - JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); - String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); + private void addMetaDataToHead(ConsumerDmaapModel consumerDmaapModel, HttpHeaders headers) { + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel)); + metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); metaData.getAsJsonObject().remove(LOCATION_JSON_TAG); - post.header(X_ATT_DR_META, metaData.toString()); + headers.set(X_ATT_DR_META, metaData.toString()); + } - post.uri(getUri(name)); + private HttpEntity<byte[]> addFileToRequest(ConsumerDmaapModel consumerDmaapModel, HttpHeaders headers) + throws IOException { + InputStream in = getInputStream(consumerDmaapModel.getLocation()); + return new HttpEntity<>(IOUtils.toByteArray(in), headers); } - private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) { - String fileLocation = model.getLocation(); - File fileResource = new File(fileLocation); - FileSystemResource httpResource = new FileSystemResource(fileResource); - post.body(BodyInserters.fromResource(httpResource)); + private InputStream getInputStream(String filePath) throws IOException { + if (fileResource == null) { + fileResource = new FileSystemResourceWrapper(filePath); + } + return fileResource.getInputStream(); + } + + private IRestTemplate getRestTemplate() throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException { + if (restTemplate == null) { + restTemplate = new RestTemplateWrapper(); + } + return restTemplate; } private URI getUri(String fileName) { - String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName; + String path = dmaapTopicName + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + fileName; return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber) .path(path).build(); } - private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) { - String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString(); - logger.error(errorMessage); + protected void setFileSystemResource(IFileSystemResource fileSystemResource) { + fileResource = fileSystemResource; + } - return Mono.error(new Exception(errorMessage)); + protected void setRestTemplate(IRestTemplate restTemplate) { + this.restTemplate = restTemplate; } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/FileSystemResourceWrapper.java new file mode 100644 index 00000000..63496916 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/FileSystemResourceWrapper.java @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service.producer; + +import java.io.IOException; +import java.io.InputStream; + +import org.springframework.core.io.FileSystemResource; + +/** + * @author + * + */ +public class FileSystemResourceWrapper implements IFileSystemResource { + private FileSystemResource realResource; + + public FileSystemResourceWrapper(String path) { + realResource = new FileSystemResource(path); + } + @Override + public InputStream getInputStream() throws IOException { + return realResource.getInputStream(); + } +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IFileSystemResource.java new file mode 100644 index 00000000..6ecb1c6e --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IFileSystemResource.java @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service.producer; + +import java.io.IOException; +import java.io.InputStream; + +/** + * @author + * + */ +public interface IFileSystemResource { + + InputStream getInputStream() throws IOException; +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IRestTemplate.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IRestTemplate.java new file mode 100644 index 00000000..7f8d3b56 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/IRestTemplate.java @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service.producer; + +import java.net.URI; + +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; + +/** + * @author + * + */ +public interface IRestTemplate { + public ResponseEntity<String> exchange(URI url, HttpMethod method, HttpEntity<byte[]> requestEntity, + Class<String> responseType); +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/PublishRedirectStrategy.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/PublishRedirectStrategy.java new file mode 100644 index 00000000..ff3a03bf --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/PublishRedirectStrategy.java @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ +package org.onap.dcaegen2.collectors.datafile.service.producer; + +import java.net.URI; + +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.ProtocolException; +import org.apache.http.annotation.Contract; +import org.apache.http.annotation.ThreadingBehavior; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.methods.RequestBuilder; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.protocol.HttpContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PublishRedirectStrategy implementation + * that automatically redirects all HEAD, GET, POST, PUT, and DELETE requests. + * This strategy relaxes restrictions on automatic redirection of + * POST methods imposed by the HTTP specification. + * + */ +@Contract(threading = ThreadingBehavior.IMMUTABLE) +public class PublishRedirectStrategy extends DefaultRedirectStrategy { + + public static final PublishRedirectStrategy INSTANCE = new PublishRedirectStrategy(); + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + /** + * Redirectable methods. + */ + private static final String[] REDIRECT_METHODS = new String[] { + HttpPut.METHOD_NAME, + HttpGet.METHOD_NAME, + HttpPost.METHOD_NAME, + HttpHead.METHOD_NAME, + HttpDelete.METHOD_NAME + }; + + @Override + protected boolean isRedirectable(final String method) { + for (final String m: REDIRECT_METHODS) { + if (m.equalsIgnoreCase(method)) { + return true; + } + } + return false; + } + + @Override + public HttpUriRequest getRedirect( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws ProtocolException { + final URI uri = getLocationURI(request, response, context); + logger.trace("getRedirect...: {}", request.toString()); + return RequestBuilder.copy(request).setUri(uri).build(); + } + +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RequestResponseLoggingInterceptor.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RequestResponseLoggingInterceptor.java new file mode 100644 index 00000000..bf40700e --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RequestResponseLoggingInterceptor.java @@ -0,0 +1,61 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ +package org.onap.dcaegen2.collectors.datafile.service.producer; +import java.io.IOException; +import java.nio.charset.Charset; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpRequest; +import org.springframework.http.client.ClientHttpRequestExecution; +import org.springframework.http.client.ClientHttpRequestInterceptor; +import org.springframework.http.client.ClientHttpResponse; +import org.springframework.util.StreamUtils; + +public class RequestResponseLoggingInterceptor implements ClientHttpRequestInterceptor { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Override + public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException { + logRequest(request, body); + ClientHttpResponse response = execution.execute(request, body); + logResponse(response); + return response; + } + + private void logRequest(HttpRequest request, byte[] body) throws IOException { + if (log.isDebugEnabled()) { + log.debug("===========================request begin================================================"); + log.debug("URI : {}", request.getURI()); + log.debug("Method : {}", request.getMethod()); + log.debug("Headers : {}", request.getHeaders()); + log.debug("Request body: {}", new String(body, "UTF-8")); + log.debug("==========================request end================================================"); + } + } + + private void logResponse(ClientHttpResponse response) throws IOException { + if (log.isDebugEnabled()) { + log.debug("============================response begin=========================================="); + log.debug("Status code : {}", response.getStatusCode()); + log.debug("Status text : {}", response.getStatusText()); + log.debug("Headers : {}", response.getHeaders()); + log.debug("Response body: {}", StreamUtils.copyToString(response.getBody(), Charset.defaultCharset())); + log.debug("=======================response end================================================="); + } + } +}
\ No newline at end of file diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RestTemplateWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RestTemplateWrapper.java new file mode 100644 index 00000000..298f6693 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/RestTemplateWrapper.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service.producer; + +import java.net.URI; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.Collections; + +import javax.net.ssl.SSLContext; + +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.ssl.SSLContextBuilder; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.web.client.RestTemplate; + +/** + * @author + * + */ +public class RestTemplateWrapper implements IRestTemplate { + private RestTemplate restTemplate; + + public RestTemplateWrapper() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { + SSLContext sslContext = + new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); + CloseableHttpClient httpClient = + HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier()) + .setRedirectStrategy(new PublishRedirectStrategy()).build(); + + HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(); + requestFactory.setHttpClient(httpClient); + + restTemplate = new RestTemplate(requestFactory); + restTemplate.setInterceptors(Collections.singletonList(new RequestResponseLoggingInterceptor())); + + } + + @Override + public ResponseEntity<String> exchange(URI url, HttpMethod method, HttpEntity<byte[]> requestEntity, + Class<String> responseType) { + return restTemplate.exchange(url, method, requestEntity, responseType); + } + +} diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java index 5f4c1a58..5dbc9089 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java @@ -17,32 +17,26 @@ package org.onap.dcaegen2.collectors.datafile.service.producer; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; - -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import java.net.URI; -import java.util.ArrayList; -import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModelForUnitTest; -import org.springframework.http.HttpHeaders; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec; -import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.web.util.DefaultUriBuilderFactory; -import reactor.core.publisher.Flux; import reactor.test.StepVerifier; /** @@ -57,70 +51,59 @@ class DmaapProducerReactiveHttpClientTest { private static final String X_ATT_DR_META = "X-ATT-DR-META"; private static final String HOST = "54.45.33.2"; - private static final String HTTP_SCHEME = "http"; + private static final String HTTPS_SCHEME = "https"; private static final int PORT = 1234; private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; + private static final String URI_SEPARATOR = "/"; private static final String PUBLISH_TOPIC = "publish"; private static final String DEFAULT_FEED_ID = "1"; + private static final String FILE_CONTENT = "Just a string."; private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient; private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class); private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest(); - private WebClient webClientMock = mock(WebClient.class); - private RequestBodyUriSpec requestBodyUriSpecMock; - private ResponseSpec responseSpecMock; + + private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class); + private IRestTemplate restTemplateMock = mock(IRestTemplate.class); + private ResponseEntity<String> responseEntityMock = mock(ResponseEntity.class); @BeforeEach void setUp() { when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST); - when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTP_SCHEME); + when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME); when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT); - when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("DATAFILE"); - when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("DATAFILE"); + when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin"); + when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin"); when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn(APPLICATION_OCTET_STREAM_CONTENT_TYPE); when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(PUBLISH_TOPIC); dmaapProducerReactiveHttpClient = new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock); - - webClientMock = spy(WebClient.builder() - .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType()) - .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(), - dmaapPublisherConfigurationMock.dmaapUserPassword())) - .build()); - requestBodyUriSpecMock = mock(RequestBodyUriSpec.class); - responseSpecMock = mock(ResponseSpec.class); + dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock); + dmaapProducerReactiveHttpClient.setRestTemplate(restTemplateMock); } @Test - void getHttpResponse_Success() { + void getHttpResponse_Success() throws Exception { mockWebClientDependantObject(); - dmaapProducerReactiveHttpClient.createDmaapWebClient(webClientMock); - List<ConsumerDmaapModel> consumerDmaapModelList = new ArrayList<ConsumerDmaapModel>(); - consumerDmaapModelList.add(consumerDmaapModel); StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel)) - .expectNext("200").verifyComplete(); - - verify(requestBodyUriSpecMock).header(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE); - JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel)); - metaData.getAsJsonObject().remove(LOCATION_JSON_TAG); - metaData.getAsJsonObject().remove(NAME_JSON_TAG); - verify(requestBodyUriSpecMock).header(X_ATT_DR_META, metaData.toString()); - URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTP_SCHEME).host(HOST).port(PORT) - .path(PUBLISH_TOPIC + "/" + DEFAULT_FEED_ID + "/" + FILE_NAME).build(); - verify(requestBodyUriSpecMock).uri(expectedUri); - verify(requestBodyUriSpecMock).body(any()); + .expectNext(HttpStatus.OK.toString()).verifyComplete(); + + URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT) + .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build(); + + verify(restTemplateMock) + .exchange(eq(expectedUri), eq(HttpMethod.PUT), any(), eq(String.class)); } - private void mockWebClientDependantObject() { - when(webClientMock.post()).thenReturn(requestBodyUriSpecMock); - when(requestBodyUriSpecMock.uri((URI) any())).thenReturn(requestBodyUriSpecMock); + private void mockWebClientDependantObject() throws IOException { + InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes()); + when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream); - when(requestBodyUriSpecMock.retrieve()).thenReturn(responseSpecMock); - when(responseSpecMock.onStatus(any(), any())).thenReturn(responseSpecMock); - Flux<String> expectedResult = Flux.just("200"); - when(responseSpecMock.bodyToFlux(String.class)).thenReturn(expectedResult); + when(restTemplateMock.exchange(any(), any(), any(), any())) + .thenReturn(responseEntityMock); + when(responseEntityMock.getStatusCode()).thenReturn(HttpStatus.OK); } } |