From f6260a26de44a9338ca998626a93c0d0fa56abc3 Mon Sep 17 00:00:00 2001 From: Bogumil Zebek Date: Wed, 22 Jul 2020 08:19:51 +0200 Subject: StndDefined event routing Route stndDefined events to streams defined in namespace event field. Change-Id: I3963e220095665f8ca3fd1b21c5c20b44057cf76 Issue-ID: DCAEGEN2-1771 Signed-off-by: Zebek Bogumil --- .../java/org/onap/dcae/common/EventSender.java | 52 +++++++++------------- 1 file changed, 22 insertions(+), 30 deletions(-) (limited to 'src/main/java/org/onap/dcae/common/EventSender.java') diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index c1002af6..63be9106 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -1,9 +1,9 @@ /* * ============LICENSE_START======================================================= - * PROJECT + * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved.s + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,56 +24,48 @@ import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; -import org.json.JSONArray; -import org.json.JSONObject; -import org.onap.dcae.ApplicationSettings; +import org.onap.dcae.common.model.VesEvent; import org.onap.dcae.common.publishing.EventPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + public class EventSender { private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); - private Map streamidHash; + private Map streamIdToDmaapIds; private EventPublisher eventPublisher; - private static final String VES_UNIQUE_ID = "VESuniqueId"; private static final Logger log = LoggerFactory.getLogger(EventSender.class); - private static final String EVENT_LITERAL = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) { + public EventSender(EventPublisher eventPublisher, Map streamIdToDmaapIds) { this.eventPublisher = eventPublisher; - this.streamidHash = properties.dMaaPStreamsMapping(); + this.streamIdToDmaapIds = streamIdToDmaapIds; } - public void send(JSONArray arrayOfEvents) { - for (int i = 0; i < arrayOfEvents.length(); i++) { + public void send(List vesEvents) { + for (VesEvent vesEvent : vesEvents) { metriclog.info("EVENT_PUBLISH_START"); - JSONObject object = (JSONObject) arrayOfEvents.get(i); - setLoggingContext(object); - streamidHash.get(getDomain(object)) - .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + object)) - .forEach(streamIds -> sendEventsToStreams(object, streamIds)); - log.debug("Message published" + object); + setLoggingContext(vesEvent); + streamIdToDmaapIds.get(vesEvent.getStreamId()) + .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + vesEvent.asJsonObject())) + .forEach(streamIds -> sendEventsToStreams(vesEvent, streamIds)); + log.debug("Message published" + vesEvent.asJsonObject()); } log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); metriclog.info("EVENT_PUBLISH_END"); } - private static String getDomain(JSONObject event) { - return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain"); - } - - private void sendEventsToStreams(JSONObject event, String[] streamIdList) { - for (String aStreamIdList : streamIdList) { - log.info("Invoking publisher for streamId:" + aStreamIdList); - eventPublisher.sendEvent(event, aStreamIdList); + private void sendEventsToStreams(VesEvent vesEvent, String[] streamIdList) { + for (String streamId : streamIdList) { + log.info("Invoking publisher for streamId/domain:" + streamId); + eventPublisher.sendEvent(vesEvent.asJsonObject(), streamId); } } - private void setLoggingContext(JSONObject event) { - LoggingContext localLC = VESLogger.getLoggingContextForThread(event.get(VES_UNIQUE_ID).toString()); + private void setLoggingContext(VesEvent vesEvent) { + LoggingContext localLC = VESLogger.getLoggingContextForThread(vesEvent.getUniqueId().toString()); localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("event.VESuniqueId" + event.get(VES_UNIQUE_ID) + "event.commonEventHeader.domain:" + getDomain(event)); + log.debug("event.VESuniqueId" + vesEvent.getUniqueId() + "event.commonEventHeader.domain:" + vesEvent.getDomain()); } } -- cgit 1.2.3-korg