From 88adbc830c24f309c19fc5874654cc1cfaebc600 Mon Sep 17 00:00:00 2001 From: dfarrelly Date: Wed, 13 Mar 2019 12:02:20 +0000 Subject: Add metadata filtering Issue-ID: DCAEGEN2-1286 Change-Id: Icfee7f24cb97b429e8b0db2d67da2f21e413cea0 Signed-off-by: dfarrelly --- .../org/onap/dcaegen2/services/pmmapper/App.java | 7 +- .../pmmapper/filtering/MetadataFilter.java | 126 +++++++++++++++++++++ .../dcaegen2/services/pmmapper/model/Event.java | 5 +- .../services/pmmapper/model/EventMetadata.java | 3 + .../services/pmmapper/model/MeasFilterConfig.java | 6 +- .../pmmapper/filtering/MetadataFilterTest.java | 119 +++++++++++++++++++ src/test/resources/incorrect_metadata.json | 12 ++ .../resources/multiple_filter_mapper_config.json | 34 ++++++ src/test/resources/no_filter_mapper_config.json | 34 ++++++ src/test/resources/valid_mapper_config.json | 2 +- src/test/resources/valid_metadata.json | 3 +- 11 files changed, 346 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java create mode 100644 src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java create mode 100644 src/test/resources/incorrect_metadata.json create mode 100644 src/test/resources/multiple_filter_mapper_config.json create mode 100644 src/test/resources/no_filter_mapper_config.json (limited to 'src') diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 11767e6..09d8975 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -32,6 +32,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError; import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; +import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter; import org.onap.dcaegen2.services.pmmapper.mapping.Mapper; import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; @@ -56,6 +57,10 @@ public class App { public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException { Flux flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink); HealthCheckHandler healthCheckHandler = new HealthCheckHandler(); + + MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); + + MetadataFilter metadataFilter = new MetadataFilter(mapperConfig); Mapper mapper = new Mapper(mappingTemplate); XMLValidator validator = new XMLValidator(xmlSchema); flux.onBackpressureDrop(App::handleBackPressure) @@ -64,12 +69,12 @@ public class App { .parallel() .runOn(Schedulers.newParallel(""), 1) .doOnNext(event -> MDC.setContextMap(event.getMdc())) + .filter(metadataFilter::filter) .filter(validator::validate) .map(mapper::map) .subscribe(event -> logger.unwrap().info("Event Processed")); DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next); - MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); dataRouterSubscriber.start(mapperConfig); Undertow.builder() diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java new file mode 100644 index 0000000..20c8a64 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java @@ -0,0 +1,126 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.filtering; + +import lombok.NonNull; +import org.onap.dcaegen2.services.pmmapper.exceptions.*; +import org.onap.dcaegen2.services.pmmapper.mapping.Mapper; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.EventMetadata; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig; +import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig.Filter; +import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; + +public class MetadataFilter { + private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(MetadataFilter.class)); + MapperConfig config; + + public MetadataFilter(MapperConfig config) { + this.config = config; + } + + /** + * Filters events by their metadata against filter object in configuration + * @param event inbound event + */ + public boolean filter(@NonNull Event event) { + String decompressionStatus; + logger.unwrap().info("Filtering event metadata"); + EventMetadata metadata = event.getMetadata(); + + MeasFilterConfig measFilterConfig = config.getFilterConfig(); + + List filters = measFilterConfig.getFilters(); + + if(metadata.getDecompressionStatus() != null) { + decompressionStatus = metadata.getDecompressionStatus(); + logger.unwrap().debug("Decompression Status: {}", decompressionStatus); + } + + if(filters.isEmpty()) { + logger.unwrap().info("No filter specified in config: {}", filters); + return true; + } + + for(Filter filter : filters) { + if(compareObjects(filter, metadata)) { + logger.unwrap().info("Metadata matches filter: {}", filter); + + event.setFilter(filter); + + return true; + } else { + logger.unwrap().debug("Metadata does not match filter: {}", filter); + } + } + logger.unwrap().info("Metadata does not match any filters, sending process event indicator to DR"); + try { + DataRouterUtils.processEvent(config, event); + }catch (ProcessEventException exception) { + logger.unwrap().error("Process event failure", exception); + } + return false; + + } + + /** + * Compares event metadata against filter object + * @param filter filter object received from configuration + * @param metadata metadata from event + */ + private boolean compareObjects(Filter filter, EventMetadata metadata) { + List> validators = Arrays.asList( + new VendorValidator(), + new TypeValidator() + ); + + for(Validator validation : validators) { + if (! validation.validate(filter, metadata)) { + return false; + } + } + return true; + } + + interface Validator { + boolean validate(A filter, B metadata); + } + + class VendorValidator implements Validator { + @Override + public boolean validate(Filter filter, EventMetadata metadata) { + return filter.getVendor().equals(metadata.getVendorName()); + } + } + + class TypeValidator implements Validator { + @Override + public boolean validate(Filter filter, EventMetadata metadata) { + return filter.getNfType().equals(metadata.getProductName()); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java index 0e82119..fa41142 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java @@ -24,6 +24,7 @@ import lombok.Data; import lombok.NonNull; import java.util.Map; +import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig.Filter; /** * Class used to pass around relevant inbound event data. @@ -41,5 +42,7 @@ public class Event { @NonNull private String publishIdentity; - MeasCollecFile measCollecFile; + private MeasCollecFile measCollecFile; + + private Filter filter; } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java index 601b00f..8a0977d 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java @@ -19,6 +19,7 @@ */ package org.onap.dcaegen2.services.pmmapper.model; +import com.google.gson.annotations.SerializedName; import lombok.Data; import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired; @@ -47,4 +48,6 @@ public class EventMetadata { private String fileFormatType; @GSONRequired private String fileFormatVersion; + @SerializedName("decompression_status") + private String decompressionStatus; } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java index 458a6cd..0f1aaa9 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java @@ -26,6 +26,7 @@ import com.google.gson.annotations.SerializedName; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired; @Data @EqualsAndHashCode @@ -37,17 +38,20 @@ public class MeasFilterConfig { @Data public class Filter { + @GSONRequired @SerializedName("pmDefVsn") private String dictionaryVersion; + @GSONRequired @SerializedName("nfType") private String nfType; + @GSONRequired @SerializedName("vendor") private String vendor; + @GSONRequired @SerializedName("measTypes") private List measTypes; - } } diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java new file mode 100644 index 0000000..47c187a --- /dev/null +++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilterTest.java @@ -0,0 +1,119 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.filtering; + +import com.google.gson.Gson; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.junit.jupiter.MockitoExtension; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.powermock.core.classloader.annotations.PrepareForTest; +import utils.EventUtils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(MockitoExtension.class) +@PrepareForTest(MapperConfig.class) +public class MetadataFilterTest { + + private MetadataFilter metadataFilter; + + private static MapperConfig validConfig; + private static MapperConfig noFilterConfig; + private static MapperConfig multipleFilterConfig; + + private static String validConfigFileContents; + private static String noFilterConfigFileContents; + private static String multipleFilterConfigFileContents; + + private static final Path validMetadata = Paths.get("src/test/resources/valid_metadata.json"); + private static final Path incorrectMetadata = Paths.get("src/test/resources/incorrect_metadata.json"); + + private static final Path validConfigPath = Paths.get("src/test/resources/valid_mapper_config.json"); + private static final Path noFilterConfigPath = Paths.get("src/test/resources/no_filter_mapper_config.json"); + private static final Path multipleFilterConfigPath = Paths.get("src/test/resources/multiple_filter_mapper_config.json"); + + private static final Path dataDirectory = Paths.get("src/test/resources/xml_validator_test/test_data/"); + + + @BeforeEach + void setup() throws Exception { + validConfigFileContents = new String(Files.readAllBytes(validConfigPath)); + noFilterConfigFileContents = new String(Files.readAllBytes(noFilterConfigPath)); + multipleFilterConfigFileContents = new String(Files.readAllBytes(multipleFilterConfigPath)); + + validConfig = new Gson().fromJson(validConfigFileContents, MapperConfig.class); + noFilterConfig = new Gson().fromJson(noFilterConfigFileContents, MapperConfig.class); + multipleFilterConfig = new Gson().fromJson(multipleFilterConfigFileContents, MapperConfig.class); + + + metadataFilter = new MetadataFilter(this.validConfig); + } + + + @ParameterizedTest + @MethodSource("getEventsWithValidMetadata") + void testValidMetadataPass(Event testEvent) { + assertTrue(metadataFilter.filter(testEvent)); + } + + @ParameterizedTest + @MethodSource("getEventsWithValidMetadata") + void testEmptyFilterPass(Event testEvent) { + metadataFilter.config = noFilterConfig; + assertTrue(metadataFilter.filter(testEvent)); + } + + @ParameterizedTest + @MethodSource("getEventsWithValidMetadata") + void testMultipleFilterPass(Event testEvent) { + metadataFilter.config = multipleFilterConfig; + assertTrue(metadataFilter.filter(testEvent)); + } + + @ParameterizedTest + @MethodSource("getEventsWithInvalidMetadata") + void testInvalidMetadataFail(Event testEvent) { + assertFalse(metadataFilter.filter(testEvent)); + } + + + + private static List getEventsWithValidMetadata() throws IOException { + Path validDataDirectory = Paths.get(dataDirectory.toString() + "/valid/"); + return EventUtils.eventsFromDirectory(validDataDirectory, validMetadata); + } + + private static List getEventsWithInvalidMetadata() throws IOException { + Path validDataDirectory = Paths.get(dataDirectory.toString() + "/valid/"); + return EventUtils.eventsFromDirectory(validDataDirectory, incorrectMetadata); + } +} \ No newline at end of file diff --git a/src/test/resources/incorrect_metadata.json b/src/test/resources/incorrect_metadata.json new file mode 100644 index 0000000..e61e94f --- /dev/null +++ b/src/test/resources/incorrect_metadata.json @@ -0,0 +1,12 @@ +{ + "productName": "LTE", + "vendorName": "Ericsson", + "lastEpochMicrosec": "1538478000000", + "sourceName": "oteNB5309", + "startEpochMicrosec": "1538478900000", + "timeZoneOffset": "UTC+05.00", + "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz", + "compression": "gzip", + "fileFormatType": "org.3GPP.32.435#measCollec", + "fileFormatVersion": "V8" +} \ No newline at end of file diff --git a/src/test/resources/multiple_filter_mapper_config.json b/src/test/resources/multiple_filter_mapper_config.json new file mode 100644 index 0000000..89bca57 --- /dev/null +++ b/src/test/resources/multiple_filter_mapper_config.json @@ -0,0 +1,34 @@ +{ + "pm-mapper-filter": {"filters":[{"pmDefVsn": "V8","nfType": "LTE","vendor": "Ericsson","measTypes": [ "A", "B" ]},{"pmDefVsn": "V9","nfType": "NrRadio","vendor": "Ericsson","measTypes": [ "A", "B" ]}]}, + "streams_subscribes": { + "dmaap_subscriber": { + "type": "data_router", + "aaf_username": null, + "aaf_password": null, + "dmaap_info": { + "location": "location", + "delivery_url": "delivery_url", + "username": "username", + "password": "password", + "subscriber_id": "subscriber_id" + } + } + }, + "streams_publishes": { + "dmaap_publisher": { + "type": "message_router", + "aaf_password": null, + "dmaap_info": { + "topic_url": "https://message-router.onap.svc.cluster.local:3904/events/some-topic", + "client_role": null, + "location": null, + "client_id": null + }, + "aaf_username": null + } + }, + "dmaap_dr_feed_id": "2", + "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs", + "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete", + "services_calls": {} +} \ No newline at end of file diff --git a/src/test/resources/no_filter_mapper_config.json b/src/test/resources/no_filter_mapper_config.json new file mode 100644 index 0000000..3f855cf --- /dev/null +++ b/src/test/resources/no_filter_mapper_config.json @@ -0,0 +1,34 @@ +{ + "pm-mapper-filter": {"filters":[]}, + "streams_subscribes": { + "dmaap_subscriber": { + "type": "data_router", + "aaf_username": null, + "aaf_password": null, + "dmaap_info": { + "location": "location", + "delivery_url": "delivery_url", + "username": "username", + "password": "password", + "subscriber_id": "subscriber_id" + } + } + }, + "streams_publishes": { + "dmaap_publisher": { + "type": "message_router", + "aaf_password": null, + "dmaap_info": { + "topic_url": "https://message-router.onap.svc.cluster.local:3904/events/some-topic", + "client_role": null, + "location": null, + "client_id": null + }, + "aaf_username": null + } + }, + "dmaap_dr_feed_id": "2", + "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs", + "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete", + "services_calls": {} +} \ No newline at end of file diff --git a/src/test/resources/valid_mapper_config.json b/src/test/resources/valid_mapper_config.json index 6cd76bd..040406f 100644 --- a/src/test/resources/valid_mapper_config.json +++ b/src/test/resources/valid_mapper_config.json @@ -1,5 +1,5 @@ { - "pm-mapper-filter": {"filters":[]}, + "pm-mapper-filter": {"filters":[{"pmDefVsn": "V9","nfType": "NrRadio","vendor": "Ericsson","measTypes": [ "A", "B" ]}]}, "streams_subscribes": { "dmaap_subscriber": { "type": "data_router", diff --git a/src/test/resources/valid_metadata.json b/src/test/resources/valid_metadata.json index 21de3fb..cf21437 100644 --- a/src/test/resources/valid_metadata.json +++ b/src/test/resources/valid_metadata.json @@ -8,5 +8,6 @@ "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz", "compression": "gzip", "fileFormatType": "org.3GPP.32.435#measCollec", - "fileFormatVersion": "V9" + "fileFormatVersion": "V9", + "decompression_status": "false" } \ No newline at end of file -- cgit 1.2.3-korg