diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/common/EventSender.java')
-rw-r--r-- | src/main/java/org/onap/dcae/common/EventSender.java | 56 |
1 files changed, 21 insertions, 35 deletions
diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 81c463dc..400597ff 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -3,7 +3,7 @@ * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018,2020 Nokia. All rights reserved. + * Copyright (C) 2018-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,52 +20,38 @@ */ package org.onap.dcae.common; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; + import io.vavr.collection.Map; import org.onap.dcae.common.model.VesEvent; import org.onap.dcae.common.publishing.DMaaPEventPublisher; +import org.onap.dcae.restapi.EventValidatorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; import java.util.List; -public class EventSender { +import static org.onap.dcae.restapi.ApiException.DOMAIN_NOT_DEFINED_FOR_STREAM_ID; - private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); - private Map<String, String[]> streamIdToDmaapIds; - private DMaaPEventPublisher eventPublisher; - private static final Logger log = LoggerFactory.getLogger(EventSender.class); - public EventSender(DMaaPEventPublisher eventPublisher, Map<String, String[]> streamIdToDmaapIds) { - this.eventPublisher = eventPublisher; - this.streamIdToDmaapIds = streamIdToDmaapIds; - } +public class EventSender { - public void send(List<VesEvent> vesEvents) { - for (VesEvent vesEvent : vesEvents) { - metriclog.info("EVENT_PUBLISH_START"); - setLoggingContext(vesEvent); - streamIdToDmaapIds.get(vesEvent.getStreamId()) - .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + vesEvent.asJsonObject())) - .forEach(dmaapIds -> sendEventsToStreams(vesEvent, dmaapIds)); - log.debug("Message published" + vesEvent.asJsonObject()); - } - log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); - metriclog.info("EVENT_PUBLISH_END"); - } + private Map<String, String> streamIdToDmaapIds; + private DMaaPEventPublisher eventPublisher; + private static final Logger log = LoggerFactory.getLogger(EventSender.class); - private void sendEventsToStreams(VesEvent vesEvent, String[] dmaapIds) { - for (String dmaapId : dmaapIds) { - log.info("Invoking publisher for streamId/domain:" + dmaapId); - eventPublisher.sendEvent(vesEvent, dmaapId); + public EventSender(DMaaPEventPublisher eventPublisher, Map<String, String> streamIdToDmaapIds) { + this.eventPublisher = eventPublisher; + this.streamIdToDmaapIds = streamIdToDmaapIds; } - } - private void setLoggingContext(VesEvent vesEvent) { - LoggingContext localLC = VESLogger.getLoggingContextForThread(vesEvent.getUniqueId().toString()); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("event.VESuniqueId" + vesEvent.getUniqueId() + "event.commonEventHeader.domain:" + vesEvent.getDomain()); - } + public HttpStatus send(List<VesEvent> vesEvents) { + String topic = streamIdToDmaapIds + .get(vesEvents.get(0).getStreamId()) + .getOrElse(() -> { + log.error("No StreamID defined for publish - Message dropped " + vesEvents.get(0).asJsonObject()); + throw new EventValidatorException(DOMAIN_NOT_DEFINED_FOR_STREAM_ID); + }); + return eventPublisher.sendEvent(vesEvents, topic); + } } |