summaryrefslogtreecommitdiffstats
path: root/src/main/java/org
diff options
context:
space:
mode:
authordfarrelly <david.farrelly@est.tech>2019-03-13 12:02:20 +0000
committerdfarrelly <david.farrelly@est.tech>2019-03-13 12:02:20 +0000
commit88adbc830c24f309c19fc5874654cc1cfaebc600 (patch)
tree65b268c6612c20cd64ac07559b811eb8ee7c1c64 /src/main/java/org
parent1f982f9e7f9f38743bbc1bcf2609292579762341 (diff)
Add metadata filtering
Issue-ID: DCAEGEN2-1286 Change-Id: Icfee7f24cb97b429e8b0db2d67da2f21e413cea0 Signed-off-by: dfarrelly <david.farrelly@est.tech>
Diffstat (limited to 'src/main/java/org')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/App.java7
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java126
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java5
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java3
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java6
5 files changed, 144 insertions, 3 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 11767e6..09d8975 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -32,6 +32,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -56,6 +57,10 @@ public class App {
public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException {
Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
+
+ MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
+
+ MetadataFilter metadataFilter = new MetadataFilter(mapperConfig);
Mapper mapper = new Mapper(mappingTemplate);
XMLValidator validator = new XMLValidator(xmlSchema);
flux.onBackpressureDrop(App::handleBackPressure)
@@ -64,12 +69,12 @@ public class App {
.parallel()
.runOn(Schedulers.newParallel(""), 1)
.doOnNext(event -> MDC.setContextMap(event.getMdc()))
+ .filter(metadataFilter::filter)
.filter(validator::validate)
.map(mapper::map)
.subscribe(event -> logger.unwrap().info("Event Processed"));
DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next);
- MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
dataRouterSubscriber.start(mapperConfig);
Undertow.builder()
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
new file mode 100644
index 0000000..20c8a64
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
@@ -0,0 +1,126 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.filtering;
+
+import lombok.NonNull;
+import org.onap.dcaegen2.services.pmmapper.exceptions.*;
+import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig;
+import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig.Filter;
+import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class MetadataFilter {
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(MetadataFilter.class));
+ MapperConfig config;
+
+ public MetadataFilter(MapperConfig config) {
+ this.config = config;
+ }
+
+ /**
+ * Filters events by their metadata against filter object in configuration
+ * @param event inbound event
+ */
+ public boolean filter(@NonNull Event event) {
+ String decompressionStatus;
+ logger.unwrap().info("Filtering event metadata");
+ EventMetadata metadata = event.getMetadata();
+
+ MeasFilterConfig measFilterConfig = config.getFilterConfig();
+
+ List<MeasFilterConfig.Filter> filters = measFilterConfig.getFilters();
+
+ if(metadata.getDecompressionStatus() != null) {
+ decompressionStatus = metadata.getDecompressionStatus();
+ logger.unwrap().debug("Decompression Status: {}", decompressionStatus);
+ }
+
+ if(filters.isEmpty()) {
+ logger.unwrap().info("No filter specified in config: {}", filters);
+ return true;
+ }
+
+ for(Filter filter : filters) {
+ if(compareObjects(filter, metadata)) {
+ logger.unwrap().info("Metadata matches filter: {}", filter);
+
+ event.setFilter(filter);
+
+ return true;
+ } else {
+ logger.unwrap().debug("Metadata does not match filter: {}", filter);
+ }
+ }
+ logger.unwrap().info("Metadata does not match any filters, sending process event indicator to DR");
+ try {
+ DataRouterUtils.processEvent(config, event);
+ }catch (ProcessEventException exception) {
+ logger.unwrap().error("Process event failure", exception);
+ }
+ return false;
+
+ }
+
+ /**
+ * Compares event metadata against filter object
+ * @param filter filter object received from configuration
+ * @param metadata metadata from event
+ */
+ private boolean compareObjects(Filter filter, EventMetadata metadata) {
+ List<Validator<Filter, EventMetadata>> validators = Arrays.asList(
+ new VendorValidator(),
+ new TypeValidator()
+ );
+
+ for(Validator<Filter, EventMetadata> validation : validators) {
+ if (! validation.validate(filter, metadata)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ interface Validator<A, B> {
+ boolean validate(A filter, B metadata);
+ }
+
+ class VendorValidator implements Validator<Filter, EventMetadata> {
+ @Override
+ public boolean validate(Filter filter, EventMetadata metadata) {
+ return filter.getVendor().equals(metadata.getVendorName());
+ }
+ }
+
+ class TypeValidator implements Validator<Filter, EventMetadata> {
+ @Override
+ public boolean validate(Filter filter, EventMetadata metadata) {
+ return filter.getNfType().equals(metadata.getProductName());
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
index 0e82119..fa41142 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
@@ -24,6 +24,7 @@ import lombok.Data;
import lombok.NonNull;
import java.util.Map;
+import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig.Filter;
/**
* Class used to pass around relevant inbound event data.
@@ -41,5 +42,7 @@ public class Event {
@NonNull
private String publishIdentity;
- MeasCollecFile measCollecFile;
+ private MeasCollecFile measCollecFile;
+
+ private Filter filter;
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
index 601b00f..8a0977d 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
@@ -19,6 +19,7 @@
*/
package org.onap.dcaegen2.services.pmmapper.model;
+import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
@@ -47,4 +48,6 @@ public class EventMetadata {
private String fileFormatType;
@GSONRequired
private String fileFormatVersion;
+ @SerializedName("decompression_status")
+ private String decompressionStatus;
}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java
index 458a6cd..0f1aaa9 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java
@@ -26,6 +26,7 @@ import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
@Data
@EqualsAndHashCode
@@ -37,17 +38,20 @@ public class MeasFilterConfig {
@Data
public class Filter {
+ @GSONRequired
@SerializedName("pmDefVsn")
private String dictionaryVersion;
+ @GSONRequired
@SerializedName("nfType")
private String nfType;
+ @GSONRequired
@SerializedName("vendor")
private String vendor;
+ @GSONRequired
@SerializedName("measTypes")
private List<String> measTypes;
-
}
}