summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordfarrelly <david.farrelly@est.tech>2019-03-13 12:02:20 +0000
committerdfarrelly <david.farrelly@est.tech>2019-03-13 12:02:20 +0000
commit88adbc830c24f309c19fc5874654cc1cfaebc600 (patch)
tree65b268c6612c20cd64ac07559b811eb8ee7c1c64
parent1f982f9e7f9f38743bbc1bcf2609292579762341 (diff)
Add metadata filtering
Issue-ID: DCAEGEN2-1286 Change-Id: Icfee7f24cb97b429e8b0db2d67da2f21e413cea0 Signed-off-by: dfarrelly <david.farrelly@est.tech>
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java7
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java126
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java5
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java3
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java6
-rw-r--r--src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java119
-rw-r--r--src/test/resources/incorrect_metadata.json12
-rw-r--r--src/test/resources/multiple_filter_mapper_config.json34
-rw-r--r--src/test/resources/no_filter_mapper_config.json34
-rw-r--r--src/test/resources/valid_mapper_config.json2
-rw-r--r--src/test/resources/valid_metadata.json3
11 files changed, 346 insertions, 5 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 11767e6..09d8975 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -32,6 +32,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -56,6 +57,10 @@ public class App {
public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException {
Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
+
+ MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
+
+ MetadataFilter metadataFilter = new MetadataFilter(mapperConfig);
Mapper mapper = new Mapper(mappingTemplate);
XMLValidator validator = new XMLValidator(xmlSchema);
flux.onBackpressureDrop(App::handleBackPressure)
@@ -64,12 +69,12 @@ public class App {
.parallel()
.runOn(Schedulers.newParallel(""), 1)
.doOnNext(event -> MDC.setContextMap(event.getMdc()))
+ .filter(metadataFilter::filter)
.filter(validator::validate)
.map(mapper::map)
.subscribe(event -> logger.unwrap().info("Event Processed"));
DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next);
- MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
dataRouterSubscriber.start(mapperConfig);
Undertow.builder()
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
new file mode 100644
index 0000000..20c8a64
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
@@ -0,0 +1,126 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.filtering;
+
+import lombok.NonNull;
+import org.onap.dcaegen2.services.pmmapper.exceptions.*;
+import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig;
+import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig.Filter;
+import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class MetadataFilter {
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(MetadataFilter.class));
+ MapperConfig config;
+
+ public MetadataFilter(MapperConfig config) {
+ this.config = config;
+ }
+
+ /**
+ * Filters events by their metadata against filter object in configuration
+ * @param event inbound event
+ */
+ public boolean filter(@NonNull Event event) {
+ String decompressionStatus;
+ logger.unwrap().info("Filtering event metadata");
+ EventMetadata metadata = event.getMetadata();
+
+ MeasFilterConfig measFilterConfig = config.getFilterConfig();
+
+ List<MeasFilterConfig.Filter> filters = measFilterConfig.getFilters();
+
+ if(metadata.getDecompressionStatus() != null) {
+ decompressionStatus = metadata.getDecompressionStatus();
+ logger.unwrap().debug("Decompression Status: {}", decompressionStatus);
+ }
+
+ if(filters.isEmpty()) {
+ logger.unwrap().info("No filter specified in config: {}", filters);
+ return true;
+ }
+
+ for(Filter filter : filters) {
+ if(compareObjects(filter, metadata)) {
+ logger.unwrap().info("Metadata matches filter: {}", filter);
+
+ event.setFilter(filter);
+
+ return true;
+ } else {
+ logger.unwrap().debug("Metadata does not match filter: {}", filter);
+ }
+ }
+ logger.unwrap().info("Metadata does not match any filters, sending process event indicator to DR");
+ try {
+ DataRouterUtils.processEvent(config, event);
+ }catch (ProcessEventException exception) {
+ logger.unwrap().error("Process event failure", exception);
+ }
+ return false;
+
+ }
+
+ /**
+ * Compares event metadata against filter object
+ * @param filter filter object received from configuration
+ * @param metadata metadata from event
+ */
+ private boolean compareObjects(Filter filter, EventMetadata metadata) {
+ List<Validator<Filter, EventMetadata>> validators = Arrays.asList(
+ new VendorValidator(),
+ new TypeValidator()
+ );
+
+ for(Validator<Filter, EventMetadata> validation : validators) {
+ if (! validation.validate(filter, metadata)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ interface Validator<A, B> {
+ boolean validate(A filter, B metadata);
+ }
+
+ class VendorValidator implements Validator<Filter, EventMetadata> {
+ @Override
+ public boolean validate(Filter filter, EventMetadata metadata) {
+ return filter.getVendor().equals(metadata.getVendorName());
+ }
+ }
+
+ class TypeValidator implements Validator<Filter, EventMetadata> {
+ @Override
+ public boolean validate(Filter filter, EventMetadata metadata) {
+ return filter.getNfType().equals(metadata.getProductName());
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
index 0e82119..fa41142 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
@@ -24,6 +24,7 @@ import lombok.Data;
import lombok.NonNull;
import java.util.Map;
+import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig.Filter;
/**
* Class used to pass around relevant inbound event data.
@@ -41,5 +42,7 @@ public class Event {
@NonNull
private String publishIdentity;
- MeasCollecFile measCollecFile;
+ private MeasCollecFile measCollecFile;
+
+ private Filter filter;
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
index 601b00f..8a0977d 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
@@ -19,6 +19,7 @@
*/
package org.onap.dcaegen2.services.pmmapper.model;
+import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
@@ -47,4 +48,6 @@ public class EventMetadata {
private String fileFormatType;
@GSONRequired
private String fileFormatVersion;
+ @SerializedName("decompression_status")
+ private String decompressionStatus;
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java
index 458a6cd..0f1aaa9 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java
@@ -26,6 +26,7 @@ import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
@Data
@EqualsAndHashCode
@@ -37,17 +38,20 @@ public class MeasFilterConfig {
@Data
public class Filter {
+ @GSONRequired
@SerializedName("pmDefVsn")
private String dictionaryVersion;
+ @GSONRequired
@SerializedName("nfType")
private String nfType;
+ @GSONRequired
@SerializedName("vendor")
private String vendor;
+ @GSONRequired
@SerializedName("measTypes")
private List<String> measTypes;
-
}
}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java
new file mode 100644
index 0000000..47c187a
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java
@@ -0,0 +1,119 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.filtering;
+
+import com.google.gson.Gson;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import utils.EventUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(MockitoExtension.class)
+@PrepareForTest(MapperConfig.class)
+public class MetadataFilterTest {
+
+ private MetadataFilter metadataFilter;
+
+ private static MapperConfig validConfig;
+ private static MapperConfig noFilterConfig;
+ private static MapperConfig multipleFilterConfig;
+
+ private static String validConfigFileContents;
+ private static String noFilterConfigFileContents;
+ private static String multipleFilterConfigFileContents;
+
+ private static final Path validMetadata = Paths.get("src/test/resources/valid_metadata.json");
+ private static final Path incorrectMetadata = Paths.get("src/test/resources/incorrect_metadata.json");
+
+ private static final Path validConfigPath = Paths.get("src/test/resources/valid_mapper_config.json");
+ private static final Path noFilterConfigPath = Paths.get("src/test/resources/no_filter_mapper_config.json");
+ private static final Path multipleFilterConfigPath = Paths.get("src/test/resources/multiple_filter_mapper_config.json");
+
+ private static final Path dataDirectory = Paths.get("src/test/resources/xml_validator_test/test_data/");
+
+
+ @BeforeEach
+ void setup() throws Exception {
+ validConfigFileContents = new String(Files.readAllBytes(validConfigPath));
+ noFilterConfigFileContents = new String(Files.readAllBytes(noFilterConfigPath));
+ multipleFilterConfigFileContents = new String(Files.readAllBytes(multipleFilterConfigPath));
+
+ validConfig = new Gson().fromJson(validConfigFileContents, MapperConfig.class);
+ noFilterConfig = new Gson().fromJson(noFilterConfigFileContents, MapperConfig.class);
+ multipleFilterConfig = new Gson().fromJson(multipleFilterConfigFileContents, MapperConfig.class);
+
+
+ metadataFilter = new MetadataFilter(this.validConfig);
+ }
+
+
+ @ParameterizedTest
+ @MethodSource("getEventsWithValidMetadata")
+ void testValidMetadataPass(Event testEvent) {
+ assertTrue(metadataFilter.filter(testEvent));
+ }
+
+ @ParameterizedTest
+ @MethodSource("getEventsWithValidMetadata")
+ void testEmptyFilterPass(Event testEvent) {
+ metadataFilter.config = noFilterConfig;
+ assertTrue(metadataFilter.filter(testEvent));
+ }
+
+ @ParameterizedTest
+ @MethodSource("getEventsWithValidMetadata")
+ void testMultipleFilterPass(Event testEvent) {
+ metadataFilter.config = multipleFilterConfig;
+ assertTrue(metadataFilter.filter(testEvent));
+ }
+
+ @ParameterizedTest
+ @MethodSource("getEventsWithInvalidMetadata")
+ void testInvalidMetadataFail(Event testEvent) {
+ assertFalse(metadataFilter.filter(testEvent));
+ }
+
+
+
+ private static List<Event> getEventsWithValidMetadata() throws IOException {
+ Path validDataDirectory = Paths.get(dataDirectory.toString() + "/valid/");
+ return EventUtils.eventsFromDirectory(validDataDirectory, validMetadata);
+ }
+
+ private static List<Event> getEventsWithInvalidMetadata() throws IOException {
+ Path validDataDirectory = Paths.get(dataDirectory.toString() + "/valid/");
+ return EventUtils.eventsFromDirectory(validDataDirectory, incorrectMetadata);
+ }
+} \ No newline at end of file
diff --git a/src/test/resources/incorrect_metadata.json b/src/test/resources/incorrect_metadata.json
new file mode 100644
index 0000000..e61e94f
--- /dev/null
+++ b/src/test/resources/incorrect_metadata.json
@@ -0,0 +1,12 @@
+{
+ "productName": "LTE",
+ "vendorName": "Ericsson",
+ "lastEpochMicrosec": "1538478000000",
+ "sourceName": "oteNB5309",
+ "startEpochMicrosec": "1538478900000",
+ "timeZoneOffset": "UTC+05.00",
+ "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
+ "compression": "gzip",
+ "fileFormatType": "org.3GPP.32.435#measCollec",
+ "fileFormatVersion": "V8"
+} \ No newline at end of file
diff --git a/src/test/resources/multiple_filter_mapper_config.json b/src/test/resources/multiple_filter_mapper_config.json
new file mode 100644
index 0000000..89bca57
--- /dev/null
+++ b/src/test/resources/multiple_filter_mapper_config.json
@@ -0,0 +1,34 @@
+{
+ "pm-mapper-filter": {"filters":[{"pmDefVsn": "V8","nfType": "LTE","vendor": "Ericsson","measTypes": [ "A", "B" ]},{"pmDefVsn": "V9","nfType": "NrRadio","vendor": "Ericsson","measTypes": [ "A", "B" ]}]},
+ "streams_subscribes": {
+ "dmaap_subscriber": {
+ "type": "data_router",
+ "aaf_username": null,
+ "aaf_password": null,
+ "dmaap_info": {
+ "location": "location",
+ "delivery_url": "delivery_url",
+ "username": "username",
+ "password": "password",
+ "subscriber_id": "subscriber_id"
+ }
+ }
+ },
+ "streams_publishes": {
+ "dmaap_publisher": {
+ "type": "message_router",
+ "aaf_password": null,
+ "dmaap_info": {
+ "topic_url": "https://message-router.onap.svc.cluster.local:3904/events/some-topic",
+ "client_role": null,
+ "location": null,
+ "client_id": null
+ },
+ "aaf_username": null
+ }
+ },
+ "dmaap_dr_feed_id": "2",
+ "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs",
+ "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete",
+ "services_calls": {}
+} \ No newline at end of file
diff --git a/src/test/resources/no_filter_mapper_config.json b/src/test/resources/no_filter_mapper_config.json
new file mode 100644
index 0000000..3f855cf
--- /dev/null
+++ b/src/test/resources/no_filter_mapper_config.json
@@ -0,0 +1,34 @@
+{
+ "pm-mapper-filter": {"filters":[]},
+ "streams_subscribes": {
+ "dmaap_subscriber": {
+ "type": "data_router",
+ "aaf_username": null,
+ "aaf_password": null,
+ "dmaap_info": {
+ "location": "location",
+ "delivery_url": "delivery_url",
+ "username": "username",
+ "password": "password",
+ "subscriber_id": "subscriber_id"
+ }
+ }
+ },
+ "streams_publishes": {
+ "dmaap_publisher": {
+ "type": "message_router",
+ "aaf_password": null,
+ "dmaap_info": {
+ "topic_url": "https://message-router.onap.svc.cluster.local:3904/events/some-topic",
+ "client_role": null,
+ "location": null,
+ "client_id": null
+ },
+ "aaf_username": null
+ }
+ },
+ "dmaap_dr_feed_id": "2",
+ "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs",
+ "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete",
+ "services_calls": {}
+} \ No newline at end of file
diff --git a/src/test/resources/valid_mapper_config.json b/src/test/resources/valid_mapper_config.json
index 6cd76bd..040406f 100644
--- a/src/test/resources/valid_mapper_config.json
+++ b/src/test/resources/valid_mapper_config.json
@@ -1,5 +1,5 @@
{
- "pm-mapper-filter": {"filters":[]},
+ "pm-mapper-filter": {"filters":[{"pmDefVsn": "V9","nfType": "NrRadio","vendor": "Ericsson","measTypes": [ "A", "B" ]}]},
"streams_subscribes": {
"dmaap_subscriber": {
"type": "data_router",
diff --git a/src/test/resources/valid_metadata.json b/src/test/resources/valid_metadata.json
index 21de3fb..cf21437 100644
--- a/src/test/resources/valid_metadata.json
+++ b/src/test/resources/valid_metadata.json
@@ -8,5 +8,6 @@
"location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
"compression": "gzip",
"fileFormatType": "org.3GPP.32.435#measCollec",
- "fileFormatVersion": "V9"
+ "fileFormatVersion": "V9",
+ "decompression_status": "false"
} \ No newline at end of file