path: root/src/main/java/org/onap/aai/spike/service
diff options
Diffstat (limited to 'src/main/java/org/onap/aai/spike/service')
3 files changed, 458 insertions, 0 deletions
diff --git a/src/main/java/org/onap/aai/spike/service/EchoService.java b/src/main/java/org/onap/aai/spike/service/EchoService.java
new file mode 100644
index 0000000..6c5b5ee
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/service/EchoService.java
@@ -0,0 +1,85 @@
+ * ============LICENSE_START=======================================================
+ * Spike
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP and OpenECOMP are trademarks
+ * and service marks of AT&T Intellectual Property.
+ */
+package org.onap.aai.spike.service;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response.Status;
+import org.onap.aai.cl.api.LogFields;
+import org.onap.aai.cl.api.LogLine;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.cl.mdc.MdcContext;
+import org.onap.aai.spike.logging.SpikeMsgs;
+import org.onap.aai.spike.util.SpikeConstants;
+import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestHeader;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+@RequestMapping(value = "/services/spike/v1/echo-service")
+public class EchoService {
+ private static Logger logger = LoggerFactory.getInstance().getLogger(EchoService.class.getName());
+ private static Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EchoService.class.getName());
+ @GetMapping("/echo")
+ public ResponseEntity<String> ping(@RequestHeader HttpHeaders headers, HttpServletRequest req) {
+ String fromIp = req.getRemoteAddr();
+ String fromAppId = "";
+ String transId = null;
+ if (headers.getFirst("X-FromAppId") != null) {
+ fromAppId = headers.getFirst("X-FromAppId");
+ }
+ if ((headers.getFirst("X-TransactionId") == null) || headers.getFirst("X-TransactionId").isEmpty()) {
+ transId = java.util.UUID.randomUUID().toString();
+ } else {
+ transId = headers.getFirst("X-TransactionId");
+ }
+ MdcContext.initialize(transId, SpikeConstants.SPIKE_SERVICE_NAME, "", fromAppId, fromIp);
+ // Generate error log
+ logger.info(SpikeMsgs.PROCESS_REST_REQUEST, req.getMethod(), req.getRequestURL().toString(),
+ req.getRemoteHost(), Status.OK.toString());
+ // Generate audit log.
+ auditLogger.info(SpikeMsgs.PROCESS_REST_REQUEST,
+ new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, Status.OK.toString())
+ .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, Status.OK.toString()),
+ (req != null) ? req.getMethod() : "Unknown", (req != null) ? req.getRequestURL().toString() : "Unknown",
+ (req != null) ? req.getRemoteHost() : "Unknown", Status.OK.toString());
+ MDC.clear();
+ return new ResponseEntity<>("Alive", HttpStatus.OK);
+ }
diff --git a/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
new file mode 100644
index 0000000..c4e1746
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
@@ -0,0 +1,298 @@
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.service;
+import java.util.ArrayList;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.PriorityBlockingQueue;
+import javax.naming.OperationNotSupportedException;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
+import org.onap.aai.event.api.MessageWithOffset;
+import org.onap.aai.spike.event.envelope.EventEnvelope;
+import org.onap.aai.spike.event.envelope.EventEnvelopeParser;
+import org.onap.aai.spike.event.incoming.GizmoGraphEvent;
+import org.onap.aai.spike.event.incoming.OffsetManager;
+import org.onap.aai.spike.event.outgoing.SpikeEventComparator;
+import org.onap.aai.spike.event.outgoing.SpikeEventExclusionStrategy;
+import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.logging.SpikeMsgs;
+import org.onap.aai.spike.util.SpikeConstants;
+import org.onap.aai.spike.util.SpikeProperties;
+public class SpikeEventProcessor extends TimerTask {
+ /**
+ * Client used for consuming events to the event bus.
+ */
+ private EventConsumer consumer;
+ /**
+ * Client used for publishing events to the event bus.
+ */
+ private EventPublisher publisher;
+ /**
+ * Internal queue where outgoing events will be buffered until they can be serviced by the event
+ * publisher worker threads.
+ */
+ private BlockingQueue<SpikeGraphEvent> eventQueue;
+ private Integer eventQueueCapacity = DEFAULT_EVENT_QUEUE_CAPACITY;
+ private Integer eventOffsetPeriod = DEFAULT_EVENT_OFFSET_COMMIT_PERIOD;
+ private OffsetManager offsetManager;
+ private Long lastCommittedOffset = null;
+ private EventEnvelopeParser eventEnvelopeParser;
+ /**
+ * Number of events that can be queued up for publishing before it is dropped
+ */
+ private static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
+ private static final Integer DEFAULT_EVENT_OFFSET_COMMIT_PERIOD = 10000;
+ private static Logger logger = LoggerFactory.getInstance().getLogger(SpikeEventProcessor.class.getName());
+ private static Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(SpikeEventProcessor.class.getName());
+ private static final Gson gson =
+ new GsonBuilder().setExclusionStrategies(new SpikeEventExclusionStrategy()).setPrettyPrinting().create();
+ public SpikeEventProcessor(EventConsumer consumer, EventPublisher publisher) {
+ this.consumer = consumer;
+ this.publisher = publisher;
+ try {
+ eventQueueCapacity = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_QUEUE_CAPACITY));
+ eventOffsetPeriod = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_OFFSET_CHECK_PERIOD));
+ } catch (Exception ex) {
+ }
+ eventQueue = new PriorityBlockingQueue<SpikeGraphEvent>(eventQueueCapacity, new SpikeEventComparator());
+ new Thread(new SpikeEventPublisher()).start();
+ // Instantiate the offset manager. This will run a background thread that
+ // periodically updates the value of the most recent offset value that can
+ // be safely committed with the event bus.
+ offsetManager = new OffsetManager(eventQueueCapacity, eventOffsetPeriod);
+ eventEnvelopeParser = new EventEnvelopeParser();
+ }
+ @Override
+ public void run() {
+ logger.info(SpikeMsgs.SPIKE_QUERY_EVENT_SYSTEM);
+ if (consumer == null) {
+ }
+ Iterable<MessageWithOffset> events = null;
+ try {
+ events = consumer.consumeWithOffsets();
+ } catch (OperationNotSupportedException e) {
+ // This means we are using DMaaP and can't use offsets
+ try {
+ Iterable<String> tempEvents = consumer.consume();
+ ArrayList<MessageWithOffset> messages = new ArrayList<MessageWithOffset>();
+ for (String event : tempEvents) {
+ messages.add(new MessageWithOffset(0, event));
+ }
+ events = messages;
+ } catch (Exception e1) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e1.getMessage());
+ return;
+ }
+ } catch (Exception e) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage());
+ return;
+ }
+ if (events == null || !events.iterator().hasNext()) {
+ logger.info(SpikeMsgs.SPIKE_NO_EVENT_RECEIVED);
+ }
+ for (MessageWithOffset event : events) {
+ try {
+ logger.debug(SpikeMsgs.SPIKE_EVENT_RECEIVED, event.getMessage());
+ GizmoGraphEvent modelEvent = eventEnvelopeParser.parseEvent(event.getMessage());
+ auditLogger.info(SpikeMsgs.SPIKE_EVENT_RECEIVED,
+ "of type: " + modelEvent.getObjectType() + " with key: " + modelEvent.getObjectKey()
+ + " , transaction-id: " + modelEvent.getTransactionId());
+ logger.info(SpikeMsgs.SPIKE_EVENT_RECEIVED, "of type: " + modelEvent.getObjectType() + " with key: "
+ + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());
+ String modelEventJson = gson.toJson(modelEvent);
+ // Log the current event as 'being processed' with the offset manager so that we know that it's
+ // associated offset is not yet save to be committed as 'done'.
+ offsetManager.cacheEvent(modelEvent.getTransactionId(), event.getOffset());
+ while (eventQueue.size() >= eventQueueCapacity) {
+ // Wait until there's room in the queue
+ logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE,
+ "Event could not be published to the event bus due to: Internal buffer capacity exceeded. Waiting 10 seconds.");
+ Thread.sleep(10000);
+ }
+ eventQueue.offer(modelEvent.toSpikeGraphEvent());
+ logger.info(SpikeMsgs.SPIKE_EVENT_PROCESSED, "of type: " + modelEvent.getObjectType() + " with key: "
+ + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());
+ logger.debug(SpikeMsgs.SPIKE_EVENT_PROCESSED, modelEventJson);
+ } catch (SpikeException | InterruptedException e) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
+ e.getMessage() + ". Incoming event payload:\n" + event.getMessage());
+ } catch (Exception e) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
+ e.getMessage() + ". Incoming event payload:\n" + event.getMessage());
+ }
+ }
+ try {
+ // Get the next 'safe' offset to be committed from the offset manager.
+ // We need to do this here istead of letting the offset manager just take care
+ // of it for us because the event consumer is not thread safe. If we try to
+ // commit the offsets from another thread, it gets unhappy...
+ Long nextOffset = offsetManager.getNextOffsetToCommit();
+ // Make sure we actually have a real value...
+ if (nextOffset != null) {
+ // There is no point in continually committing the same offset value, so make sure
+ // that something has actually changed before we do anything...
+ if ((lastCommittedOffset == null) || (lastCommittedOffset != nextOffset)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Committing offset: " + nextOffset + " to the event bus for Champ raw event topic.");
+ }
+ // OK, let's commit the latest value...
+ consumer.commitOffsets(nextOffset);
+ lastCommittedOffset = nextOffset;
+ }
+ }
+ } catch (OperationNotSupportedException e) {
+ // We must be working with a DMaap which doesn't support offset management. Swallow
+ // the exception
+ } catch (Exception e) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage());
+ }
+ }
+ /**
+ * This class implements the threads which is responsible for buffering the events in memory and
+ * ordering them before publishing it to topic
+ * <p>
+ * Each publish operation is performed synchronously, so that the thread will only move on to the
+ * next available event once it has actually published the current event to the bus.
+ */
+ private class SpikeEventPublisher implements Runnable {
+ /**
+ * Partition key to use when publishing events to the event stream. We WANT all events to go to a
+ * single partition, so we are just using a hard-coded key for every event.
+ */
+ private static final String EVENTS_PARTITION_KEY = "SpikeEventKey";
+ private static final int DEFAULT_EVENT_QUEUE_DELAY = 10000;
+ Integer eventQueueDelay = DEFAULT_EVENT_QUEUE_DELAY;
+ public SpikeEventPublisher() {
+ try {
+ eventQueueDelay = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_QUEUE_DELAY));
+ } catch (Exception ex) {
+ }
+ }
+ @Override
+ public void run() {
+ while (true) {
+ SpikeGraphEvent nextEvent;
+ SpikeGraphEvent event = null;
+ try {
+ // Get the next event to be published from the queue if it is old enough or we have too
+ // many items in the queue
+ if ((nextEvent = eventQueue.peek()) != null
+ && (System.currentTimeMillis() - nextEvent.getSpikeTimestamp() > eventQueueDelay
+ || eventQueue.size() > eventQueueCapacity)) {
+ event = eventQueue.take();
+ } else {
+ continue;
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupted status.
+ Thread.currentThread().interrupt();
+ }
+ // Try publishing the event to the event bus. This call will block
+ // until the event is published or times out.
+ try {
+ String eventJson = gson.toJson(new EventEnvelope(event));
+ int sentMessageCount = publisher.sendSync(EVENTS_PARTITION_KEY, eventJson);
+ if (sentMessageCount > 0) {
+ logger.info(SpikeMsgs.SPIKE_EVENT_PUBLISHED, "of type: " + event.getObjectType() + " with key: "
+ + event.getObjectKey() + " , transaction-id: " + event.getTransactionId());
+ logger.debug(SpikeMsgs.SPIKE_EVENT_PUBLISHED, eventJson);
+ } else {
+ logger.warn(SpikeMsgs.SPIKE_PUBLISH_FAILED, "of type: " + event.getObjectType() + " with key: "
+ + event.getObjectKey() + " , transaction-id: " + event.getTransactionId());
+ logger.debug(SpikeMsgs.SPIKE_PUBLISH_FAILED, eventJson);
+ }
+ // Inform the offset manager that this event has been published. It's offset
+ // can now, potentially, be safely committed to the event bus so that on a
+ // restart we won't reprocess it.
+ offsetManager.markAsPublished(event.getTransactionId());
+ } catch (ExecutionException e) {
+ // Publish timed out, queue it up to retry again. Since this message was pulled from the
+ // top of the queue, it will go back to the top.
+ logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, "Retrying in 60 seconds. " + e.getMessage());
+ eventQueue.offer(event);
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ } catch (Exception e) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, e.getMessage());
+ }
+ }
+ }
+ }
diff --git a/src/main/java/org/onap/aai/spike/service/SpikeService.java b/src/main/java/org/onap/aai/spike/service/SpikeService.java
new file mode 100644
index 0000000..3aa6dfe
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/service/SpikeService.java
@@ -0,0 +1,75 @@
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.service;
+import java.util.Timer;
+import javax.annotation.PreDestroy;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
+import org.onap.aai.spike.logging.SpikeMsgs;
+import org.onap.aai.spike.schema.EdgeRulesLoader;
+import org.onap.aai.spike.schema.OXMModelLoader;
+import org.onap.aai.spike.util.SpikeConstants;
+import org.onap.aai.spike.util.SpikeProperties;
+public class SpikeService {
+ private EventConsumer consumer;
+ private EventPublisher publisher;
+ private static Logger logger = LoggerFactory.getInstance().getLogger(SpikeService.class.getName());
+ private Timer timer;
+ public SpikeService(EventConsumer consumer, EventPublisher publisher) {
+ this.consumer = consumer;
+ this.publisher = publisher;
+ }
+ public void startup() throws Exception {
+ // Load models
+ EdgeRulesLoader.loadModels();
+ OXMModelLoader.loadModels();
+ Long interval = 30000L;
+ try {
+ interval = Long.parseLong(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_POLL_INTERVAL));
+ } catch (Exception ex) {
+ }
+ SpikeEventProcessor processEvent = new SpikeEventProcessor(consumer, publisher);
+ timer = new Timer("spike-consumer");
+ timer.schedule(processEvent, interval, interval);
+ logger.info(SpikeMsgs.SPIKE_SERVICE_STARTED_SUCCESSFULLY, consumer.getClass().getName());
+ }
+ @PreDestroy
+ protected void preShutdown() {
+ logger.info(SpikeMsgs.SPIKE_SERVICE_STARTED_SUCCESSFULLY, consumer.getClass().getName());
+ timer.cancel();
+ }