diff options
author | maximesson <maxime.bonneau@est.tech> | 2019-03-21 15:58:55 +0000 |
---|---|---|
committer | maximesson <maxime.bonneau@est.tech> | 2019-03-21 15:58:55 +0000 |
commit | 4bd281390ed24b278846775c1157f82db81fddbe (patch) | |
tree | 1ffaf2384e830e9659e379aab0c833732924ccce /datafile-dmaap-client/src/main/java | |
parent | 6870154043d73d527cc42aca7ade7e49aa961476 (diff) |
Add check to DataRouter if file has been published
For each file in the FileReady message that DFC does not know if it has been
published yet, it should ask DataRouter if it has been published already to
avoid downloading and publishing a file more than once.
Change-Id: I18117a6e968ec929aa255052a4c44f890a8ed39d
Issue-ID: DCAEGEN2-1256
Signed-off-by: maximesson <maxime.bonneau@est.tech>
Diffstat (limited to 'datafile-dmaap-client/src/main/java')
7 files changed, 176 insertions, 169 deletions
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index 1bf3ec5a..4283debf 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -67,7 +67,7 @@ public class FtpsClient implements FileCollectClient { } catch (DatafileTaskException e) { throw e; } catch (Exception e) { - throw new DatafileTaskException("Could not open connection: ", e); + throw new DatafileTaskException("Could not open connection: " + e, e); } } @@ -100,7 +100,7 @@ public class FtpsClient implements FileCollectClient { throw new DatafileTaskException("Could not retrieve file " + remoteFileName); } } catch (IOException e) { - throw new DatafileTaskException("Could not fetch file: ", e); + throw new DatafileTaskException("Could not fetch file: " + e, e); } logger.trace("collectFile fetched: {}", localFileName); } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index 2f489166..dec8af42 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -79,7 +79,7 @@ public class SftpClient implements FileCollectClient { sftpChannel = getChannel(session); } } catch (JSchException e) { - throw new DatafileTaskException("Could not open Sftp client", e); + throw new DatafileTaskException("Could not open Sftp client" + e, e); } } diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java new file mode 100644 index 00000000..e01a941b --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.http; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; + +import org.apache.http.client.RedirectStrategy; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.http.impl.nio.client.HttpAsyncClients; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public class HttpAsyncClientBuilderWrapper implements IHttpAsyncClientBuilder { + HttpAsyncClientBuilder builder = HttpAsyncClients.custom(); + + @Override + public IHttpAsyncClientBuilder setRedirectStrategy(RedirectStrategy redirectStrategy) { + builder.setRedirectStrategy(redirectStrategy); + return this; + } + + @Override + public IHttpAsyncClientBuilder setSSLContext(SSLContext sslcontext) { + builder.setSSLContext(sslcontext); + return this; + } + + @Override + public IHttpAsyncClientBuilder setSSLHostnameVerifier(HostnameVerifier hostnameVerifier) { + builder.setSSLHostnameVerifier(hostnameVerifier); + return this; + } + + @Override + public IHttpAsyncClientBuilder setDefaultRequestConfig(RequestConfig config) { + builder.setDefaultRequestConfig(config); + return this; + } + + @Override + public CloseableHttpAsyncClient build() { + return builder.build(); + } + +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java new file mode 100644 index 00000000..e0a51a80 --- /dev/null +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java @@ -0,0 +1,43 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.http; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; + +import org.apache.http.client.RedirectStrategy; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public interface IHttpAsyncClientBuilder { + public IHttpAsyncClientBuilder setRedirectStrategy(final RedirectStrategy redirectStrategy); + + public IHttpAsyncClientBuilder setSSLContext(final SSLContext sslcontext); + + public IHttpAsyncClientBuilder setSSLHostnameVerifier(final HostnameVerifier hostnameVerifier); + + public IHttpAsyncClientBuilder setDefaultRequestConfig(final RequestConfig config); + + public CloseableHttpAsyncClient build(); +} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java deleted file mode 100644 index 5295b124..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.io; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Path; -import org.springframework.core.io.FileSystemResource; - -public class FileSystemResourceWrapper implements IFileSystemResource { - private FileSystemResource realResource; - - @Override - public void setPath(Path path) { - realResource = new FileSystemResource(path); - } - @Override - public InputStream getInputStream() throws IOException { - return realResource.getInputStream(); - } -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java deleted file mode 100644 index 23f14a33..00000000 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.io; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Path; - -public interface IFileSystemResource { - - public void setPath(Path filePath); - - public InputStream getInputStream() throws IOException; -} diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index 9304688f..9bd5d57f 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -16,54 +16,34 @@ package org.onap.dcaegen2.collectors.datafile.service.producer; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; -import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; - -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.nio.file.Paths; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.util.Map; -import java.util.UUID; import java.util.concurrent.Future; import javax.net.ssl.SSLContext; import org.apache.commons.codec.binary.Base64; -import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.conn.ssl.NoopHostnameVerifier; -import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; -import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.ssl.SSLContextBuilder; -import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper; -import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource; -import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper; +import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; import org.springframework.web.util.DefaultUriBuilderFactory; - -import reactor.core.publisher.Mono; +import org.springframework.web.util.UriBuilder; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 @@ -71,11 +51,7 @@ import reactor.core.publisher.Mono; */ public class DmaapProducerReactiveHttpClient { - private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META"; - private static final String NAME_JSON_TAG = "name"; - private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation"; - private static final String URI_SEPARATOR = "/"; - private static final String DEFAULT_FEED_ID = "1"; + private static final int NO_REQUEST_TIMEOUT = -1; private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN"); @@ -83,14 +59,10 @@ public class DmaapProducerReactiveHttpClient { private final String dmaapHostName; private final Integer dmaapPortNumber; - private final String dmaapTopicName; private final String dmaapProtocol; - private final String dmaapContentType; private final String user; private final String pwd; - private IFileSystemResource fileResource = new FileSystemResourceWrapper(); - /** * Constructor DmaapProducerReactiveHttpClient. * @@ -99,96 +71,86 @@ public class DmaapProducerReactiveHttpClient { public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName(); this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber(); - this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName(); this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol(); - this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType(); this.user = dmaapPublisherConfiguration.dmaapUserName(); this.pwd = dmaapPublisherConfiguration.dmaapUserPassword(); } - /** - * Function for calling DMaaP HTTP producer - post request to DMaaP DataRouter. - * - * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter - * @return status code of operation - */ - public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel, - Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - try (CloseableHttpAsyncClient webClient = createWebClient()) { - - HttpPut put = new HttpPut(); - prepareHead(consumerDmaapModel, put); - prepareBody(consumerDmaapModel, put); - addUserCredentialsToHead(put); - - logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation()); - Future<HttpResponse> future = webClient.execute(put, null); + public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap) + throws DatafileTaskException { + try (CloseableHttpAsyncClient webClient = createWebClient(true, NO_REQUEST_TIMEOUT)) { + MdcVariables.setMdcContextMap(contextMap); + webClient.start(); + + logger.trace(INVOKE, "Starting to produce to DR {}", request); + Future<HttpResponse> future = webClient.execute(request, null); HttpResponse response = future.get(); - logger.trace(INVOKE_RETURN, "Response from DR {}", response); - return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); + logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString()); + return response; } catch (Exception e) { - logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); - return Mono.error(e); + throw new DatafileTaskException("Unable to create web client.", e); } } - private void addUserCredentialsToHead(HttpPut put) { + public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, int requestTimeout, + Map<String, String> contextMap) throws DatafileTaskException { + try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) { + MdcVariables.setMdcContextMap(contextMap); + webClient.start(); + + logger.trace(INVOKE, "Starting to produce to DR {}", request); + Future<HttpResponse> future = webClient.execute(request, null); + HttpResponse response = future.get(); + logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString()); + return response; + } catch (Exception e) { + throw new DatafileTaskException("Unable to create web client.", e); + } + } + + public void addUserCredentialsToHead(HttpUriRequest request) { String plainCreds = user + ":" + pwd; byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes); String base64Creds = new String(base64CredsBytes); logger.trace("base64Creds...: {}", base64Creds); - put.addHeader("Authorization", "Basic " + base64Creds); + request.addHeader("Authorization", "Basic " + base64Creds); } - private void prepareHead(ConsumerDmaapModel model, HttpPut put) { - put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType); - JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); - String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); - metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); - put.addHeader(X_DMAAP_DR_META, metaData.toString()); - put.setURI(getUri(name)); - - String requestID = MDC.get(REQUEST_ID); - put.addHeader(X_ONAP_REQUEST_ID, requestID); - String invocationID = UUID.randomUUID().toString(); - put.addHeader(X_INVOCATION_ID, invocationID); + public UriBuilder getBaseUri() { + return new DefaultUriBuilderFactory().builder() // + .scheme(dmaapProtocol) // + .host(dmaapHostName) // + .port(dmaapPortNumber); } - private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException { - Path fileLocation = Paths.get(model.getInternalLocation()); - this.fileResource.setPath(fileLocation); - InputStream fileInputStream = fileResource.getInputStream(); - - put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); + private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, int requestTimeout) + throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { + SSLContext sslContext = + new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build(); - } + IHttpAsyncClientBuilder clientBuilder = getHttpClientBuilder(); + clientBuilder.setSSLContext(sslContext) // + .setSSLHostnameVerifier(new NoopHostnameVerifier()); - private URI getUri(String fileName) { - String path = dmaapTopicName + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + fileName; - return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber) - .path(path).build(); - } + if (expectRedirect) { + clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE); + } - void setFileSystemResource(IFileSystemResource fileSystemResource) { - fileResource = fileSystemResource; - } + if (requestTimeout > NO_REQUEST_TIMEOUT) { + RequestConfig requestConfig = RequestConfig.custom() // + .setSocketTimeout(requestTimeout) // + .setConnectTimeout(requestTimeout) // + .setConnectionRequestTimeout(requestTimeout) // + .build(); - protected CloseableHttpAsyncClient createWebClient() - throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { + clientBuilder.setDefaultRequestConfig(requestConfig); + } - SSLContext sslContext = new SSLContextBuilder() // - .loadTrustMaterial(null, (certificate, authType) -> true) // - .build(); - - CloseableHttpAsyncClient webClient = HttpAsyncClients.custom() // - .setSSLContext(sslContext) // - .setSSLHostnameVerifier(new NoopHostnameVerifier()) // - .setRedirectStrategy(PublishRedirectStrategy.INSTANCE) // - .build(); - webClient.start(); - return webClient; + return clientBuilder.build(); } + IHttpAsyncClientBuilder getHttpClientBuilder() { + return new HttpAsyncClientBuilderWrapper(); + } }
\ No newline at end of file |