diff options
author | Maciej Malewski <maciej.malewski@nokia.com> | 2021-06-08 09:04:48 +0200 |
---|---|---|
committer | Maciej Malewski <maciej.malewski@nokia.com> | 2021-06-16 14:17:34 +0200 |
commit | 430cf11bbcb447316f831e1dd1b8760abeaa9171 (patch) | |
tree | 016bf553772496117d2a2c2de3b858e3b7f67854 /src | |
parent | 26be283f4a7044aea4ee0ca480fde20eb5233ee2 (diff) |
Replace cambria with DmaaP client
- remove cambria, add DmaaP client
- sending event for many topics at once is no longer supported
- add backward compatibility status codes
- add additional validation for batchEvent
Issue-ID: DCAEGEN2-1483
Signed-off-by: Maciej Malewski <maciej.malewski@nokia.com>
Change-Id: I502e1b21af217a07f8432b14dd833dfb3c139975
Diffstat (limited to 'src')
47 files changed, 2128 insertions, 962 deletions
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java index 7bdef655..7b8c3ff6 100644 --- a/src/main/java/org/onap/dcae/ApplicationSettings.java +++ b/src/main/java/org/onap/dcae/ApplicationSettings.java @@ -1,9 +1,9 @@ /* * ============LICENSE_START======================================================= - * PROJECT + * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 - 2020 Nokia. All rights reserved.s + * Copyright (C) 2018 - 2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,6 @@ package org.onap.dcae; -import static java.lang.String.format; - import com.google.common.annotations.VisibleForTesting; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; @@ -30,26 +28,32 @@ import com.networknt.schema.JsonSchema; import io.vavr.Function1; import io.vavr.collection.HashMap; import io.vavr.collection.Map; -import java.io.FileReader; -import java.io.IOException; -import java.lang.reflect.Type; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import javax.annotation.Nullable; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.onap.dcae.common.EventTransformation; import org.onap.dcae.common.configuration.AuthMethodType; +import org.onap.dcae.multiplestreamreducer.MultipleStreamReducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import java.io.FileReader; +import java.io.IOException; +import java.lang.reflect.Type; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +import static java.lang.String.format; + /** * Abstraction over application configuration. * Its job is to provide easily discoverable (by method names lookup) and type safe access to configuration properties. */ public class ApplicationSettings { + public static String responseCompatibility; + private static final String EVENT_TRANSFORM_FILE_PATH = "./etc/eventTransform.json"; private static final String COULD_NOT_FIND_FILE = "Couldn't find file " + EVENT_TRANSFORM_FILE_PATH; @@ -62,6 +66,7 @@ public class ApplicationSettings { private final PropertiesConfiguration properties = new PropertiesConfiguration(); private final Map<String, JsonSchema> loadedJsonSchemas; private final List<EventTransformation> eventTransformations; + private final MultipleStreamReducer multipleStreamReducer = new MultipleStreamReducer(); public ApplicationSettings(String[] args, Function1<String[], Map<String, String>> argsParser) { this(args, argsParser, System.getProperty("user.dir")); @@ -78,6 +83,7 @@ public class ApplicationSettings { format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION)); loadedJsonSchemas = new JSonSchemasSupplier().loadJsonSchemas(collectorSchemaFile); eventTransformations = loadEventTransformations(); + responseCompatibility = getResponseCompatibilityFlag(); } /** @@ -155,7 +161,7 @@ public class ApplicationSettings { } public String dMaaPConfigurationFileLocation() { - return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json")); + return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "dpo/data-formats/ves-dmaap-config.json")); } public String certSubjectMatcher(){ @@ -166,13 +172,9 @@ public class ApplicationSettings { return properties.getString("auth.method", AuthMethodType.NO_AUTH.value()); } - public Map<String, String[]> getDmaapStreamIds() { + public Map<String, String> getDmaapStreamIds() { String streamIdsProperty = properties.getString("collector.dmaap.streamid", null); - if (streamIdsProperty == null) { - return HashMap.empty(); - } else { - return convertDMaaPStreamsPropertyToMap(streamIdsProperty); - } + return streamIdsProperty == null ? HashMap.empty() : reduceStream(streamIdsProperty); } public boolean getExternalSchemaValidationCheckflag() { @@ -203,6 +205,10 @@ public class ApplicationSettings { return properties.getString("collector.description.api.version.location", "etc/api_version_description.json"); } + private String getResponseCompatibilityFlag() { + return properties.getString("collector.response.compatibility", "v7.2"); + } + private void loadPropertiesFromFile() { try { properties.load(configurationFileLocation); @@ -261,6 +267,13 @@ public class ApplicationSettings { } } + private Map<String, String> reduceStream(String streamIdsProperty) { + Map<String, String[]> dMaaPStreamsProperty = convertDMaaPStreamsPropertyToMap(streamIdsProperty); + final Map<String, String> domainToStreamConfig = multipleStreamReducer.reduce(dMaaPStreamsProperty); + log.warn(multipleStreamReducer.getDomainToStreamsInfo(domainToStreamConfig)); + return domainToStreamConfig; + } + @VisibleForTesting String getStringDirectly(String key) { return properties.getString(key); diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 81c463dc..400597ff 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -3,7 +3,7 @@ * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018,2020 Nokia. All rights reserved. + * Copyright (C) 2018-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,52 +20,38 @@ */ package org.onap.dcae.common; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; + import io.vavr.collection.Map; import org.onap.dcae.common.model.VesEvent; import org.onap.dcae.common.publishing.DMaaPEventPublisher; +import org.onap.dcae.restapi.EventValidatorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; import java.util.List; -public class EventSender { +import static org.onap.dcae.restapi.ApiException.DOMAIN_NOT_DEFINED_FOR_STREAM_ID; - private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); - private Map<String, String[]> streamIdToDmaapIds; - private DMaaPEventPublisher eventPublisher; - private static final Logger log = LoggerFactory.getLogger(EventSender.class); - public EventSender(DMaaPEventPublisher eventPublisher, Map<String, String[]> streamIdToDmaapIds) { - this.eventPublisher = eventPublisher; - this.streamIdToDmaapIds = streamIdToDmaapIds; - } +public class EventSender { - public void send(List<VesEvent> vesEvents) { - for (VesEvent vesEvent : vesEvents) { - metriclog.info("EVENT_PUBLISH_START"); - setLoggingContext(vesEvent); - streamIdToDmaapIds.get(vesEvent.getStreamId()) - .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + vesEvent.asJsonObject())) - .forEach(dmaapIds -> sendEventsToStreams(vesEvent, dmaapIds)); - log.debug("Message published" + vesEvent.asJsonObject()); - } - log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); - metriclog.info("EVENT_PUBLISH_END"); - } + private Map<String, String> streamIdToDmaapIds; + private DMaaPEventPublisher eventPublisher; + private static final Logger log = LoggerFactory.getLogger(EventSender.class); - private void sendEventsToStreams(VesEvent vesEvent, String[] dmaapIds) { - for (String dmaapId : dmaapIds) { - log.info("Invoking publisher for streamId/domain:" + dmaapId); - eventPublisher.sendEvent(vesEvent, dmaapId); + public EventSender(DMaaPEventPublisher eventPublisher, Map<String, String> streamIdToDmaapIds) { + this.eventPublisher = eventPublisher; + this.streamIdToDmaapIds = streamIdToDmaapIds; } - } - private void setLoggingContext(VesEvent vesEvent) { - LoggingContext localLC = VESLogger.getLoggingContextForThread(vesEvent.getUniqueId().toString()); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("event.VESuniqueId" + vesEvent.getUniqueId() + "event.commonEventHeader.domain:" + vesEvent.getDomain()); - } + public HttpStatus send(List<VesEvent> vesEvents) { + String topic = streamIdToDmaapIds + .get(vesEvents.get(0).getStreamId()) + .getOrElse(() -> { + log.error("No StreamID defined for publish - Message dropped " + vesEvents.get(0).asJsonObject()); + throw new EventValidatorException(DOMAIN_NOT_DEFINED_FOR_STREAM_ID); + }); + return eventPublisher.sendEvent(vesEvents, topic); + } } diff --git a/src/main/java/org/onap/dcae/common/VESLogger.java b/src/main/java/org/onap/dcae/common/VESLogger.java deleted file mode 100644 index 1072fb54..00000000 --- a/src/main/java/org/onap/dcae/common/VESLogger.java +++ /dev/null @@ -1,106 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.common; - -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.LoggingContextFactory; -import com.att.nsa.logging.log4j.EcompFields; -import java.util.UUID; - -public class VESLogger { - - - public static final String REQUEST_ID = "requestId"; - - // Common LoggingContext - private static LoggingContext commonLC; - // Thread-specific LoggingContext - private static LoggingContext threadLC; - - /** - * Returns the common LoggingContext instance that is the base context for - * all subsequent instances. - * - * @return the common LoggingContext - */ - public static LoggingContext getCommonLoggingContext() { - if (commonLC == null) { - commonLC = new LoggingContextFactory.Builder().build(); - final UUID uuid = UUID.randomUUID(); - - commonLC.put(REQUEST_ID, uuid.toString()); - } - return commonLC; - } - - /** - * Get a logging context for the current thread that's based on the common - * logging context. Populate the context with context-specific values. - * - * @param aUuid uuid for request id - * @return a LoggingContext for the current thread - */ - public static LoggingContext getLoggingContextForThread(UUID aUuid) { - // note that this operation requires everything from the common context - // to be (re)copied into the target context. That seems slow, but it - // actually - // helps prevent the thread from overwriting supposedly common data. It - // also - // should be fairly quick compared with the overhead of handling the - // actual - // service call. - - threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); - // Establish the request-specific UUID, as long as we are here... - threadLC.put(REQUEST_ID, aUuid.toString()); - threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); - - return threadLC; - } - - /** - * Get a logging context for the current thread that's based on the common - * logging context. Populate the context with context-specific values. - * - * @param aUuid uuid for request id - * @return a LoggingContext for the current thread - */ - public static LoggingContext getLoggingContextForThread(String aUuid) { - // note that this operation requires everything from the common context - // to be (re)copied into the target context. That seems slow, but it - // actually - // helps prevent the thread from overwriting supposedly common data. It - // also - // should be fairly quick compared with the overhead of handling the - // actual - // service call. - - threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build(); - // Establish the request-specific UUID, as long as we are here... - threadLC.put(REQUEST_ID, aUuid); - threadLC.put("statusCode", "COMPLETE"); - threadLC.put(EcompFields.kEndTimestamp, SaClock.now()); - return threadLC; - } - -} diff --git a/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java b/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java new file mode 100644 index 00000000..aab3c44b --- /dev/null +++ b/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java @@ -0,0 +1,23 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.model; + +public class BackwardsCompatibilityException extends RuntimeException { +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/common/model/InternalException.java b/src/main/java/org/onap/dcae/common/model/InternalException.java new file mode 100644 index 00000000..da93e5db --- /dev/null +++ b/src/main/java/org/onap/dcae/common/model/InternalException.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.model; + +import org.onap.dcae.restapi.ApiException; + +public class InternalException extends RuntimeException { + + private final ApiException apiException; + + public InternalException(ApiException apiException) { + this.apiException = apiException; + } + + public ApiException getApiException() { + return apiException; + } +} diff --git a/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java b/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java new file mode 100644 index 00000000..e82ad775 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java @@ -0,0 +1,23 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.model; + +public class PayloadToLargeException extends RuntimeException { +} diff --git a/src/main/java/org/onap/dcae/common/model/VesEvent.java b/src/main/java/org/onap/dcae/common/model/VesEvent.java index 8e2db80e..88419555 100644 --- a/src/main/java/org/onap/dcae/common/model/VesEvent.java +++ b/src/main/java/org/onap/dcae/common/model/VesEvent.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * VES Collector * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved.s + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,9 @@ package org.onap.dcae.common.model; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.json.JSONException; import org.json.JSONObject; +import java.util.Optional; /** * This class is a wrapper for JSONObject, that represents VES event. @@ -112,6 +114,16 @@ public class VesEvent { } /** + * Returns optional stndDefinedNamespace name from VES event. + * + * @return Optional stndDefinedNamespace + */ + public Optional<String> getStndDefinedNamespace() throws JSONException { + return isStdDefinedDomain(getDomain()) ? Optional.ofNullable(getEventHeader()) + .map(header -> header.getString(STND_DEFINED_NAMESPACE)) : Optional.empty(); + } + + /** * Checks if type of event is same as given in paramaters. * * @param type name that will be compared with event type diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java index 274e4490..9f8ffcc6 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java @@ -63,28 +63,10 @@ public final class DMaaPConfigurationParser { } private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) { - return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config)) + return Try(() -> parseNewFormat(config)) .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); } - private static boolean usesLegacyFormat(AnyNode dMaaPConfig) { - return dMaaPConfig.has("channels"); - } - - private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) { - return root.get("channels").toList().toMap( - channel -> channel.get("name").toString(), - channel -> { - String destinationsStr = channel.getAsOption("cambria.url") - .getOrElse(channel.getAsOption("cambria.hosts").get()) - .toString(); - String topic = channel.get("cambria.topic").toString(); - Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); - Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); - List<String> destinations = List(destinationsStr.split(",")); - return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); - }); - } private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) { return root.keys().toMap( diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 2b4cfc15..08e16e0c 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018,2020 Nokia. All rights reserved. + * Copyright (C) 2018-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,36 +21,29 @@ package org.onap.dcae.common.publishing; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; -import io.vavr.control.Try; -import org.onap.dcae.common.VESLogger; import org.onap.dcae.common.model.VesEvent; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import reactor.core.publisher.Flux; -import java.io.IOException; - -import static org.onap.dcae.common.publishing.VavrUtils.f; +import java.util.List; +import java.util.Objects; +import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ public class DMaaPEventPublisher { - private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); - private DMaaPPublishersCache publishersCache; - private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output"); - - DMaaPEventPublisher(DMaaPPublishersCache publishersCache) { - this.publishersCache = publishersCache; - } + private Map<String, PublisherConfig> dMaaPConfig; + private final Publisher dmaapPublisher; public DMaaPEventPublisher(Map<String, PublisherConfig> dMaaPConfig) { - this(new DMaaPPublishersCache(dMaaPConfig)); + this.dMaaPConfig = dMaaPConfig; + dmaapPublisher = new Publisher(); } /** @@ -58,48 +51,29 @@ public class DMaaPEventPublisher { * @param dmaapConfiguration Dmaap configuration */ public void reload(Map<String, PublisherConfig> dmaapConfiguration){ - this.publishersCache = new DMaaPPublishersCache(dmaapConfiguration); - } - - public void sendEvent(VesEvent vesEvent, String dmaapId){ - clearVesUniqueIdFromEvent(vesEvent); - publishersCache.getPublisher(dmaapId) - .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent))) - .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher)); - } - - private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) { - Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher)) - .onFailure(exc -> closePublisher(event, dmaapId, exc)); + dMaaPConfig = dmaapConfiguration; + log.info("reload dmaap configuration"); } - private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) - throws IOException { - - String pk = event.getPK(); - int pendingMsgs = publisher.send(pk, event.asJsonObject().toString()); - if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { - log.info("Pending messages count: " + pendingMsgs); - } - String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId); - log.info(infoMsg); - outputLogger.info(infoMsg); + public HttpStatus sendEvent(List<VesEvent> vesEvents, String dmaapId) { + clearVesUniqueIdFromEvent(vesEvents); + io.vavr.collection.List<String> events = mapListOfEventsToVavrList(vesEvents); + Flux<MessageRouterPublishResponse> messageRouterPublishFlux = dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId)); + MessageRouterPublishResponse messageRouterPublishResponse = messageRouterPublishFlux.blockFirst(); + return getHttpStatus(Objects.requireNonNull(messageRouterPublishResponse)); } - private void closePublisher(VesEvent event, String dmaapId, Throwable e) { - log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, dmaapId), e); - publishersCache.closePublisherFor(dmaapId); + private io.vavr.collection.List<String> mapListOfEventsToVavrList(List<VesEvent> vesEvents) { + return io.vavr.collection.List.ofAll(vesEvents) + .map(event -> event.asJsonObject().toString()); } - private void clearVesUniqueIdFromEvent(VesEvent event) { - if (event.hasType(VesEvent.VES_UNIQUE_ID)) { - String uuid = event.getUniqueId().toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("Removing VESuniqueid object from event"); - event.removeElement(VesEvent.VES_UNIQUE_ID); - } + private void clearVesUniqueIdFromEvent(List<VesEvent> events) { + events.stream() + .filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID)) + .forEach(event -> { + log.debug("Removing VESuniqueid object from event"); + event.removeElement(VesEvent.VES_UNIQUE_ID); + }); } } diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java deleted file mode 100644 index a93073bf..00000000 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java +++ /dev/null @@ -1,61 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.common.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import io.vavr.control.Try; - -import static io.vavr.API.Try; -import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; -import static org.onap.dcae.common.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -final class DMaaPPublishersBuilder { - - static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) { - return Try(() -> builder(config).build()) - .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); - } - - private static PublisherBuilder builder(PublisherConfig config) { - if (config.isSecured()) { - return authenticatedBuilder(config); - } else { - return unAuthenticatedBuilder(config); - } - } - - private static PublisherBuilder authenticatedBuilder(PublisherConfig config) { - return unAuthenticatedBuilder(config) - .usingHttps() - .authenticatedByHttp(config.userName().get(), config.password().get()); - } - - private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) { - return new CambriaClientBuilders.PublisherBuilder() - .usingHosts(config.destinations().mkString(",")) - .onTopic(config.topic()) - .logSendFailuresAfter(5); - } -} diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java deleted file mode 100644 index b7997ef9..00000000 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java +++ /dev/null @@ -1,121 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.common.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.cache.*; -import io.vavr.collection.Map; -import io.vavr.control.Option; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static io.vavr.API.Option; -import static org.onap.dcae.common.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -class DMaaPPublishersCache { - - private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); - private final LoadingCache<String, CambriaBatchingPublisher> publishersCache; - private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration; - - DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(new OnPublisherRemovalListener()) - .build(new CambriaPublishersCacheLoader()); - } - - DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, - OnPublisherRemovalListener onPublisherRemovalListener, - Map<String, PublisherConfig> dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(onPublisherRemovalListener) - .build(dMaaPPublishersCacheLoader); - } - - Option<CambriaBatchingPublisher> getPublisher(String streamID) { - try { - return Option(publishersCache.getUnchecked(streamID)); - } catch (Exception e) { - log.warn("Could not create / load Cambria Publisher for streamID", e); - return Option.none(); - } - } - - void closePublisherFor(String streamId) { - publishersCache.invalidate(streamId); - } - - synchronized void reconfigure(Map<String, PublisherConfig> newConfig) { - Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get(); - Map<String, PublisherConfig> removedConfigurations = currentConfig - .filterKeys(domain -> !newConfig.containsKey(domain)); - Map<String, PublisherConfig> changedConfigurations = newConfig - .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); - dMaaPConfiguration.set(newConfig); - removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); - } - - static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> { - - @Override - public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) { - CambriaBatchingPublisher publisher = notification.getValue(); - if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull - try { - int timeout = 20; - TimeUnit unit = TimeUnit.SECONDS; - java.util.List<?> stuck = publisher.close(timeout, unit); - if (!stuck.isEmpty()) { - log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " - + "%s messages were dropped", stuck.size(), timeout, unit)); - } - } catch (InterruptedException | IOException e) { - log.error("Could not close Cambria publisher, some messages might have been dropped", e); - Thread.currentThread().interrupt(); - } - } - } - } - - class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> { - - @Override - public CambriaBatchingPublisher load(@Nonnull String domain) { - return dMaaPConfiguration.get() - .get(domain) - .toTry(() -> new RuntimeException( - f("DMaaP configuration contains no configuration for domain: '%s'", domain))) - .flatMap(DMaaPPublishersBuilder::buildPublisher) - .get(); - } - } - -} diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java new file mode 100644 index 00000000..2eaeab6a --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java @@ -0,0 +1,101 @@ +/*- + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import java.time.Duration; + +public class DmaapRequestConfiguration { + + private static final Long TIMEOUT_SECONDS = 10L; + private static final int RETRY_INTERVAL_IN_SECONDS = 1; + private static final int RETRY_COUNT = 1; + + private DmaapRequestConfiguration() { + } + + static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig, Long timeout) { + String topicUrl = createUrl(publisherConfig); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(createMessageRouterSink(topicUrl)) + .contentType(ContentType.APPLICATION_JSON) + .timeoutConfig(timeOutConfiguration(timeout)) + .build(); + } + + static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig) { + return createPublishRequest(publisherConfig, TIMEOUT_SECONDS); + } + + static Flux<JsonObject> jsonBatch(List<String> messages) { + return Flux.fromIterable(getAsJsonObjects(messages)); + } + + static MessageRouterPublisherConfig retryConfiguration() { + return ImmutableMessageRouterPublisherConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS) + .retryCount(RETRY_COUNT) + .build()) + .build(); + } + + private static String createUrl(Option<PublisherConfig> publisherConfig) { + String hostAndPort = publisherConfig.get().getHostAndPort(); + String topicName = publisherConfig.get().topic(); + return String.format("http://%s/events/%s/",hostAndPort,topicName); + } + + private static List<JsonObject> getAsJsonObjects(List<String> messages) { + return getAsJsonElements(messages).map(JsonElement::getAsJsonObject); + } + + static List<JsonElement> getAsJsonElements(List<String> messages) { + return messages.map(JsonParser::parseString); + } + + static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) { + return ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl(topicUrl) + .build(); + } + + @NotNull + private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) { + return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build(); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java new file mode 100644 index 00000000..b5c735b7 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcae.common.model.BackwardsCompatibilityException; +import org.onap.dcae.common.model.InternalException; +import org.onap.dcae.common.model.PayloadToLargeException; +import org.onap.dcae.restapi.ApiException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; + +import java.util.Objects; + +import static org.onap.dcae.ApplicationSettings.responseCompatibility; + +public class MessageRouterHttpStatusMapper { + + private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class); + + private MessageRouterHttpStatusMapper() { + } + + @NotNull + static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) { + return responseCompatibility.equals("v7.2") ? + getHttpStatusBackwardsCompatibility(messageRouterPublishResponse): + getHttpStatusWithMappedResponseCode(messageRouterPublishResponse); + } + + @NotNull + private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.ACCEPTED; + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new BackwardsCompatibilityException(); + } + } + + @NotNull + private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.OK; + } else if (isHttp413(messageRouterPublishResponse)) { + log.error(messageRouterPublishResponse.failReason()); + throw new PayloadToLargeException(); + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new InternalException(responseBody(resolveHttpCode(messageRouterPublishResponse))); + } + } + + @NotNull + private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3); + } + + @NotNull + private static ApiException responseBody(String substring) { + switch (substring) { + case "404": + return ApiException.NOT_FOUND; + case "408": + return ApiException.REQUEST_TIMEOUT; + case "429": + return ApiException.TOO_MANY_REQUESTS; + case "502": + return ApiException.BAD_GATEWAY; + case "503": + return ApiException.SERVICE_UNAVAILABLE; + case "504": + return ApiException.GATEWAY_TIMEOUT; + default: + return ApiException.INTERNAL_SERVER_ERROR; + } + } + + private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) { + return messageRouterPublishResponse.successful(); + } + + private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413"); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java new file mode 100644 index 00000000..1d688d86 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/Publisher.java @@ -0,0 +1,64 @@ +/*- + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch; + +public class Publisher { + + private final MessageRouterPublisher publisher; + + public Publisher() { + this(retryConfiguration()); + } + + public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) { + publisher = DmaapClientFactory + .createMessageRouterPublisher(messageRouterPublisherConfig); + } + + /** + * Publish event + * + * @param events list of ves events prepared to send + * @param publisherConfig publisher configuration + * @return flux containing information about the success or failure of the event publication + */ + public Flux<MessageRouterPublishResponse> publishEvents(List<String> events, Option<PublisherConfig> publisherConfig) { + return publishEvents(events, createPublishRequest(publisherConfig)); + } + + Flux<MessageRouterPublishResponse> publishEvents(List<String> events, MessageRouterPublishRequest publishRequest) { + final Flux<JsonObject> jsonMessageBatch = jsonBatch(events); + return publisher.put(publishRequest, jsonMessageBatch); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java index 1fd0d316..0bb51922 100644 --- a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java +++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018,2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ public final class PublisherConfig { this.topic = topic; } + PublisherConfig(List<String> destinations, String topic, String userName, String password) { this.destinations = destinations; this.topic = topic; @@ -50,6 +51,10 @@ public final class PublisherConfig { return destinations; } + String getHostAndPort(){ + return destinations.get(0); + } + String topic() { return topic; } diff --git a/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java b/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java new file mode 100644 index 00000000..ea920740 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.validator; + +import io.vavr.control.Try; +import org.json.JSONException; +import org.onap.dcae.common.model.VesEvent; +import org.onap.dcae.restapi.ApiException; +import org.onap.dcae.restapi.EventValidatorException; + +import java.util.List; + +import static java.util.stream.Collectors.toSet; +import static org.onap.dcae.restapi.ApiException.DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT; +import static org.onap.dcae.restapi.ApiException.DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED; + +public class BatchEventValidator { + + private BatchEventValidator() { + } + + /** + * Check if value of domain fields are the same in every event, + * in case of stndDefined check stndDefinedNamespace fields + * + * @param events list of checked ves events + * @throws EventValidatorException when domain fields value or stndDefinedNamespace fields value are note the same + */ + public static void executeBatchEventValidation(List<VesEvent> events) throws EventValidatorException { + if (hasNotEveryEventSameDomain(events)) { + throw new EventValidatorException(DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT); + } + if (isDomainStndDefined(events) && hasNotSameStndDefinedNamespace(events)) { + throw new EventValidatorException(DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED); + } + } + + private static boolean hasNotEveryEventSameDomain(List<VesEvent> events) { + return events.stream() + .map(VesEvent::getDomain) + .collect(toSet()) + .size() != 1; + } + + private static boolean hasNotSameStndDefinedNamespace(List<VesEvent> events) { + return Try.of(() -> isAllStndDefinedNamespace(events)) + .getOrElseThrow(() -> new EventValidatorException(ApiException.MISSING_NAMESPACE_PARAMETER)); + } + + private static boolean isAllStndDefinedNamespace(List<VesEvent> events) { + return events.stream() + .map(e -> e.getStndDefinedNamespace().orElse("")) + .collect(toSet()) + .size() != 1; + } + + private static boolean isDomainStndDefined(List<VesEvent> events) throws JSONException{ + return events.stream() + .allMatch((e -> e.getDomain().equals("stndDefined"))); + } +} diff --git a/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java b/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java index 5cd5dc25..975db791 100644 --- a/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java +++ b/src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java @@ -25,7 +25,6 @@ import org.onap.dcae.ApplicationSettings; import org.onap.dcae.common.model.VesEvent; import org.onap.dcae.restapi.ApiException; import org.onap.dcae.restapi.EventValidatorException; - /** * This class is using ApplicationSetting and SchemaValidator to validate VES event. * diff --git a/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java b/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java new file mode 100644 index 00000000..c03ab6bb --- /dev/null +++ b/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.multiplestreamreducer; + +import io.vavr.Tuple2; +import io.vavr.collection.Map; + +public class MultipleStreamReducer { + + /** + * Converts configuration from: "one domain many streams" + * to: "one domain one stream" + * + * @param map domain to streams configuration + * @return configuration - one domain one stream + */ + public Map<String, String> reduce(Map<String, String[]> map) { + return map.toStream() + .toMap(Tuple2::_1, v -> v._2[0]); + } + + /** + * Information about the current match: domain to stream + * + * @param domainToStreamConfig domain to stream configuration + * @return current domain to stream information + */ + public String getDomainToStreamsInfo(Map<String, String> domainToStreamConfig) { + return domainToStreamConfig.map(v -> "Domain: " + + v._1 + " has active stream: " + v._2 + System.lineSeparator()) + .reduce((a, b) -> a + b); + } +} diff --git a/src/main/java/org/onap/dcae/restapi/ApiException.java b/src/main/java/org/onap/dcae/restapi/ApiException.java index dbd41a4d..e819cbd6 100644 --- a/src/main/java/org/onap/dcae/restapi/ApiException.java +++ b/src/main/java/org/onap/dcae/restapi/ApiException.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,8 +40,18 @@ public enum ApiException { NO_SERVER_RESOURCES(ExceptionType.SERVICE_EXCEPTION, "SVC1000", "No server resources (internal processing queue full)", 503), STND_DEFINED_VALIDATION_FAILED(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.data invalid against event.stndDefinedFields.schemaReference", "400"), 400), NO_LOCAL_SCHEMA_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2004", "Invalid input value for %1 %2: %3", List.of("attribute", "event.stndDefinedFields.schemaReference", "Referred external schema not present in schema repository"), 400), - INCORRECT_INTERNAL_FILE_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.schemaReference value does not correspond to any external event schema file in externalSchema repo", "400"), 400); - + INCORRECT_INTERNAL_FILE_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.schemaReference value does not correspond to any external event schema file in externalSchema repo", "400"), 400), + DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT(ExceptionType.SERVICE_EXCEPTION, "SVC0001", "Different value of domain fields in Batch Event", 400), + DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED(ExceptionType.SERVICE_EXCEPTION, "SVC0001","Value of stndDefinedNamespace fields have to be same when domain is stndDefined",400), + DOMAIN_NOT_DEFINED_FOR_STREAM_ID(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("No domain defined for stream id", "400"), 400), + PAYLOAD_TO_LARGE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Entity Too Large", "413"), 413), + NOT_FOUND(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Not Found","404"), 404), + REQUEST_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Timeout","408"), 408), + TOO_MANY_REQUESTS(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Too Many Requests","429"), 429), + INTERNAL_SERVER_ERROR(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Internal Server Error","500"), 500), + BAD_GATEWAY(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Bad Gateway","502"), 502), + SERVICE_UNAVAILABLE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Service Unavailable","503"), 503), + GATEWAY_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Gateway Timeout","504"), 504); public final int httpStatusCode; private final ExceptionType type; diff --git a/src/main/java/org/onap/dcae/restapi/VesRestController.java b/src/main/java/org/onap/dcae/restapi/VesRestController.java index 93e428b4..6c4fb8ef 100644 --- a/src/main/java/org/onap/dcae/restapi/VesRestController.java +++ b/src/main/java/org/onap/dcae/restapi/VesRestController.java @@ -3,7 +3,7 @@ * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,24 +21,24 @@ package org.onap.dcae.restapi; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; import org.json.JSONObject; import org.onap.dcae.ApplicationSettings; import org.onap.dcae.common.EventSender; import org.onap.dcae.common.EventUpdater; import org.onap.dcae.common.HeaderUtils; -import org.onap.dcae.common.validator.GeneralEventValidator; -import org.onap.dcae.common.validator.StndDefinedDataValidator; -import org.onap.dcae.common.VESLogger; +import org.onap.dcae.common.model.BackwardsCompatibilityException; +import org.onap.dcae.common.model.InternalException; +import org.onap.dcae.common.model.PayloadToLargeException; import org.onap.dcae.common.model.StndDefinedNamespaceParameterHasEmptyValueException; import org.onap.dcae.common.model.StndDefinedNamespaceParameterNotDefinedException; import org.onap.dcae.common.model.VesEvent; +import org.onap.dcae.common.validator.GeneralEventValidator; +import org.onap.dcae.common.validator.StndDefinedDataValidator; import org.onap.dcaegen2.services.sdk.standardization.header.CustomHeaderUtils; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PathVariable; @@ -50,8 +50,9 @@ import javax.servlet.http.HttpServletRequest; import java.util.List; import java.util.UUID; -import static org.springframework.http.ResponseEntity.accepted; +import static org.onap.dcae.common.validator.BatchEventValidator.executeBatchEventValidation; import static org.springframework.http.ResponseEntity.badRequest; +import static org.springframework.http.ResponseEntity.status; @RestController public class VesRestController { @@ -113,22 +114,29 @@ public class VesRestController { generalEventValidator.validate(vesEvent, type, version); List<VesEvent> vesEvents = transformEvent(vesEvent, type, version, requestURI); executeStndDefinedValidation(vesEvents); - eventSender.send(vesEvents); + executeBatchEventValidation(vesEvents); + HttpStatus httpStatus = eventSender.send(vesEvents); + return status(httpStatus).contentType(MediaType.APPLICATION_JSON).body("Successfully send event"); } catch (EventValidatorException e) { - logger.error(e.getMessage()); - return ResponseEntity.status(e.getApiException().httpStatusCode) + logger.error(e.getMessage()); + return status(e.getApiException().httpStatusCode) .body(e.getApiException().toJSON().toString()); } catch (StndDefinedNamespaceParameterNotDefinedException e) { - return ResponseEntity.status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode) + return status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode) .body(ApiException.MISSING_NAMESPACE_PARAMETER.toJSON().toString()); } catch (StndDefinedNamespaceParameterHasEmptyValueException e) { - return ResponseEntity.status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode) + return status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode) .body(ApiException.EMPTY_NAMESPACE_PARAMETER.toJSON().toString()); + } catch (InternalException e) { + return status(ApiException.SERVICE_UNAVAILABLE.httpStatusCode) + .body(e.getApiException().toJSON().toString()); + } catch (PayloadToLargeException e) { + return status(ApiException.PAYLOAD_TO_LARGE.httpStatusCode) + .body(ApiException.PAYLOAD_TO_LARGE.toJSON().toString()); + } catch (BackwardsCompatibilityException e) { + return status(ApiException.INTERNAL_SERVER_ERROR.httpStatusCode) + .body(ApiException.INTERNAL_SERVER_ERROR.toJSON().toString()); } - - // TODO call service and return status, replace CambriaClient, split event to single object and list of them - return accepted().headers(this.headerUtils.fillHeaders(headerUtils.getRspCustomHeader())) - .contentType(MediaType.APPLICATION_JSON).body("Accepted"); } private void executeStndDefinedValidation(List<VesEvent> vesEvents) { @@ -142,7 +150,6 @@ public class VesRestController { headerUtils.extractHeaders(request), settings.getApiVersionDescriptionFilepath(), headerUtils.getRestApiIdentify(request.getRequestURI())); - } private List<VesEvent> transformEvent(VesEvent vesEvent, String type, String version, String requestURI) { @@ -151,13 +158,7 @@ public class VesRestController { private UUID generateUUID(VesEvent vesEvent, String version, String uri) { UUID uuid = UUID.randomUUID(); - setUpECOMPLoggingForRequest(uuid); requestLogger.info(String.format(VES_EVENT_MESSAGE, vesEvent.asJsonObject(), uuid, version, uri)); return uuid; } - - private static void setUpECOMPLoggingForRequest(UUID uuid) { - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - } -}
\ No newline at end of file +} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 3f915d02..76d34118 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -47,7 +47,7 @@ </RollingFile> <RollingFile fileName="logs/eelf/audit.log" filePattern="logs/eelf/audit-%d{yyyy-MM-dd}-%i.log" name="EELF_AUDIT"> - <LevelRangeFilter maxLevel="DEBUG" minLevel="INFO"/> + <LevelRangeFilter maxLevel="TRACE" minLevel="INFO"/> <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %m%n"/> <Policies> <SizeBasedTriggeringPolicy size="64 MB"/> @@ -87,7 +87,7 @@ </Appenders> <Loggers> - <logger additivity="false" level="error" name="org.onap.dcaegen2.services.sdk"> + <logger additivity="true" level="trace" name="org.onap.dcaegen2.services.sdk"> <AppenderRef ref="ROL_CONSOLE"/> <AppenderRef ref="EFILE"/> </logger> diff --git a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java index 6ea94ab5..7acfdc27 100644 --- a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java +++ b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2018 - 2020 Nokia. All rights reserved. + * Copyright (C) 2018 - 2021 Nokia. All rights reserved. * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.networknt.schema.JsonSchema; import io.vavr.collection.HashMap; import io.vavr.collection.Map; +import org.junit.Ignore; import org.junit.Test; import java.io.File; @@ -37,11 +38,10 @@ import java.util.Arrays; import java.util.Objects; import static java.util.Collections.singletonList; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.onap.dcae.CLIUtils.processCmdLine; import static org.onap.dcae.TestingUtilities.createTemporaryFile; @@ -233,15 +233,6 @@ public class ApplicationSettingsTest { } @Test - public void shouldReturnDefaultDMAAPConfigFileLocation() throws IOException { - // when - String dmaapConfigFileLocation = fromTemporaryConfiguration().dMaaPConfigurationFileLocation(); - - // then - assertEquals(sanitizePath("etc/DmaapConfig.json"), dmaapConfigFileLocation); - } - - @Test public void shouldTellIfSchemaValidationIsEnabled() throws IOException { // when boolean jsonSchemaValidationEnabled = fromTemporaryConfiguration("collector.schema.checkflag=1") @@ -315,26 +306,26 @@ public class ApplicationSettingsTest { @Test public void shouldReturnDMAAPStreamId() throws IOException { // given - Map<String, String[]> expected = HashMap.of( - "log", new String[]{"ves-syslog", "ves-auditlog"}, - "fault", new String[]{"ves-fault"} + Map<String, String> expected = HashMap.of( + "log", "ves-syslog", + "fault", "ves-fault" ); // when - Map<String, String[]> dmaapStreamID = fromTemporaryConfiguration( - "collector.dmaap.streamid=fault=ves-fault|log=ves-syslog,ves-auditlog") + Map<String, String> dmaapStreamID = fromTemporaryConfiguration( + "collector.dmaap.streamid=fault=ves-fault,stream1|log=ves-syslog,stream2,stream3") .getDmaapStreamIds(); // then - assertArrayEquals(expected.get("log").get(), Objects.requireNonNull(dmaapStreamID).get("log").get()); - assertArrayEquals(expected.get("fault").get(), Objects.requireNonNull(dmaapStreamID).get("fault").get()); + assertEquals(expected.get("log").get(), Objects.requireNonNull(dmaapStreamID).get("log").get()); + assertEquals(expected.get("fault").get(), Objects.requireNonNull(dmaapStreamID).get("fault").get()); assertEquals(expected.keySet(), dmaapStreamID.keySet()); } @Test public void shouldReturnDefaultDMAAPStreamId() throws IOException { // when - Map<String, String[]> dmaapStreamID = fromTemporaryConfiguration().getDmaapStreamIds(); + Map<String, String> dmaapStreamID = fromTemporaryConfiguration().getDmaapStreamIds(); // then assertEquals(dmaapStreamID, HashMap.empty()); @@ -391,24 +382,24 @@ public class ApplicationSettingsTest { } @Test - public void shouldReturnCambriaConfigurationFileLocation() throws IOException { + public void shouldReturnConfigurationFileLocation() throws IOException { // when - String cambriaConfigurationFileLocation = fromTemporaryConfiguration( - "collector.dmaapfile=/somewhere/dmaapConfig") + String configurationFileLocation = fromTemporaryConfiguration( + "collector.dmaapfile=/somewhere/data-formats/ves-dmaap-config.json") .dMaaPConfigurationFileLocation(); // then - assertEquals(sanitizePath("/somewhere/dmaapConfig"), cambriaConfigurationFileLocation); + assertEquals(sanitizePath("/somewhere/data-formats/ves-dmaap-config.json"), configurationFileLocation); } @Test - public void shouldReturnDefaultCambriaConfigurationFileLocation() throws IOException { + public void shouldReturnDefaultConfigurationFileLocation() throws IOException { // when - String cambriaConfigurationFileLocation = fromTemporaryConfiguration() + String configurationFileLocation = fromTemporaryConfiguration() .dMaaPConfigurationFileLocation(); // then - assertEquals(sanitizePath("etc/DmaapConfig.json"), cambriaConfigurationFileLocation); + assertEquals(sanitizePath("dpo/data-formats/ves-dmaap-config.json"), configurationFileLocation); } @Test diff --git a/src/test/java/org/onap/dcae/TLSTest.java b/src/test/java/org/onap/dcae/TLSTest.java index 424ddf8b..d33ae3ef 100644 --- a/src/test/java/org/onap/dcae/TLSTest.java +++ b/src/test/java/org/onap/dcae/TLSTest.java @@ -106,4 +106,4 @@ public class TLSTest extends TLSTestBase { when(settings.getExternalSchemaStndDefinedDataPath()).thenReturn(STND_DEFINED_DATA_PATH); } } -}
\ No newline at end of file +} diff --git a/src/test/java/org/onap/dcae/common/EventSenderTest.java b/src/test/java/org/onap/dcae/common/EventSenderTest.java index 454cfb52..6d508d0a 100644 --- a/src/test/java/org/onap/dcae/common/EventSenderTest.java +++ b/src/test/java/org/onap/dcae/common/EventSenderTest.java @@ -1,9 +1,9 @@ /* * ============LICENSE_START======================================================= - * PROJECT + * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018,2020 Nokia. All rights reserved. + * Copyright (C) 2018-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.mockito.junit.MockitoJUnitRunner; import org.onap.dcae.common.model.StndDefinedNamespaceParameterNotDefinedException; import org.onap.dcae.common.model.VesEvent; import org.onap.dcae.common.publishing.DMaaPEventPublisher; +import org.onap.dcae.restapi.EventValidatorException; import java.io.IOException; import java.util.List; @@ -53,31 +54,18 @@ public class EventSenderTest { List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves7_valid_event.json"); // when - eventSender.send(eventToSend); + assertThatExceptionOfType(EventValidatorException.class) + .isThrownBy(() -> eventSender.send(eventToSend)); // then verifyThatEventWasNotSendAtStream(); } @Test - public void shouldSendEventAtStreamsAssignedToEventDomain() throws IOException { - // given - EventSender eventSender = givenConfiguredEventSender(HashMap.of("fault", new String[]{"ves-fault", "fault-ves"})); - List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves7_valid_event.json"); - - // when - eventSender.send(eventToSend); - - //then - verifyThatEventWasSendAtStream("ves-fault"); - verifyThatEventWasSendAtStream("fault-ves"); - } - - @Test public void shouldSendStdDefinedEventAtStreamAssignedToEventDomain() throws IOException { // given EventSender eventSender = givenConfiguredEventSender( - HashMap.of("3GPP-FaultSupervision", new String[]{"ves-3gpp-fault-supervision"}) + HashMap.of("3GPP-FaultSupervision", "ves-3gpp-fault-supervision") ); List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves_stdnDefined_valid.json"); @@ -95,7 +83,8 @@ public class EventSenderTest { List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves_stdnDefined_valid.json"); // when - eventSender.send(eventToSend); + assertThatExceptionOfType(EventValidatorException.class) + .isThrownBy(() -> eventSender.send(eventToSend)); // then verifyThatEventWasNotSendAtStream(); @@ -122,7 +111,7 @@ public class EventSenderTest { return givenEventToSend(event); } - private EventSender givenConfiguredEventSender(io.vavr.collection.Map<String, String[]> streamIds) { + private EventSender givenConfiguredEventSender(io.vavr.collection.Map<String, String> streamIds) { return new EventSender(eventPublisher, streamIds); } @@ -132,10 +121,10 @@ public class EventSenderTest { } private void verifyThatEventWasNotSendAtStream() { - verify(eventPublisher,never()).sendEvent(any(),any()); + verify(eventPublisher,never()).sendEvent(any(),any()); } private void verifyThatEventWasSendAtStream(String s) { - verify(eventPublisher).sendEvent(any(), eq(s)); - } + verify(eventPublisher).sendEvent(any(), eq(s)); + } } diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java index 923aae02..9aaeb287 100644 --- a/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java +++ b/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018,2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,29 +56,6 @@ public class DMaaPConfigurationParserTest { assertThat(authCredentialsKeysMissing.isSecured()).isFalse(); } - - @Test - public void testParseCredentialsForLegacy() { - Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json"); - Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path); - - PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull(); - assertThat(authCredentialsNull.userName().isEmpty()).isTrue(); - assertThat(authCredentialsNull.password().isEmpty()).isTrue(); - assertThat(authCredentialsNull.isSecured()).isFalse(); - - PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull(); - assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser"); - assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword"); - assertThat(authCredentialsPresent.isSecured()).isTrue(); - - PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull(); - assertThat(authCredentialsMissing.userName().isEmpty()).isTrue(); - assertThat(authCredentialsMissing.password().isEmpty()).isTrue(); - assertThat(authCredentialsMissing.isSecured()).isFalse(); - } - - @Test public void testParseGen2() { Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json"); @@ -93,22 +70,4 @@ public class DMaaPConfigurationParserTest { assertThat(withOtherSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); } - @Test - public void testParseLegacy() { - Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json"); - Try<Map<String, PublisherConfig>> publisherConfigs = DMaaPConfigurationParser - .parseToDomainMapping(exemplaryConfig); - - PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull(); - assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904")); - assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); - - PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull(); - assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com")); - assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); - - PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull(); - assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com")); - assertThat(urlIsMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); - } -}
\ No newline at end of file +} diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java deleted file mode 100644 index e4b6fd91..00000000 --- a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018,2020 Nokia. All rights reserved. - * Copyright (C) 2020 AT&T. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.common.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import org.json.JSONObject; -import org.junit.Before; -import org.junit.Test; -import org.onap.dcae.common.model.VesEvent; - -import java.io.IOException; - -import static io.vavr.API.Option; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class DMaaPEventPublisherTest { - - private static final String STREAM_ID = "sampleStreamId"; - - private static final JSONObject EXPECTED_EVENT = - new JSONObject( - "{\"VESversion\":\"v7\",\"event\":{" - + "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," - + "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," - + "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," - + "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," - + "\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3," - + "\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\"," - + "\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," - + "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"); - - private static final String PARTITION = "dns01cmd004"; - - private DMaaPEventPublisher eventPublisher; - private CambriaBatchingPublisher cambriaPublisher; - private DMaaPPublishersCache DMaaPPublishersCache; - - @Before - public void setUp() { - cambriaPublisher = mock(CambriaBatchingPublisher.class); - DMaaPPublishersCache = mock(DMaaPPublishersCache.class); - when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher)); - eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache); - } - - @Test - public void shouldSendEventToTopic() throws Exception { - // when - eventPublisher.sendEvent(givenVesEventWithoutVESuniqueIdField(), STREAM_ID); - - // then - verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString()); - } - - @Test - public void shouldRemoveInternalVESUIDBeforeSending() throws Exception { - // when - eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID); - - // then - verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString()); - } - - @Test - public void shouldCloseConnectionWhenExceptionOccurred() throws Exception { - // given - given(cambriaPublisher.send(anyString(), anyString())) - .willThrow(new IOException("Expected exception - test case scenario!")); - - // when - eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID); - - // then - verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID); - } - - private VesEvent givenVesEventWithVESUniqueIdField() { - return new VesEvent( - new JSONObject( - "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\"," + - "\"event\":{" + - "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," + - "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," + - "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," + - "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," + - "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," + - "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," + - "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}")); - } - - private VesEvent givenVesEventWithoutVESuniqueIdField() { - return new VesEvent( - new JSONObject( - "{\"VESversion\":\"v7\"," + - "\"event\":{" + - "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," + - "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," + - "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," + - "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," + - "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," + - "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," + - "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}")); - } -} diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java deleted file mode 100644 index f4dbe190..00000000 --- a/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.common.publishing; - -import static io.vavr.API.List; -import static io.vavr.API.Map; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import io.vavr.collection.Map; -import io.vavr.control.Option; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Test; -import org.onap.dcae.common.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader; -import org.onap.dcae.common.publishing.DMaaPPublishersCache.OnPublisherRemovalListener; - - -public class DMaaPPublishersCacheTest { - - private String streamId1; - private Map<String, PublisherConfig> dMaaPConfigs; - - @Before - public void setUp() { - streamId1 = "sampleStream1"; - dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1")); - } - - @Test - public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() { - // given - DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs); - - // when - Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1); - Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1); - - // then - assertSame("should return same instance", firstPublisher.get(), secondPublisher.get()); - } - - @Test - public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException { - // given - CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class); - CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class); - DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock, - new OnPublisherRemovalListener(), - dMaaPConfigs); - when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1); - - // when - dMaaPPublishersCache.getPublisher(streamId1); - dMaaPPublishersCache.closePublisherFor(streamId1); - - // then - verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS); - - } - - @Test - public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() { - // given - DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs); - - // then - assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty()); - } - - - @Test - public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException { - // given - CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class); - CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class); - CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class); - String firstDomain = "domain1"; - String secondDomain = "domain2"; - Map<String, PublisherConfig> oldConfig = Map(firstDomain, - new PublisherConfig(List("destination1"), "topic1"), - secondDomain, - new PublisherConfig(List("destination2"), "topic2", - "user", "pass")); - Map<String, PublisherConfig> newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"), - secondDomain, new PublisherConfig(List("destination2"), "topic2")); - DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock, - new OnPublisherRemovalListener(), - oldConfig); - when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1); - when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2); - - dMaaPPublishersCache.getPublisher(firstDomain); - dMaaPPublishersCache.getPublisher(secondDomain); - - // when - dMaaPPublishersCache.reconfigure(newConfig); - - // then - verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS); - verifyZeroInteractions(cambriaPublisherMock1); - } -}
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java b/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java new file mode 100644 index 00000000..9ece10b5 --- /dev/null +++ b/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START==================================== + * VES Collector + * ========================================================= + * Copyright (C) 2019-2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcae.common.publishing; + +import org.testcontainers.containers.DockerComposeContainer; + +import java.io.File; +import java.net.URL; + +final class DMaapContainer { + private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml"; + private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(MR_COMPOSE_RESOURCE_NAME); + static final int DMAAP_SERVICE_EXPOSED_PORT = 3904; + static final String DMAAP_SERVICE_NAME = "onap-dmaap"; + + private DMaapContainer() {} + + + public static DockerComposeContainer createContainerInstance(){ + return new DockerComposeContainer( + new File(DOCKER_COMPOSE_FILE_PATH)) + .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT) + .withLocalCompose(true); + } + + + + private static String getDockerComposeFilePath(String resourceName) { + URL resource = DMaapContainer.class.getClassLoader() + .getResource(resourceName); + + if (resource != null) return resource.getFile(); + else throw new RuntimeException(String + .format("File %s does not exist", resourceName)); + } +} diff --git a/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java b/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java new file mode 100644 index 00000000..0e5ae908 --- /dev/null +++ b/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java @@ -0,0 +1,116 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.onap.dcae.common.model.BackwardsCompatibilityException; +import org.onap.dcae.common.model.InternalException; +import org.onap.dcae.common.model.PayloadToLargeException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.springframework.http.HttpStatus; + +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.onap.dcae.ApplicationSettings.responseCompatibility; +import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus; + +class MessageRouterHttpStatusMapperTest { + + public static final String BACKWARDS_COMPATIBILITY = "v7.2"; + public static final String BACKWARDS_COMPATIBILITY_NONE = "NONE"; + + @Test + void ves_shouldResponse202() { + //given + responseCompatibility = BACKWARDS_COMPATIBILITY; + MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class); + when(messageRouterPublishResponse.successful()).thenReturn(true); + + //when + HttpStatus httpStatusResponse = getHttpStatus(messageRouterPublishResponse); + + //then + assertSame(HttpStatus.ACCEPTED, httpStatusResponse); + } + + @ParameterizedTest + @EnumSource( + value = HttpStatus.class, + names = {"NOT_FOUND", "REQUEST_TIMEOUT", "TOO_MANY_REQUESTS", "INTERNAL_SERVER_ERROR", "BAD_GATEWAY", + "SERVICE_UNAVAILABLE", "GATEWAY_TIMEOUT","PAYLOAD_TOO_LARGE"} + ) + void ves_shouldMapErrorsToBackwardsCompatibility(HttpStatus httpStatus) { + //given + responseCompatibility = BACKWARDS_COMPATIBILITY; + MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class); + when(messageRouterPublishResponse.failReason()).thenReturn(httpStatus.toString()); + + //when + //then + assertThrows(BackwardsCompatibilityException.class,()->getHttpStatus(messageRouterPublishResponse)); + } + + @Test + void ves_shouldResponse200WhenBackwardsCompatibilityIsNone() { + //given + responseCompatibility = BACKWARDS_COMPATIBILITY_NONE; + MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class); + when(messageRouterPublishResponse.successful()).thenReturn(true); + + //when + HttpStatus httpStatusResponse = getHttpStatus(messageRouterPublishResponse); + + //then + assertSame(HttpStatus.OK, httpStatusResponse); + } + + @Test + void ves_shouldHandleError413WhenBackwardsCompatibilityIsNone() { + //given + responseCompatibility = BACKWARDS_COMPATIBILITY_NONE; + MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class); + when(messageRouterPublishResponse.failReason()).thenReturn(HttpStatus.PAYLOAD_TOO_LARGE.toString()); + + //when + //then + assertThrows(PayloadToLargeException.class,()->getHttpStatus(messageRouterPublishResponse)); + } + + @ParameterizedTest + @EnumSource( + value = HttpStatus.class, + names = {"NOT_FOUND", "REQUEST_TIMEOUT", "TOO_MANY_REQUESTS", "INTERNAL_SERVER_ERROR", "BAD_GATEWAY", + "SERVICE_UNAVAILABLE", "GATEWAY_TIMEOUT"} + ) + void ves_shouldMapErrorsTo503WhenBackwardsCompatibilityIsNone(HttpStatus httpStatus) { + //given + responseCompatibility = BACKWARDS_COMPATIBILITY_NONE; + MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class); + when(messageRouterPublishResponse.failReason()).thenReturn(httpStatus.toString()); + + //when + //then + assertThrows(InternalException.class,()->getHttpStatus(messageRouterPublishResponse)); + } +} diff --git a/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java new file mode 100644 index 00000000..f269b942 --- /dev/null +++ b/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java @@ -0,0 +1,78 @@ +/*- + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonElement; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.time.Duration; + +import static org.onap.dcae.common.publishing.DMaapContainer.createContainerInstance; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements; + + +@Testcontainers +public class PublisherTest { + + @Container + private final DockerComposeContainer CONTAINER = createContainerInstance(); + + @Test + void publishEvent_shouldSuccessfullyPublishSingleMessage() { + //given + final Publisher publisher = new Publisher(); + final String simpleEvent = "{\"message\":\"message1\"}"; + final List<String> twoJsonMessages = List.of(simpleEvent); + final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages)); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(twoJsonMessages, createPublishConfig()); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(Duration.ofSeconds(10)); + } + + + private Option<PublisherConfig> createPublishConfig() { + List<String> desc = List.of("127.0.0.1:3904"); + PublisherConfig conf = new PublisherConfig(desc, "topic"); + return Option.of(conf); + } + + private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) { + return ImmutableMessageRouterPublishResponse + .builder() + .items(items) + .build(); + } + +} diff --git a/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java b/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java new file mode 100644 index 00000000..dbecd531 --- /dev/null +++ b/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java @@ -0,0 +1,156 @@ +/*- + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + + +import com.google.gson.JsonElement; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.junit.jupiter.MockServerExtension; +import org.mockserver.junit.jupiter.MockServerSettings; +import org.mockserver.matchers.Times; +import org.mockserver.verify.VerificationTimes; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements; + +@ExtendWith(MockServerExtension.class) +@MockServerSettings(ports = {1080, 8888}) +class PublisherTestMockServer { + + private static final int MAX_IDLE_TIME = 10; + private static final int MAX_LIFE_TIME = 20; + private static final int CONNECTION_POOL = 1; + private static final String TOPIC = "TOPIC10"; + private static final String PATH = String.format("/events/%s/", TOPIC); + + private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n" + + "{" + + "\"requestError\":" + + "{" + + "\"serviceException\":" + + "{" + + "\"messageId\":\"SVC0001\"," + + "\"text\":\"Client timeout exception occurred, Error code is %1\"," + + "\"variables\":[\"408\"]" + + "}" + + "}" + + "}"; + + private final ClientAndServer client; + + public PublisherTestMockServer(ClientAndServer client) { + this.client = client; + } + + @Test + void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() { + //given + final Long timeoutSec = 1L; + final Publisher publisher = new Publisher(connectionPoolConfiguration()); + final String simpleEvent = "{\"message\":\"message1\"}"; + final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE); + + final String path = String.format("/events/%s/", TOPIC); + client.when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 2)); + List<String> events = List.of(simpleEvent); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(events, createPublishRequest(createPublishConfig(), timeoutSec)); + + + + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(Duration.ofSeconds(10)); + + //then + client.verify(request().withPath(path), VerificationTimes.exactly(1)); + + } + + @Test + void publishEvent_shouldSuccessfullyPublishSingleMessage() { + //given + final Publisher publisher = new Publisher(); + final String simpleEvent = "{\"message\":\"message1\"}"; + final List<String> twoJsonMessages = List.of(simpleEvent); + final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages)); + client.when(request().withPath(PATH), Times.once()) + .respond(response()); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(List.of(simpleEvent), createPublishConfig()); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(Duration.ofSeconds(10)); + } + + private Option<PublisherConfig> createPublishConfig() { + List<String> desc = List.of("localhost:1080"); + PublisherConfig conf = new PublisherConfig(desc, TOPIC); + return Option.of(conf); + } + + private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) { + return ImmutableMessageRouterPublishResponse + .builder() + .items(items) + .build(); + } + + public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs) { + String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs); + return ImmutableMessageRouterPublishResponse + .builder() + .failReason(failReason) + .build(); + } + + public MessageRouterPublisherConfig connectionPoolConfiguration() { + return ImmutableMessageRouterPublisherConfig.builder() + .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder() + .connectionPool(CONNECTION_POOL) + .maxIdleTime(MAX_IDLE_TIME) + .maxLifeTime(MAX_LIFE_TIME) + .build()) + .build(); + } +} diff --git a/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java b/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java new file mode 100644 index 00000000..05baa04b --- /dev/null +++ b/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java @@ -0,0 +1,110 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.validator; + +import org.json.JSONObject; +import org.junit.jupiter.api.Test; +import org.onap.dcae.ApplicationSettings; +import org.onap.dcae.common.EventUpdater; +import org.onap.dcae.common.model.VesEvent; +import org.onap.dcae.restapi.EventValidatorException; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.onap.dcae.common.validator.BatchEventValidator.executeBatchEventValidation; + +class BatchEventValidatorTest { + + private final ApplicationSettings settings = mock(ApplicationSettings.class); + private final EventUpdater eventUpdater = new EventUpdater(settings); + private static final String EVENT = "event"; + private static final String EVENT_LIST = "eventList"; + + @Test + void shouldThrowException_whenDomainFieldsHaveDifferentValues() throws IOException { + //given + final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_valid_two_different_domain.json", EVENT_LIST); + + //when + //then + assertThrows(EventValidatorException.class, () -> executeBatchEventValidation(eventList)); + } + + @Test + void shouldNotThrowException_whenDomainFieldsHaveSameValues() throws IOException { + //given + final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_valid.json", EVENT_LIST); + + //when + //then + assertDoesNotThrow(() -> executeBatchEventValidation(eventList)); + } + + @Test + void shouldThrowException_whenStndDefinedNamespaceFieldsHaveDifferentValuesAndDomainsAreStndDefined() throws IOException { + //given + final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json", EVENT_LIST); + + //when + //then + assertThrows(EventValidatorException.class, () -> executeBatchEventValidation(eventList)); + } + + @Test + void shouldNotThrowException_whenStndDefinedNamespaceFieldsHaveSameValuesAndDomainsAreStndDefined() throws IOException { + //given + final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json", EVENT_LIST); + + //when + //then + assertDoesNotThrow(() -> executeBatchEventValidation(eventList)); + } + + @Test + void shouldNotThrowException_whenSendValidNotBatchEvent() throws IOException { + //given + final List<VesEvent> eventList = prepareEventList("src/test/resources/ves_stdnDefined_valid.json", EVENT); + + //when + //then + assertDoesNotThrow(() -> executeBatchEventValidation(eventList)); + } + + private List<VesEvent> prepareEventList(String pathToFile, String eventType) throws IOException { + final VesEvent vesEventFromJson = createVesEventFromJson(pathToFile); + return eventUpdater.convert(vesEventFromJson, "v7", UUID.randomUUID(), eventType); + } + + private VesEvent createVesEventFromJson(String pathToFile) throws IOException { + Path path = Paths.get(pathToFile); + final List<String> lines = Files.readAllLines(path); + String str = String.join("", lines); + return new VesEvent(new JSONObject(str)); + } + +} diff --git a/src/test/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducerTest.java b/src/test/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducerTest.java new file mode 100644 index 00000000..d085eb13 --- /dev/null +++ b/src/test/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducerTest.java @@ -0,0 +1,69 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2021 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.multiplestreamreducer; + +import io.vavr.collection.HashMap; +import io.vavr.collection.Map; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class MultipleStreamReducerTest { + + private final MultipleStreamReducer multipleStreamReducer = new MultipleStreamReducer(); + private final Map<String, String[]> domainToStreams = HashMap.of( + "fault", new String[]{"ves-fault", "stream1", "stream2"}, + "log", new String[]{"ves-syslog", "stream3", "stream4", "stream5"}, + "test", new String[]{"stream6"} + ); + + @Test + void shouldReduceStreamsToTheFirstOne() { + //given + Map<String, String> expected = HashMap.of( + "fault", "ves-fault", + "log", "ves-syslog", + "test", "stream6" + ); + + //when + final Map<String, String> domainToStreamsAfterReduce = multipleStreamReducer.reduce(domainToStreams); + + //then + assertEquals(expected, domainToStreamsAfterReduce); + } + + @Test + void shouldReturnInfoAboutDomainToStreamsConfig() { + //given + final Map<String, String> domainToStreamsAfterReduce = multipleStreamReducer.reduce(domainToStreams); + String expectedRedundantStreamsInfo = + "Domain: fault has active stream: ves-fault\n" + + "Domain: log has active stream: ves-syslog\n" + + "Domain: test has active stream: stream6\n"; + + //when + final String domainToStreamsConfigInfo = multipleStreamReducer.getDomainToStreamsInfo(domainToStreamsAfterReduce); + + //then + assertEquals(expectedRedundantStreamsInfo, domainToStreamsConfigInfo); + } + +} diff --git a/src/test/java/org/onap/dcae/restapi/ApiAuthInterceptionTest.java b/src/test/java/org/onap/dcae/restapi/ApiAuthInterceptionTest.java index 9df0c694..931e7bc3 100644 --- a/src/test/java/org/onap/dcae/restapi/ApiAuthInterceptionTest.java +++ b/src/test/java/org/onap/dcae/restapi/ApiAuthInterceptionTest.java @@ -143,4 +143,4 @@ public class ApiAuthInterceptionTest { healthcheckRequest.setServerPort(serverPort); return healthcheckRequest; } -}
\ No newline at end of file +} diff --git a/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java b/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java index a3c0628d..9b436871 100644 --- a/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java +++ b/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java @@ -1,8 +1,8 @@ /* * ============LICENSE_START======================================================= - * PROJECT + * VES Collector * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved.s + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,24 +25,28 @@ import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.networknt.schema.JsonSchema; import io.vavr.collection.HashMap; -import org.apache.http.HttpStatus; import org.jetbrains.annotations.NotNull; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; import org.onap.dcae.ApplicationSettings; import org.onap.dcae.JSonSchemasSupplier; import org.onap.dcae.common.EventSender; import org.onap.dcae.common.EventTransformation; import org.onap.dcae.common.HeaderUtils; import org.onap.dcae.common.JsonDataLoader; -import org.onap.dcae.common.model.VesEvent; -import org.onap.dcae.common.validator.StndDefinedDataValidator; +import org.onap.dcae.common.model.InternalException; +import org.onap.dcae.common.model.PayloadToLargeException; import org.onap.dcae.common.publishing.DMaaPEventPublisher; +import org.onap.dcae.common.validator.StndDefinedDataValidator; import org.slf4j.Logger; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.mock.web.MockHttpServletRequest; import org.springframework.web.context.request.RequestContextHolder; @@ -53,8 +57,10 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -63,14 +69,15 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) + +@ExtendWith(MockitoExtension.class) public class VesRestControllerTest { private static final String EVENT_TRANSFORM_FILE_PATH = "/eventTransform.json"; - private static final String ACCEPTED = "Accepted"; + private static final String ACCEPTED = "Successfully send event"; private static final String VERSION_V7 = "v7"; - public static final String VES_FAULT_TOPIC = "ves-fault"; - public static final String VES_3_GPP_FAULT_SUPERVISION_TOPIC = "ves-3gpp-fault-supervision"; + static final String VES_FAULT_TOPIC = "ves-fault"; + static final String VES_3_GPP_FAULT_SUPERVISION_TOPIC = "ves-3gpp-fault-supervision"; private VesRestController vesRestController; @@ -92,18 +99,18 @@ public class VesRestControllerTest { @Mock private StndDefinedDataValidator stndDefinedDataValidator; - @Before - public void setUp(){ - final HashMap<String, String[]> streamIds = HashMap.of( - "fault", new String[]{VES_FAULT_TOPIC}, - "3GPP-FaultSupervision", new String[]{VES_3_GPP_FAULT_SUPERVISION_TOPIC} + @BeforeEach + void setUp(){ + final HashMap<String, String> streamIds = HashMap.of( + "fault", VES_FAULT_TOPIC, + "3GPP-FaultSupervision", VES_3_GPP_FAULT_SUPERVISION_TOPIC ); this.vesRestController = new VesRestController(applicationSettings, logger, errorLogger, new EventSender(eventPublisher, streamIds), headerUtils, stndDefinedDataValidator); } @Test - public void shouldReportThatApiVersionIsNotSupported() { + void shouldReportThatApiVersionIsNotSupported() { // given when(applicationSettings.isVersionSupported("v20")).thenReturn(false); MockHttpServletRequest request = givenMockHttpServletRequest(); @@ -112,33 +119,33 @@ public class VesRestControllerTest { final ResponseEntity<String> event = vesRestController.event("", "v20", request); // then - assertThat(event.getStatusCodeValue()).isEqualTo(HttpStatus.SC_BAD_REQUEST); + assertThat(event.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value()); assertThat(event.getBody()).isEqualTo("API version v20 is not supported"); verifyThatEventWasNotSend(); } @Test - public void shouldTransformEventAccordingToEventTransformFile() throws IOException { + void shouldTransformEventAccordingToEventTransformFile() throws IOException { //given configureEventTransformations(); configureHeadersForEventListener(); MockHttpServletRequest request = givenMockHttpServletRequest(); - String validEvent = JsonDataLoader.loadContent("/ves7_valid_30_1_1_event.json"); + when(eventPublisher.sendEvent(any(), any())).thenReturn((HttpStatus.OK)); //when final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request); //then - assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED); + assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value()); assertThat(response.getBody()).isEqualTo(ACCEPTED); verifyThatTransformedEventWasSend(eventPublisher, validEvent); } @Test - public void shouldSendBatchOfEvents() throws IOException { + void shouldSendBatchEvent() throws IOException { //given configureEventTransformations(); configureHeadersForEventListener(); @@ -146,18 +153,18 @@ public class VesRestControllerTest { MockHttpServletRequest request = givenMockHttpServletRequest(); String validEvent = JsonDataLoader.loadContent("/ves7_batch_valid.json"); - + when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK); //when final ResponseEntity<String> response = vesRestController.events(validEvent, VERSION_V7, request); //then - assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED); + assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value()); assertThat(response.getBody()).isEqualTo(ACCEPTED); - verify(eventPublisher, times(2)).sendEvent(any(),any()); + verify(eventPublisher, times(1)).sendEvent(any(),any()); } @Test - public void shouldSendStndDomainEventIntoDomainStream() throws IOException { + void shouldSendStndDomainEventIntoDomainStream() throws IOException { //given configureEventTransformations(); configureHeadersForEventListener(); @@ -166,19 +173,20 @@ public class VesRestControllerTest { configureSchemasSupplierForStndDefineEvent(); String validEvent = JsonDataLoader.loadContent("/ves_stdnDefined_valid.json"); + when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK); //when final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request); //then - assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED); + assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value()); assertThat(response.getBody()).isEqualTo(ACCEPTED); verify(eventPublisher).sendEvent(any(),eq(VES_3_GPP_FAULT_SUPERVISION_TOPIC)); } @Test - public void shouldReportThatStndDomainEventHasntGotNamespaceParameter() throws IOException { + void shouldReportThatStndDomainEventHasntGotNamespaceParameter() throws IOException { //given configureEventTransformations(); configureHeadersForEventListener(); @@ -192,7 +200,7 @@ public class VesRestControllerTest { final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request); //then - assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_BAD_REQUEST); + assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value()); verifyErrorResponse( response, "SVC2006", @@ -203,7 +211,7 @@ public class VesRestControllerTest { } @Test - public void shouldReportThatStndDomainEventNamespaceParameterIsEmpty() throws IOException { + void shouldReportThatStndDomainEventNamespaceParameterIsEmpty() throws IOException { //given configureEventTransformations(); configureHeadersForEventListener(); @@ -217,7 +225,7 @@ public class VesRestControllerTest { final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request); //then - assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_BAD_REQUEST); + assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value()); verifyErrorResponse( response, "SVC2006", @@ -228,7 +236,7 @@ public class VesRestControllerTest { } @Test - public void shouldNotSendStndDomainEventWhenTopicCannotBeFoundInConfiguration() throws IOException { + void shouldNotSendStndDomainEventWhenTopicCannotBeFoundInConfiguration() throws IOException { //given configureEventTransformations(); configureHeadersForEventListener(); @@ -240,13 +248,12 @@ public class VesRestControllerTest { final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request); //then - assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED); - assertThat(response.getBody()).isEqualTo(ACCEPTED); + assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value()); verifyThatEventWasNotSend(); } @Test - public void shouldExecuteStndDefinedValidationWhenFlagIsOnTrue() throws IOException { + void shouldExecuteStndDefinedValidationWhenFlagIsOnTrue() throws IOException { //given configureEventTransformations(); configureHeadersForEventListener(); @@ -254,18 +261,18 @@ public class VesRestControllerTest { MockHttpServletRequest request = givenMockHttpServletRequest(); String validEvent = JsonDataLoader.loadContent("/ves7_batch_with_stndDefined_valid.json"); when(applicationSettings.getExternalSchemaValidationCheckflag()).thenReturn(true); - + when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK); //when final ResponseEntity<String> response = vesRestController.events(validEvent, VERSION_V7, request); //then - assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED); + assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value()); assertThat(response.getBody()).isEqualTo(ACCEPTED); verify(stndDefinedDataValidator, times(2)).validate(any()); } @Test - public void shouldNotExecuteStndDefinedValidationWhenFlagIsOnFalse() throws IOException { + void shouldNotExecuteStndDefinedValidationWhenFlagIsOnFalse() throws IOException { //given configureEventTransformations(); configureHeadersForEventListener(); @@ -273,16 +280,76 @@ public class VesRestControllerTest { MockHttpServletRequest request = givenMockHttpServletRequest(); String validEvent = JsonDataLoader.loadContent("/ves7_batch_with_stndDefined_valid.json"); when(applicationSettings.getExternalSchemaValidationCheckflag()).thenReturn(false); + when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK); //when final ResponseEntity<String> response = vesRestController.events(validEvent, VERSION_V7, request); //then - assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED); + assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value()); assertThat(response.getBody()).isEqualTo(ACCEPTED); verify(stndDefinedDataValidator, times(0)).validate(any()); } + @Test + void shouldReturn413WhenPayloadIsTooLarge() throws IOException { + //given + configureEventTransformations(); + configureHeadersForEventListener(); + + MockHttpServletRequest request = givenMockHttpServletRequest(); + when(eventPublisher.sendEvent(any(), any())).thenThrow(new PayloadToLargeException()); + String validEvent = JsonDataLoader.loadContent("/ves7_valid_30_1_1_event.json"); + + //when + final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request); + + //then + assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.PAYLOAD_TOO_LARGE.value()); + verifyErrorResponse( + response, + "SVC2000", + "The following service error occurred: %1. Error code is %2", + List.of("Request Entity Too Large","413") + ); + } + + @ParameterizedTest + @MethodSource("errorsCodeAndResponseBody") + void shouldMapErrorTo503AndReturnOriginalBody(ApiException apiException,String bodyVariable,String bodyVariable2) throws IOException { + //given + configureEventTransformations(); + configureHeadersForEventListener(); + + MockHttpServletRequest request = givenMockHttpServletRequest(); + when(eventPublisher.sendEvent(any(), any())).thenThrow(new InternalException(apiException)); + String validEvent = JsonDataLoader.loadContent("/ves7_valid_30_1_1_event.json"); + + //when + final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request); + + //then + assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE.value()); + verifyErrorResponse( + response, + "SVC2000", + "The following service error occurred: %1. Error code is %2", + List.of(bodyVariable,bodyVariable2) + ); + } + + private static Stream<Arguments> errorsCodeAndResponseBody() { + return Stream.of( + arguments(ApiException.NOT_FOUND, "Not Found","404"), + arguments(ApiException.REQUEST_TIMEOUT, "Request Timeout","408"), + arguments(ApiException.TOO_MANY_REQUESTS, "Too Many Requests","429"), + arguments(ApiException.INTERNAL_SERVER_ERROR, "Internal Server Error","500"), + arguments(ApiException.BAD_GATEWAY, "Bad Gateway","502"), + arguments(ApiException.SERVICE_UNAVAILABLE, "Service Unavailable","503"), + arguments(ApiException.GATEWAY_TIMEOUT, "Gateway Timeout","504") + ); + } + private void verifyThatEventWasNotSend() { verify(eventPublisher, never()).sendEvent(any(), any()); } @@ -313,7 +380,7 @@ public class VesRestControllerTest { final List<EventTransformation> eventTransformations = loadEventTransformations(); when(applicationSettings.isVersionSupported(VERSION_V7)).thenReturn(true); when(applicationSettings.eventTransformingEnabled()).thenReturn(true); - when(applicationSettings.getEventTransformations()).thenReturn(eventTransformations); + when(applicationSettings.getEventTransformations()).thenReturn((eventTransformations)); } private void configureHeadersForEventListener() { @@ -326,11 +393,11 @@ public class VesRestControllerTest { assertThat(eventBeforeTransformation).contains("\"version\": \"4.0.1\""); assertThat(eventBeforeTransformation).contains("\"faultFieldsVersion\": \"4.0\""); - ArgumentCaptor<VesEvent> argument = ArgumentCaptor.forClass(VesEvent.class); + ArgumentCaptor<List> argument = ArgumentCaptor.forClass(List.class); ArgumentCaptor<String> domain = ArgumentCaptor.forClass(String.class); verify(eventPublisher).sendEvent(argument.capture(), domain.capture()); - final String transformedEvent = argument.getValue().asJsonObject().toString(); + final String transformedEvent = argument.getValue().toString(); final String eventSentAtTopic = domain.getValue(); // event after transformation diff --git a/src/test/java/org/onap/dcae/vestest/TestVESLogger.java b/src/test/java/org/onap/dcae/vestest/TestVESLogger.java deleted file mode 100644 index 1689263e..00000000 --- a/src/test/java/org/onap/dcae/vestest/TestVESLogger.java +++ /dev/null @@ -1,76 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * PROJECT - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.vestest; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.onap.dcae.common.VESLogger.REQUEST_ID; - -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; -import java.util.UUID; -import org.junit.Test; -import org.onap.dcae.common.VESLogger; - -public class TestVESLogger { - - @Test - public void shouldOnLoggingContextInitializationPutRandomUuidAsRequestId() { - LoggingContext commonLoggingContext = VESLogger.getCommonLoggingContext(); - String requestId = commonLoggingContext.get(REQUEST_ID, "default"); - - assertNotNull(requestId); - assertNotSame(requestId, "default"); - - } - - @Test - public void shouldOnLoggingContextInitializationPutGivenUuuidAsRequestIdAndSupplyEndTimestamp() { - final UUID uuid = UUID.randomUUID(); - LoggingContext loggingContextForThread = VESLogger.getLoggingContextForThread(uuid); - String requestId = loggingContextForThread.get(REQUEST_ID, "default"); - String endTimestamp = loggingContextForThread.get(EcompFields.kEndTimestamp, "default"); - - assertNotNull(requestId); - assertNotNull(endTimestamp); - assertNotSame(endTimestamp, "default"); - assertEquals(requestId, uuid.toString()); - } - - @Test - public void shouldOnLoggingContextInitializationPutGivenUuidAsRequestIdAndSupplyEndTimestampAndCompleteStatusCode() { - final UUID uuid = UUID.randomUUID(); - LoggingContext loggingContextForThread = VESLogger.getLoggingContextForThread(uuid.toString()); - String requestId = loggingContextForThread.get(REQUEST_ID, "default"); - String statusCode = loggingContextForThread.get("statusCode", "default"); - String endTimestamp = loggingContextForThread.get(EcompFields.kEndTimestamp, "default"); - - assertNotNull(requestId); - assertNotNull(endTimestamp); - assertNotNull(statusCode); - assertNotSame(endTimestamp, "default"); - assertEquals(requestId, uuid.toString()); - assertEquals(statusCode, "COMPLETE"); - } - -} - diff --git a/src/test/resources/dmaap-msg-router/MsgRtrApi.properties b/src/test/resources/dmaap-msg-router/MsgRtrApi.properties new file mode 100644 index 00000000..d288bd23 --- /dev/null +++ b/src/test/resources/dmaap-msg-router/MsgRtrApi.properties @@ -0,0 +1,155 @@ +# LICENSE_START======================================================= +# org.onap.dmaap +# ================================================================================ +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +############################################################################### +############################################################################### +## +## Cambria API Server config +## +## Default values are shown as commented settings. +## +############################################################################### +## +## HTTP service +## +## 3904 is standard as of 7/29/14. +# +## Zookeeper Connection +## +## Both Cambria and Kafka make use of Zookeeper. +## +#config.zk.servers=172.18.1.1 +#config.zk.servers={{.Values.zookeeper.name}}:{{.Values.zookeeper.port}} +config.zk.servers=zookeeper +#config.zk.root=/fe3c/cambria/config +############################################################################### +## +## Kafka Connection +## +## Items below are passed through to Kafka's producer and consumer +## configurations (after removing "kafka.") +## if you want to change request.required.acks it can take this one value +#kafka.metadata.broker.list=localhost:9092,localhost:9093 +#kafka.metadata.broker.list={{.Values.kafka.name}}:{{.Values.kafka.port}} +kafka.metadata.broker.list=kafka:9092 +##kafka.request.required.acks=-1 +#kafka.client.zookeeper=${config.zk.servers} +consumer.timeout.ms=100 +zookeeper.connection.timeout.ms=6000 +zookeeper.session.timeout.ms=20000 +zookeeper.sync.time.ms=2000 +auto.commit.interval.ms=1000 +fetch.message.max.bytes=1000000 +auto.commit.enable=false +#(backoff*retries > zksessiontimeout) +kafka.rebalance.backoff.ms=10000 +kafka.rebalance.max.retries=6 +############################################################################### +## +## Secured Config +## +## Some data stored in the config system is sensitive -- API keys and secrets, +## for example. to protect it, we use an encryption layer for this section +## of the config. +## +## The key is a base64 encode AES key. This must be created/configured for +## each installation. +#cambria.secureConfig.key= +## +## The initialization vector is a 16 byte value specific to the secured store. +## This must be created/configured for each installation. +#cambria.secureConfig.iv= +## Southfield Sandbox +cambria.secureConfig.key=b/7ouTn9FfEw2PQwL0ov/Q== +cambria.secureConfig.iv=wR9xP5k5vbz/xD0LmtqQLw== +authentication.adminSecret=fe3cCompound +#cambria.secureConfig.key[pc569h]=YT3XPyxEmKCTLI2NK+Sjbw== +#cambria.secureConfig.iv[pc569h]=rMm2jhR3yVnU+u2V9Ugu3Q== +############################################################################### +## +## Consumer Caching +## +## Kafka expects live connections from the consumer to the broker, which +## obviously doesn't work over connectionless HTTP requests. The Cambria +## server proxies HTTP requests into Kafka consumer sessions that are kept +## around for later re-use. Not doing so is costly for setup per request, +## which would substantially impact a high volume consumer's performance. +## +## This complicates Cambria server failover, because we often need server +## A to close its connection before server B brings up the replacement. +## +## The consumer cache is normally enabled. +#cambria.consumer.cache.enabled=true +## Cached consumers are cleaned up after a period of disuse. The server inspects +## consumers every sweepFreqSeconds and will clean up any connections that are +## dormant for touchFreqMs. +#cambria.consumer.cache.sweepFreqSeconds=15 +cambria.consumer.cache.touchFreqMs=120000 +##stickforallconsumerrequests=false +## The cache is managed through ZK. The default value for the ZK connection +## string is the same as config.zk.servers. +#cambria.consumer.cache.zkConnect=${config.zk.servers} + +## +## Shared cache information is associated with this node's name. The default +## name is the hostname plus the HTTP service port this host runs on. (The +## hostname is determined via InetAddress.getLocalHost ().getCanonicalHostName(), +## which is not always adequate.) You can set this value explicitly here. +## +#cambria.api.node.identifier=<use-something-unique-to-this-instance> + +#cambria.rateLimit.maxEmptyPollsPerMinute=30 +#cambria.rateLimitActual.delay.ms=10 +############################################################################### +## +## Metrics Reporting +## +## This server can report its metrics periodically on a topic. +## +#metrics.send.cambria.enabled=true +#metrics.send.cambria.topic=cambria.apinode.metrics #msgrtr.apinode.metrics.dmaap +#metrics.send.cambria.sendEverySeconds=60 +cambria.consumer.cache.zkBasePath=/fe3c/cambria/consumerCache +consumer.timeout=17 +default.partitions=3 +default.replicas=3 +############################################################################## +#100mb +maxcontentlength=10000 +############################################################################## +#AAF Properties +msgRtr.namespace.aaf=org.onap.dmaap.mr.topic +msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic: +enforced.topic.name.AAF=org.onap.dmaap.mr +forceAAF=false +transidUEBtopicreqd=false +defaultNSforUEB=org.onap.dmaap.mr +############################################################################## +#Mirror Maker Agent +msgRtr.mirrormakeradmin.aaf=org.onap.dmaap.mr.mirrormaker|*|admin +msgRtr.mirrormakeruser.aaf=org.onap.dmaap.mr.mirrormaker|*|user +msgRtr.mirrormakeruser.aaf.create=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic: +msgRtr.mirrormaker.timeout=15000 +msgRtr.mirrormaker.topic=org.onap.dmaap.mr.mirrormakeragent +msgRtr.mirrormaker.consumergroup=mmagentserver +msgRtr.mirrormaker.consumerid=1 +kafka.max.poll.interval.ms=300000 +kafka.heartbeat.interval.ms=60000 +kafka.session.timeout.ms=240000 +kafka.max.poll.records=1000 diff --git a/src/test/resources/dmaap-msg-router/cadi.properties b/src/test/resources/dmaap-msg-router/cadi.properties new file mode 100644 index 00000000..f2a3cdc9 --- /dev/null +++ b/src/test/resources/dmaap-msg-router/cadi.properties @@ -0,0 +1,18 @@ +aaf_url=https://AAF_LOCATE_URL/onap.org.osaaf.aaf.service:2.1 +aaf_env=DEV +aaf_lur=org.onap.aaf.cadi.aaf.v2_0.AAFLurPerm + +cadi_truststore=/appl/dmaapMR1/etc/org.onap.dmaap.mr.trust.jks +cadi_truststore_password=8FyfX+ar;0$uZQ0h9*oXchNX + +cadi_keyfile=/appl/dmaapMR1/etc/org.onap.dmaap.mr.keyfile + +cadi_alias=dmaapmr@mr.dmaap.onap.org +cadi_keystore=/appl/dmaapMR1/etc/org.onap.dmaap.mr.p12 +cadi_keystore_password=GDQttV7)BlOvWMf6F7tz&cjy +cadi_x509_issuers=CN=intermediateCA_1, OU=OSAAF, O=ONAP, C=US:CN=intermediateCA_7, OU=OSAAF, O=ONAP, C=US:CN=intermediateCA_9, OU=OSAAF, O=ONAP, C=US + +cadi_loglevel=INFO +cadi_protocols=TLSv1.1,TLSv1.2 +cadi_latitude=37.78187 +cadi_longitude=-122.26147 diff --git a/src/test/resources/dmaap-msg-router/logback.xml b/src/test/resources/dmaap-msg-router/logback.xml new file mode 100644 index 00000000..a39d9e47 --- /dev/null +++ b/src/test/resources/dmaap-msg-router/logback.xml @@ -0,0 +1,207 @@ +<!-- + ============LICENSE_START======================================================= + Copyright © 2019 AT&T Intellectual Property. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<configuration scan="true" scanPeriod="3 seconds" debug="false"> + <contextName>${module.ajsc.namespace.name}</contextName> + <jmxConfigurator/> + <property name="logDirectory" value="${AJSC_HOME}/log"/> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <filter class="ch.qos.logback.classic.filter.LevelFilter"> + <level>ERROR</level> + <onMatch>ACCEPT</onMatch> + <onMismatch>DENY</onMismatch> + </filter> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{1024} - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="INFO" class="ch.qos.logback.core.ConsoleAppender"> + <filter class="ch.qos.logback.classic.filter.LevelFilter"> + <level>INFO</level> + <onMatch>ACCEPT</onMatch> + <onMismatch>DENY</onMismatch> + </filter> + </appender> + + <appender name="DEBUG" class="ch.qos.logback.core.ConsoleAppender"> + + <encoder> + <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern> + </encoder> + </appender> + + <appender name="ERROR" class="ch.qos.logback.core.ConsoleAppender">class="ch.qos.logback.core.ConsoleAppender"> + <filter class="ch.qos.logback.classic.filter.LevelFilter"> + <level>ERROR</level> + <onMatch>ACCEPT</onMatch> + <onMismatch>DENY</onMismatch> + </filter> + <encoder> + <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern> + </encoder> + </appender> + + + <!-- Msgrtr related loggers --> + <logger name="org.onap.dmaap.dmf.mr.service" level="INFO"/> + <logger name="org.onap.dmaap.dmf.mr.service.impl" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.resources" level="INFO"/> + <logger name="org.onap.dmaap.dmf.mr.resources.streamReaders" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.backends" level="INFO"/> + <logger name="org.onap.dmaap.dmf.mr.backends.kafka" level="INFO"/> + <logger name="org.onap.dmaap.dmf.mr.backends.memory" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.beans" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.constants" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.exception" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.listener" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.metabroker" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.metrics.publisher" level="INFO"/> + <logger name="org.onap.dmaap.dmf.mr.metrics.publisher.impl" level="INFO"/> + + + <logger name="org.onap.dmaap.dmf.mr.security" level="INFO"/> + <logger name="org.onap.dmaap.dmf.mr.security.impl" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.transaction" level="INFO"/> + <logger name="com.att.dmf.mr.transaction.impl" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.metabroker" level="INFO"/> + <logger name="org.onap.dmaap.dmf.mr.metabroker" level="INFO"/> + + <logger name="org.onap.dmaap.dmf.mr.utils" level="INFO"/> + <logger name="org.onap.dmaap.mr.filter" level="INFO"/> + + <!--<logger name="com.att.nsa.cambria.*" level="INFO" />--> + + <!-- Msgrtr loggers in ajsc --> + <logger name="org.onap.dmaap.service" level="INFO"/> + <logger name="org.onap.dmaap" level="INFO"/> + + + <!-- Spring related loggers --> + <logger name="org.springframework" level="WARN" additivity="false"/> + <logger name="org.springframework.beans" level="WARN" additivity="false"/> + <logger name="org.springframework.web" level="WARN" additivity="false"/> + <logger name="com.blog.spring.jms" level="WARN" additivity="false"/> + + <!-- AJSC Services (bootstrap services) --> + <logger name="ajsc" level="WARN" additivity="false"/> + <logger name="ajsc.RouteMgmtService" level="INFO" additivity="false"/> + <logger name="ajsc.ComputeService" level="INFO" additivity="false"/> + <logger name="ajsc.VandelayService" level="WARN" additivity="false"/> + <logger name="ajsc.FilePersistenceService" level="WARN" additivity="false"/> + <logger name="ajsc.UserDefinedJarService" level="WARN" additivity="false"/> + <logger name="ajsc.UserDefinedBeansDefService" level="WARN" additivity="false"/> + <logger name="ajsc.LoggingConfigurationService" level="WARN" additivity="false"/> + + <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet + logging) --> + <logger name="ajsc.utils" level="WARN" additivity="false"/> + <logger name="ajsc.utils.DME2Helper" level="INFO" additivity="false"/> + <logger name="ajsc.filters" level="DEBUG" additivity="false"/> + <logger name="ajsc.beans.interceptors" level="DEBUG" additivity="false"/> + <logger name="ajsc.restlet" level="DEBUG" additivity="false"/> + <logger name="ajsc.servlet" level="DEBUG" additivity="false"/> + <logger name="com.att" level="WARN" additivity="false"/> + <logger name="com.att.ajsc.csi.logging" level="WARN" additivity="false"/> + <logger name="com.att.ajsc.filemonitor" level="WARN" additivity="false"/> + + <logger name="com.att.nsa.dmaap.util" level="INFO" additivity="false"/> + <logger name="com.att.cadi.filter" level="INFO" additivity="false"/> + + + <!-- Other Loggers that may help troubleshoot --> + <logger name="net.sf" level="WARN" additivity="false"/> + <logger name="org.apache.commons.httpclient" level="WARN" additivity="false"/> + <logger name="org.apache.commons" level="WARN" additivity="false"/> + <logger name="org.apache.coyote" level="WARN" additivity="false"/> + <logger name="org.apache.jasper" level="WARN" additivity="false"/> + + <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging. + May aid in troubleshooting) --> + <logger name="org.apache.camel" level="WARN" additivity="false"/> + <logger name="org.apache.cxf" level="WARN" additivity="false"/> + <logger name="org.apache.camel.processor.interceptor" level="WARN" additivity="false"/> + <logger name="org.apache.cxf.jaxrs.interceptor" level="WARN" additivity="false"/> + <logger name="org.apache.cxf.service" level="WARN" additivity="false"/> + <logger name="org.restlet" level="DEBUG" additivity="false"/> + <logger name="org.apache.camel.component.restlet" level="DEBUG" additivity="false"/> + <logger name="org.apache.kafka" level="DEBUG" additivity="false"/> + <logger name="org.apache.zookeeper" level="INFO" additivity="false"/> + <logger name="org.I0Itec.zkclient" level="DEBUG" additivity="false"/> + + <!-- logback internals logging --> + <logger name="ch.qos.logback.classic" level="INFO" additivity="false"/> + <logger name="ch.qos.logback.core" level="INFO" additivity="false"/> + + <!-- logback jms appenders & loggers definition starts here --> + <!-- logback jms appenders & loggers definition starts here --> + <appender name="auditLogs" class="ch.qos.logback.core.ConsoleAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + </filter> + <encoder> + <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern> + </encoder> + </appender> + <appender name="perfLogs" class="ch.qos.logback.core.ConsoleAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + </filter> + <encoder> + <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern> + </encoder> + </appender> + <appender name="ASYNC-audit" class="ch.qos.logback.classic.AsyncAppender"> + <queueSize>1000</queueSize> + <discardingThreshold>0</discardingThreshold> + <appender-ref ref="Audit-Record-Queue"/> + </appender> + + <logger name="AuditRecord" level="INFO" additivity="FALSE"> + <appender-ref ref="STDOUT"/> + </logger> + <logger name="AuditRecord_DirectCall" level="INFO" additivity="FALSE"> + <appender-ref ref="STDOUT"/> + </logger> + <appender name="ASYNC-perf" class="ch.qos.logback.classic.AsyncAppender"> + <queueSize>1000</queueSize> + <discardingThreshold>0</discardingThreshold> + <appender-ref ref="Performance-Tracker-Queue"/> + </appender> + <logger name="PerfTrackerRecord" level="INFO" additivity="FALSE"> + <appender-ref ref="ASYNC-perf"/> + <appender-ref ref="perfLogs"/> + </logger> + <!-- logback jms appenders & loggers definition ends here --> + + <root level="DEBUG"> + <appender-ref ref="DEBUG"/> + <appender-ref ref="ERROR"/> + <appender-ref ref="INFO"/> + <appender-ref ref="STDOUT"/> + </root> + +</configuration> diff --git a/src/test/resources/dmaap-msg-router/message-router-compose.yml b/src/test/resources/dmaap-msg-router/message-router-compose.yml new file mode 100644 index 00000000..e110a96f --- /dev/null +++ b/src/test/resources/dmaap-msg-router/message-router-compose.yml @@ -0,0 +1,82 @@ +version: '2' +services: + zookeeper: + image: nexus3.onap.org:10001/onap/dmaap/zookeeper:6.0.3 + ports: + - "2181:2181" + environment: + ZOOKEEPER_REPLICAS: 1 + ZOOKEEPER_TICK_TIME: 2000 + ZOOKEEPER_SYNC_LIMIT: 5 + ZOOKEEPER_INIT_LIMIT: 10 + ZOOKEEPER_MAX_CLIENT_CNXNS: 200 + ZOOKEEPER_AUTOPURGE_SNAP_RETAIN_COUNT: 3 + ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: 24 + ZOOKEEPER_CLIENT_PORT: 2181 + KAFKA_OPTS: -Djava.security.auth.login.config=/etc/zookeeper/secrets/jaas/zk_server_jaas.conf -Dzookeeper.kerberos.removeHostFromPrincipal=true -Dzookeeper.kerberos.removeRealmFromPrincipal=true -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dzookeeper.requireClientAuthScheme=sasl + ZOOKEEPER_SERVER_ID: 1 + volumes: + - ./zk_server_jaas.conf:/etc/zookeeper/secrets/jaas/zk_server_jaas.conf + networks: + net: + aliases: + - zookeeper + + kafka: + image: nexus3.onap.org:10001/onap/dmaap/kafka111:1.0.5 + ports: + - "9092:9092" + environment: + enableCadi: 'false' + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 40000 + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 40000 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://kafka:9092 + KAFKA_LISTENERS: INTERNAL_PLAINTEXT://0.0.0.0:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT + KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false' + KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/jaas/zk_client_jaas.conf + KAFKA_ZOOKEEPER_SET_ACL: 'true' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + volumes: + - ./zk_client_jaas.conf:/etc/kafka/secrets/jaas/zk_client_jaas.conf + networks: + net: + aliases: + - kafka + depends_on: + - zookeeper + + onap-dmaap: + image: nexus3.onap.org:10001/onap/dmaap/dmaap-mr:1.1.20 + ports: + - "3904:3904" + - "3905:3905" + environment: + enableCadi: 'false' + volumes: + - ./MsgRtrApi.properties:/appl/dmaapMR1/bundleconfig/etc/appprops/MsgRtrApi.properties + - ./logback.xml:/appl/dmaapMR1/bundleconfig/etc/logback.xml + - ./cadi.properties:/appl/dmaapMR1/etc/cadi.properties + networks: + net: + aliases: + - onap-dmaap + depends_on: + - zookeeper + - kafka + + mockserver: + image: mockserver/mockserver:mockserver-5.11.2 + command: -serverPort 1090 -proxyRemotePort 3904 -proxyRemoteHost onap-dmaap + ports: + - "1080:1090" + networks: + - net + depends_on: + - onap-dmaap + +networks: + net: + driver: bridge diff --git a/src/test/resources/dmaap-msg-router/zk_client_jaas.conf b/src/test/resources/dmaap-msg-router/zk_client_jaas.conf new file mode 100644 index 00000000..d4ef1eb0 --- /dev/null +++ b/src/test/resources/dmaap-msg-router/zk_client_jaas.conf @@ -0,0 +1,5 @@ +Client { + org.apache.zookeeper.server.auth.DigestLoginModule required + username="kafka" + password="kafka_secret"; + };
\ No newline at end of file diff --git a/src/test/resources/dmaap-msg-router/zk_server_jaas.conf b/src/test/resources/dmaap-msg-router/zk_server_jaas.conf new file mode 100644 index 00000000..26bf4601 --- /dev/null +++ b/src/test/resources/dmaap-msg-router/zk_server_jaas.conf @@ -0,0 +1,4 @@ +Server { + org.apache.zookeeper.server.auth.DigestLoginModule required + user_kafka=kafka_secret; +};
\ No newline at end of file diff --git a/src/test/resources/testParseDMaaPCredentialsLegacy.json b/src/test/resources/testParseDMaaPCredentialsLegacy.json deleted file mode 100644 index ca59c7e7..00000000 --- a/src/test/resources/testParseDMaaPCredentialsLegacy.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "channels": [ - { - "name": "auth-credentials-null", - "cambria.url": "127.0.0.1:3904", - "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", - "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", - "basicAuthPassword": null, - "basicAuthUsername": null, - }, - { - "name": "auth-credentials-present", - "cambria.url": "127.0.0.1:3904", - "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", - "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", - "basicAuthPassword": "samplePassword", - "basicAuthUsername": "sampleUser", - }, - { - "name": "auth-credentials-missing", - "cambria.url": "127.0.0.1:3904", - "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", - "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", - } - ] -}
\ No newline at end of file diff --git a/src/test/resources/testParseDMaaPLegacy.json b/src/test/resources/testParseDMaaPLegacy.json deleted file mode 100644 index 9661e30c..00000000 --- a/src/test/resources/testParseDMaaPLegacy.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "channels": [ - { - "name": "url-precedes-hosts", - "cambria.url": "127.0.0.1:3904", - "cambria.hosts": "h1.att.com,h2.att.com", - "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", - }, - { - "name": "url-key-missing", - "cambria.hosts": "h1.att.com,h2.att.com", - "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV", - }, - { - "name": "url-is-null", - "cambria.url": null, - "cambria.hosts": "h1.att.com,h2.att.com", - "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV" - } - ] -}
\ No newline at end of file diff --git a/src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json b/src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json new file mode 100644 index 00000000..0a9604d4 --- /dev/null +++ b/src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json @@ -0,0 +1,108 @@ + +{ + "eventList": [ + { + "commonEventHeader": { + "version": "4.1", + "vesEventListenerVersion": "7.2", + "domain": "stndDefined", + "eventId": "stndDefined-gNB_Nokia000001", + "eventName": "stndDefined-gNB-Nokia-PowerLost", + "stndDefinedNamespace": "3GPP-FaultSupervision", + "startEpochMicrosec": 1413378172000000, + "lastEpochMicrosec": 1413378172000000, + "reportingEntityName": "ibcx0001vm002oam001", + "sourceName": "scfx0001vm002cap001", + "sequence": 1, + "priority": "High" + }, + "stndDefinedFields": { + "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm", + "data": { + "href": 1, + "uri": "1", + "notificationId": 1, + "notificationType": "notifyNewAlarm", + "eventTime": "xyz", + "systemDN": "xyz", + "probableCause": 1, + "perceivedSeverity": "INDETERMINATE", + "rootCauseIndicator": false, + "specificProblem": "xyz", + "correlatedNotifications": [], + "backedUpStatus": true, + "backUpObject": "xyz", + "trendIndication": "MORE_SEVERE", + "thresholdInfo": { + "observedMeasurement": "new", + "observedValue": 123 + }, + "stateChangeDefinition": { + }, + "monitoredAttributes": { + "newAtt": "new" + }, + "proposedRepairActions": "xyz", + "additionalText": "xyz", + "additionalInformation": { + "addInfo": "new" + }, + "alarmId": "1", + "alarmType": "COMMUNICATIONS_ALARM" + }, + "stndDefinedFieldsVersion": "1.0" + }}, + { + "commonEventHeader": { + "version": "4.1", + "vesEventListenerVersion": "7.2", + "domain": "stndDefined", + "eventId": "stndDefined-gNB_Nokia000001", + "eventName": "stndDefined-gNB-Nokia-PowerLost", + "stndDefinedNamespace": "3GPP-FaultSupervision2", + "startEpochMicrosec": 1413378172000000, + "lastEpochMicrosec": 1413378172000000, + "reportingEntityName": "ibcx0001vm002oam001", + "sourceName": "scfx0001vm002cap001", + "sequence": 1, + "priority": "High" + }, + "stndDefinedFields": { + "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm", + "data": { + "href": 1, + "uri": "1", + "notificationId": 1, + "notificationType": "notifyNewAlarm", + "eventTime": "xyz", + "systemDN": "xyz", + "probableCause": 1, + "perceivedSeverity": "INDETERMINATE", + "rootCauseIndicator": false, + "specificProblem": "xyz", + "correlatedNotifications": [], + "backedUpStatus": true, + "backUpObject": "xyz", + "trendIndication": "MORE_SEVERE", + "thresholdInfo": { + "observedMeasurement": "new", + "observedValue": 123 + }, + "stateChangeDefinition": { + }, + "monitoredAttributes": { + "newAtt": "new" + }, + "proposedRepairActions": "xyz", + "additionalText": "xyz", + "additionalInformation": { + "addInfo": "new" + }, + "alarmId": "1", + "alarmType": "COMMUNICATIONS_ALARM" + }, + "stndDefinedFieldsVersion": "1.0" + }} + ] +} + diff --git a/src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json b/src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json new file mode 100644 index 00000000..7e095d56 --- /dev/null +++ b/src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json @@ -0,0 +1,108 @@ + +{ + "eventList": [ + { + "commonEventHeader": { + "version": "4.1", + "vesEventListenerVersion": "7.2", + "domain": "stndDefined", + "eventId": "stndDefined-gNB_Nokia000001", + "eventName": "stndDefined-gNB-Nokia-PowerLost", + "stndDefinedNamespace": "3GPP-FaultSupervision", + "startEpochMicrosec": 1413378172000000, + "lastEpochMicrosec": 1413378172000000, + "reportingEntityName": "ibcx0001vm002oam001", + "sourceName": "scfx0001vm002cap001", + "sequence": 1, + "priority": "High" + }, + "stndDefinedFields": { + "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm", + "data": { + "href": 1, + "uri": "1", + "notificationId": 1, + "notificationType": "notifyNewAlarm", + "eventTime": "xyz", + "systemDN": "xyz", + "probableCause": 1, + "perceivedSeverity": "INDETERMINATE", + "rootCauseIndicator": false, + "specificProblem": "xyz", + "correlatedNotifications": [], + "backedUpStatus": true, + "backUpObject": "xyz", + "trendIndication": "MORE_SEVERE", + "thresholdInfo": { + "observedMeasurement": "new", + "observedValue": 123 + }, + "stateChangeDefinition": { + }, + "monitoredAttributes": { + "newAtt": "new" + }, + "proposedRepairActions": "xyz", + "additionalText": "xyz", + "additionalInformation": { + "addInfo": "new" + }, + "alarmId": "1", + "alarmType": "COMMUNICATIONS_ALARM" + }, + "stndDefinedFieldsVersion": "1.0" + }}, + { + "commonEventHeader": { + "version": "4.1", + "vesEventListenerVersion": "7.2", + "domain": "stndDefined", + "eventId": "stndDefined-gNB_Nokia000001", + "eventName": "stndDefined-gNB-Nokia-PowerLost", + "stndDefinedNamespace": "3GPP-FaultSupervision", + "startEpochMicrosec": 1413378172000000, + "lastEpochMicrosec": 1413378172000000, + "reportingEntityName": "ibcx0001vm002oam001", + "sourceName": "scfx0001vm002cap001", + "sequence": 1, + "priority": "High" + }, + "stndDefinedFields": { + "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm", + "data": { + "href": 1, + "uri": "1", + "notificationId": 1, + "notificationType": "notifyNewAlarm", + "eventTime": "xyz", + "systemDN": "xyz", + "probableCause": 1, + "perceivedSeverity": "INDETERMINATE", + "rootCauseIndicator": false, + "specificProblem": "xyz", + "correlatedNotifications": [], + "backedUpStatus": true, + "backUpObject": "xyz", + "trendIndication": "MORE_SEVERE", + "thresholdInfo": { + "observedMeasurement": "new", + "observedValue": 123 + }, + "stateChangeDefinition": { + }, + "monitoredAttributes": { + "newAtt": "new" + }, + "proposedRepairActions": "xyz", + "additionalText": "xyz", + "additionalInformation": { + "addInfo": "new" + }, + "alarmId": "1", + "alarmType": "COMMUNICATIONS_ALARM" + }, + "stndDefinedFieldsVersion": "1.0" + }} + ] +} + diff --git a/src/test/resources/ves7_batch_valid_two_different_domain.json b/src/test/resources/ves7_batch_valid_two_different_domain.json new file mode 100644 index 00000000..648c81ca --- /dev/null +++ b/src/test/resources/ves7_batch_valid_two_different_domain.json @@ -0,0 +1,90 @@ + +{ + "eventList": [ + { + "commonEventHeader": { + "version": "4.0.1", + "vesEventListenerVersion": "7.0.1", + "domain": "fault", + "eventName": "Fault_Vscf:Acs-Ericcson_PilotNumberPoolExhaustion", + "eventId": "fault0000250", + "sequence": 1, + "priority": "High", + "reportingEntityId": "cc305d54-75b4-431b-adb2-eb6b9e541234", + "reportingEntityName": "ibcx0001vm002oam0011234", + "sourceId": "de305d54-75b4-431b-adb2-eb6b9e546014", + "sourceName": "scfx0001vm002cap001", + "nfVendorName": "Ericsson", + "nfNamingCode": "scfx", + "nfcNamingCode": "ssc", + "startEpochMicrosec": 1413378172000000, + "lastEpochMicrosec": 1413378172000000, + "timeZoneOffset": "UTC-05:30" + }, + "faultFields": { + "faultFieldsVersion": "4.0", + "alarmCondition": "PilotNumberPoolExhaustion", + "eventSourceType": "other", + "specificProblem": "Calls cannot complete - pilot numbers are unavailable", + "eventSeverity": "CRITICAL", + "vfStatus": "Active", + "alarmAdditionalInformation": { + "PilotNumberPoolSize": "1000" + } + } + }, + { + "commonEventHeader": { + "version": "4.1", + "vesEventListenerVersion": "7.2", + "domain": "stndDefined", + "eventId": "stndDefined-gNB_Nokia000001", + "eventName": "stndDefined-gNB-Nokia-PowerLost", + "stndDefinedNamespace": "3GPP-FaultSupervision", + "startEpochMicrosec": 1413378172000000, + "lastEpochMicrosec": 1413378172000000, + "reportingEntityName": "ibcx0001vm002oam001", + "sourceName": "scfx0001vm002cap001", + "sequence": 1, + "priority": "High" + }, + "stndDefinedFields": { + "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm", + "data": { + "href": 1, + "uri": "1", + "notificationId": 1, + "notificationType": "notifyNewAlarm", + "eventTime": "xyz", + "systemDN": "xyz", + "probableCause": 1, + "perceivedSeverity": "INDETERMINATE", + "rootCauseIndicator": false, + "specificProblem": "xyz", + "correlatedNotifications": [], + "backedUpStatus": true, + "backUpObject": "xyz", + "trendIndication": "MORE_SEVERE", + "thresholdInfo": { + "observedMeasurement": "new", + "observedValue": 123 + }, + "stateChangeDefinition": { + }, + "monitoredAttributes": { + "newAtt": "new" + }, + "proposedRepairActions": "xyz", + "additionalText": "xyz", + "additionalInformation": { + "addInfo": "new" + }, + "alarmId": "1", + "alarmType": "COMMUNICATIONS_ALARM" + }, + "stndDefinedFieldsVersion": "1.0" + } + } + ] +} + |