diff options
author | dfarrelly <david.farrelly@est.tech> | 2019-03-13 12:02:20 +0000 |
---|---|---|
committer | dfarrelly <david.farrelly@est.tech> | 2019-03-13 12:02:20 +0000 |
commit | 88adbc830c24f309c19fc5874654cc1cfaebc600 (patch) | |
tree | 65b268c6612c20cd64ac07559b811eb8ee7c1c64 /src/main/java/org | |
parent | 1f982f9e7f9f38743bbc1bcf2609292579762341 (diff) |
Add metadata filtering
Issue-ID: DCAEGEN2-1286
Change-Id: Icfee7f24cb97b429e8b0db2d67da2f21e413cea0
Signed-off-by: dfarrelly <david.farrelly@est.tech>
Diffstat (limited to 'src/main/java/org')
5 files changed, 144 insertions, 3 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java index 11767e6..09d8975 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java @@ -32,6 +32,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError; import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException; import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException; +import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter; import org.onap.dcaegen2.services.pmmapper.mapping.Mapper; import org.onap.dcaegen2.services.pmmapper.model.Event; import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; @@ -56,6 +57,10 @@ public class App { public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException { Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink); HealthCheckHandler healthCheckHandler = new HealthCheckHandler(); + + MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); + + MetadataFilter metadataFilter = new MetadataFilter(mapperConfig); Mapper mapper = new Mapper(mappingTemplate); XMLValidator validator = new XMLValidator(xmlSchema); flux.onBackpressureDrop(App::handleBackPressure) @@ -64,12 +69,12 @@ public class App { .parallel() .runOn(Schedulers.newParallel(""), 1) .doOnNext(event -> MDC.setContextMap(event.getMdc())) + .filter(metadataFilter::filter) .filter(validator::validate) .map(mapper::map) .subscribe(event -> logger.unwrap().info("Event Processed")); DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next); - MapperConfig mapperConfig = new ConfigHandler().getMapperConfig(); dataRouterSubscriber.start(mapperConfig); Undertow.builder() diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java new file mode 100644 index 0000000..20c8a64 --- /dev/null +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java @@ -0,0 +1,126 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.pmmapper.filtering; + +import lombok.NonNull; +import org.onap.dcaegen2.services.pmmapper.exceptions.*; +import org.onap.dcaegen2.services.pmmapper.mapping.Mapper; +import org.onap.dcaegen2.services.pmmapper.model.Event; +import org.onap.dcaegen2.services.pmmapper.model.EventMetadata; +import org.onap.dcaegen2.services.pmmapper.model.MapperConfig; +import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig; +import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig.Filter; +import org.onap.dcaegen2.services.pmmapper.utils.DataRouterUtils; +import org.onap.logging.ref.slf4j.ONAPLogAdapter; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; + +public class MetadataFilter { + private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(MetadataFilter.class)); + MapperConfig config; + + public MetadataFilter(MapperConfig config) { + this.config = config; + } + + /** + * Filters events by their metadata against filter object in configuration + * @param event inbound event + */ + public boolean filter(@NonNull Event event) { + String decompressionStatus; + logger.unwrap().info("Filtering event metadata"); + EventMetadata metadata = event.getMetadata(); + + MeasFilterConfig measFilterConfig = config.getFilterConfig(); + + List<MeasFilterConfig.Filter> filters = measFilterConfig.getFilters(); + + if(metadata.getDecompressionStatus() != null) { + decompressionStatus = metadata.getDecompressionStatus(); + logger.unwrap().debug("Decompression Status: {}", decompressionStatus); + } + + if(filters.isEmpty()) { + logger.unwrap().info("No filter specified in config: {}", filters); + return true; + } + + for(Filter filter : filters) { + if(compareObjects(filter, metadata)) { + logger.unwrap().info("Metadata matches filter: {}", filter); + + event.setFilter(filter); + + return true; + } else { + logger.unwrap().debug("Metadata does not match filter: {}", filter); + } + } + logger.unwrap().info("Metadata does not match any filters, sending process event indicator to DR"); + try { + DataRouterUtils.processEvent(config, event); + }catch (ProcessEventException exception) { + logger.unwrap().error("Process event failure", exception); + } + return false; + + } + + /** + * Compares event metadata against filter object + * @param filter filter object received from configuration + * @param metadata metadata from event + */ + private boolean compareObjects(Filter filter, EventMetadata metadata) { + List<Validator<Filter, EventMetadata>> validators = Arrays.asList( + new VendorValidator(), + new TypeValidator() + ); + + for(Validator<Filter, EventMetadata> validation : validators) { + if (! validation.validate(filter, metadata)) { + return false; + } + } + return true; + } + + interface Validator<A, B> { + boolean validate(A filter, B metadata); + } + + class VendorValidator implements Validator<Filter, EventMetadata> { + @Override + public boolean validate(Filter filter, EventMetadata metadata) { + return filter.getVendor().equals(metadata.getVendorName()); + } + } + + class TypeValidator implements Validator<Filter, EventMetadata> { + @Override + public boolean validate(Filter filter, EventMetadata metadata) { + return filter.getNfType().equals(metadata.getProductName()); + } + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java index 0e82119..fa41142 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java @@ -24,6 +24,7 @@ import lombok.Data; import lombok.NonNull; import java.util.Map; +import org.onap.dcaegen2.services.pmmapper.model.MeasFilterConfig.Filter; /** * Class used to pass around relevant inbound event data. @@ -41,5 +42,7 @@ public class Event { @NonNull private String publishIdentity; - MeasCollecFile measCollecFile; + private MeasCollecFile measCollecFile; + + private Filter filter; } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java index 601b00f..8a0977d 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java @@ -19,6 +19,7 @@ */ package org.onap.dcaegen2.services.pmmapper.model; +import com.google.gson.annotations.SerializedName; import lombok.Data; import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired; @@ -47,4 +48,6 @@ public class EventMetadata { private String fileFormatType; @GSONRequired private String fileFormatVersion; + @SerializedName("decompression_status") + private String decompressionStatus; } diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java index 458a6cd..0f1aaa9 100644 --- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java +++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java @@ -26,6 +26,7 @@ import com.google.gson.annotations.SerializedName; import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
@Data
@EqualsAndHashCode
@@ -37,17 +38,20 @@ public class MeasFilterConfig { @Data
public class Filter {
+ @GSONRequired
@SerializedName("pmDefVsn")
private String dictionaryVersion;
+ @GSONRequired
@SerializedName("nfType")
private String nfType;
+ @GSONRequired
@SerializedName("vendor")
private String vendor;
+ @GSONRequired
@SerializedName("measTypes")
private List<String> measTypes;
-
}
}
|