From 74b598291ed2461e0e482f556baf2943a97a54f2 Mon Sep 17 00:00:00 2001 From: Maciej Malewski Date: Tue, 8 Jun 2021 09:04:48 +0200 Subject: Replace cambria with DmaaP client - remove cambria, add DmaaP client - sending event for many topics at once is no longer supported - add backward compatibility status codes - add additional validation for batchEvent Issue-ID: DCAEGEN2-1483 Signed-off-by: Maciej Malewski Change-Id: I945c38b4ab04b697ecfabd5ce38502f83fa70d1a --- .../java/org/onap/dcae/common/EventSender.java | 56 ++++++++-------------- 1 file changed, 21 insertions(+), 35 deletions(-) (limited to 'src/main/java/org/onap/dcae/common/EventSender.java') diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 81c463dc..400597ff 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -3,7 +3,7 @@ * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018,2020 Nokia. All rights reserved. + * Copyright (C) 2018-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,52 +20,38 @@ */ package org.onap.dcae.common; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; + import io.vavr.collection.Map; import org.onap.dcae.common.model.VesEvent; import org.onap.dcae.common.publishing.DMaaPEventPublisher; +import org.onap.dcae.restapi.EventValidatorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; import java.util.List; -public class EventSender { +import static org.onap.dcae.restapi.ApiException.DOMAIN_NOT_DEFINED_FOR_STREAM_ID; - private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); - private Map streamIdToDmaapIds; - private DMaaPEventPublisher eventPublisher; - private static final Logger log = LoggerFactory.getLogger(EventSender.class); - public EventSender(DMaaPEventPublisher eventPublisher, Map streamIdToDmaapIds) { - this.eventPublisher = eventPublisher; - this.streamIdToDmaapIds = streamIdToDmaapIds; - } +public class EventSender { - public void send(List vesEvents) { - for (VesEvent vesEvent : vesEvents) { - metriclog.info("EVENT_PUBLISH_START"); - setLoggingContext(vesEvent); - streamIdToDmaapIds.get(vesEvent.getStreamId()) - .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + vesEvent.asJsonObject())) - .forEach(dmaapIds -> sendEventsToStreams(vesEvent, dmaapIds)); - log.debug("Message published" + vesEvent.asJsonObject()); - } - log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); - metriclog.info("EVENT_PUBLISH_END"); - } + private Map streamIdToDmaapIds; + private DMaaPEventPublisher eventPublisher; + private static final Logger log = LoggerFactory.getLogger(EventSender.class); - private void sendEventsToStreams(VesEvent vesEvent, String[] dmaapIds) { - for (String dmaapId : dmaapIds) { - log.info("Invoking publisher for streamId/domain:" + dmaapId); - eventPublisher.sendEvent(vesEvent, dmaapId); + public EventSender(DMaaPEventPublisher eventPublisher, Map streamIdToDmaapIds) { + this.eventPublisher = eventPublisher; + this.streamIdToDmaapIds = streamIdToDmaapIds; } - } - private void setLoggingContext(VesEvent vesEvent) { - LoggingContext localLC = VESLogger.getLoggingContextForThread(vesEvent.getUniqueId().toString()); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("event.VESuniqueId" + vesEvent.getUniqueId() + "event.commonEventHeader.domain:" + vesEvent.getDomain()); - } + public HttpStatus send(List vesEvents) { + String topic = streamIdToDmaapIds + .get(vesEvents.get(0).getStreamId()) + .getOrElse(() -> { + log.error("No StreamID defined for publish - Message dropped " + vesEvents.get(0).asJsonObject()); + throw new EventValidatorException(DOMAIN_NOT_DEFINED_FOR_STREAM_ID); + }); + return eventPublisher.sendEvent(vesEvents, topic); + } } -- cgit 1.2.3-korg