summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--datafile-app-server/config/datafile_endpoints.json52
-rw-r--r--datafile-app-server/src/main/resources/datafile_endpoints.json2
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints.json2
-rw-r--r--datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java32
-rw-r--r--datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java31
5 files changed, 55 insertions, 64 deletions
diff --git a/datafile-app-server/config/datafile_endpoints.json b/datafile-app-server/config/datafile_endpoints.json
index 41f9b6d8..102537b1 100644
--- a/datafile-app-server/config/datafile_endpoints.json
+++ b/datafile-app-server/config/datafile_endpoints.json
@@ -1,28 +1,28 @@
{
- "configs": {
- "dmaap": {
- "dmaapConsumerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 3904,
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
- "dmaapProtocol": "http",
- "dmaapUserName": "admin",
- "dmaapUserPassword": "admin",
- "dmaapContentType": "application/json",
- "consumerId": "c12",
- "consumerGroup": "OpenDcae-c12",
- "timeoutMS": -1,
- "messageLimit": 1
- },
- "dmaapProducerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 3905,
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
- "dmaapProtocol": "http",
- "dmaapUserName": "admin",
- "dmaapUserPassword": "admin",
- "dmaapContentType": "application/octet-stream"
- }
+ "configs": {
+ "dmaap": {
+ "dmaapConsumerConfiguration": {
+ "dmaapHostName": "localhost",
+ "dmaapPortNumber": 3904,
+ "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
+ "dmaapProtocol": "http",
+ "dmaapUserName": "admin",
+ "dmaapUserPassword": "admin",
+ "dmaapContentType": "application/json",
+ "consumerId": "c12",
+ "consumerGroup": "OpenDcae-c12",
+ "timeoutMS": -1,
+ "messageLimit": 1
+ },
+ "dmaapProducerConfiguration": {
+ "dmaapHostName": "localhost",
+ "dmaapPortNumber": 3905,
+ "dmaapTopicName": "publish",
+ "dmaapProtocol": "http",
+ "dmaapUserName": "admin",
+ "dmaapUserPassword": "admin",
+ "dmaapContentType": "application/octet-stream"
+ }
+ }
}
- }
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/resources/datafile_endpoints.json b/datafile-app-server/src/main/resources/datafile_endpoints.json
index f6b65fba..188129e8 100644
--- a/datafile-app-server/src/main/resources/datafile_endpoints.json
+++ b/datafile-app-server/src/main/resources/datafile_endpoints.json
@@ -19,7 +19,7 @@
"dmaapHostName": "localhost",
"dmaapPortNumber": 3905,
"dmaapProtocol": "http",
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
+ "dmaapTopicName": "publish",
"dmaapUserName": "admin",
"dmaapUserPassword": "admin"
}
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints.json b/datafile-app-server/src/test/resources/datafile_endpoints.json
index f6b65fba..188129e8 100644
--- a/datafile-app-server/src/test/resources/datafile_endpoints.json
+++ b/datafile-app-server/src/test/resources/datafile_endpoints.json
@@ -19,7 +19,7 @@
"dmaapHostName": "localhost",
"dmaapPortNumber": 3905,
"dmaapProtocol": "http",
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
+ "dmaapTopicName": "publish",
"dmaapUserName": "admin",
"dmaapUserPassword": "admin"
}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index fd3c0c84..4b8ce08f 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -21,11 +21,9 @@ import com.google.gson.JsonParser;
import java.io.File;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.List;
import org.apache.http.HttpHeaders;
-import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
@@ -38,6 +36,7 @@ import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -49,14 +48,15 @@ public class DmaapProducerReactiveHttpClient {
private static final String X_ATT_DR_META = "X-ATT-DR-META";
private static final String LOCATION = "location";
+ private static final String DEFAULT_FEED_ID = "1";
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private WebClient webClient;
private final String dmaapHostName;
private final Integer dmaapPortNumber;
- private final String dmaapProtocol;
private final String dmaapTopicName;
+ private final String dmaapProtocol;
private final String dmaapContentType;
/**
@@ -67,9 +67,9 @@ public class DmaapProducerReactiveHttpClient {
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
- this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+ this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
}
@@ -115,18 +115,13 @@ public class DmaapProducerReactiveHttpClient {
private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
boolean result = true;
- try {
- post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+ post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
- JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
- metaData.getAsJsonObject().remove(LOCATION);
- post.header(X_ATT_DR_META, metaData.toString());
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ String location = metaData.getAsJsonObject().remove(LOCATION).getAsString();
+ post.header(X_ATT_DR_META, metaData.toString());
- post.uri(getUri());
- } catch (Exception e) {
- logger.error("Unable to post file to Data Router. " + model, e);
- result = false;
- }
+ post.uri(getUri(location));
return result;
}
@@ -138,9 +133,12 @@ public class DmaapProducerReactiveHttpClient {
post.body(BodyInserters.fromResource(httpResource));
}
- private URI getUri() throws URISyntaxException {
- return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
- .setPath(dmaapTopicName).build();
+ private URI getUri(String location) {
+ String fileName = location.substring(location.indexOf("/"), location.length());
+ String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
+ URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName)
+ .port(dmaapPortNumber).path(path).build();
+ return uri;
}
private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index 213e8d77..c0dbf31b 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -27,11 +27,9 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.http.client.utils.URIBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
@@ -42,22 +40,26 @@ import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DmaapProducerReactiveHttpClientTest {
- private static final String LOCATION = "location";
+ private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String LOCATION_JSON_TAG = "location";
private static final String X_ATT_DR_META = "X-ATT-DR-META";
private static final String HOST = "54.45.33.2";
- private static final String HTTPS_SCHEME = "https";
+ private static final String HTTP_SCHEME = "http";
private static final int PORT = 1234;
private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
- private static final String FILE_READY_TOPIC = "fileReady";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String DEFAULT_FEED_ID = "1";
private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
@@ -71,12 +73,12 @@ class DmaapProducerReactiveHttpClientTest {
@BeforeEach
void setUp() {
when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTP_SCHEME);
when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("DATAFILE");
when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("DATAFILE");
when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn(APPLICATION_OCTET_STREAM_CONTENT_TYPE);
- when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(FILE_READY_TOPIC);
+ when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(PUBLISH_TOPIC);
dmaapProducerReactiveHttpClient = new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
@@ -91,9 +93,6 @@ class DmaapProducerReactiveHttpClientTest {
@Test
void getHttpResponse_Success() {
- // given
-
- // when
mockWebClientDependantObject();
dmaapProducerReactiveHttpClient.createDmaapWebClient(webClientMock);
List<ConsumerDmaapModel> consumerDmaapModelList = new ArrayList<ConsumerDmaapModel>();
@@ -101,18 +100,12 @@ class DmaapProducerReactiveHttpClientTest {
dmaapProducerReactiveHttpClient.getDmaapProducerResponse(Mono.just(consumerDmaapModelList));
- // then
verify(requestBodyUriSpecMock).header(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
- metaData.getAsJsonObject().remove(LOCATION);
+ metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
verify(requestBodyUriSpecMock).header(X_ATT_DR_META, metaData.toString());
- URI expectedUri = null;
- try {
- expectedUri = new URIBuilder().setScheme(HTTPS_SCHEME).setHost(HOST).setPort(1234).setPath(FILE_READY_TOPIC)
- .build();
- } catch (URISyntaxException e) {
- // Nothing
- }
+ URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTP_SCHEME).host(HOST).port(PORT)
+ .path(PUBLISH_TOPIC + "/" + DEFAULT_FEED_ID + "/" + FILE_NAME).build();
verify(requestBodyUriSpecMock).uri(expectedUri);
verify(requestBodyUriSpecMock).body(any());
}