diff options
9 files changed, 232 insertions, 310 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/pom.xml b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/pom.xml index 0f1507111..b71b68924 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/pom.xml +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/pom.xml @@ -1,7 +1,7 @@ <!-- ============LICENSE_START======================================================= Copyright (C) 2018 Ericsson. All rights reserved. - Modifications Copyright (C) 2019 Nordix Foundation. + Modifications Copyright (C) 2019-2020 Nordix Foundation. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,7 +18,10 @@ SPDX-License-Identifier: Apache-2.0 ============LICENSE_END========================================================= --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.onap.policy.apex-pdp.plugins.plugins-event.plugins-event-carrier</groupId> @@ -47,6 +50,16 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> <profiles> diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java index 57560d2ef..80f8fa66e 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java @@ -34,9 +34,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; - import java.util.regex.Matcher; import java.util.regex.Pattern; + import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; @@ -103,39 +103,36 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, - final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { this.eventReceiver = incomingEventReceiver; this.name = consumerName; // Check and get the REST Properties if (!(consumerParameters - .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) { + .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) { final String errorMessage = - "specified consumer properties are not applicable to REST Requestor consumer (" + this.name + ")"; - LOGGER.warn(errorMessage); + "specified consumer properties are not applicable to REST Requestor consumer (" + this.name + ")"; throw new ApexEventException(errorMessage); } restConsumerProperties = - (RestRequestorCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + (RestRequestorCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); // Check if we are in peered mode if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { final String errorMessage = "REST Requestor consumer (" + this.name - + ") must run in peered requestor mode with a REST Requestor producer"; - LOGGER.warn(errorMessage); + + ") must run in peered requestor mode with a REST Requestor producer"; throw new ApexEventException(errorMessage); } // Check if the HTTP method has been set if (restConsumerProperties.getHttpMethod() == null) { restConsumerProperties - .setHttpMethod(RestRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD); + .setHttpMethod(RestRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD); } // Check if the HTTP URL has been set if (restConsumerProperties.getUrl() == null) { final String errorMessage = "no URL has been specified on REST Requestor consumer (" + this.name + ")"; - LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } @@ -144,7 +141,6 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { new URL(restConsumerProperties.getUrl()); } catch (final Exception e) { final String errorMessage = "invalid URL has been specified on REST Requestor consumer (" + this.name + ")"; - LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage, e); } @@ -157,8 +153,8 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { // Check if HTTP headers has been set if (restConsumerProperties.checkHttpHeadersSet()) { - LOGGER.debug("REST Requestor consumer has http headers ({}): {}", this.name, - Arrays.deepToString(restConsumerProperties.getHttpHeaders())); + final String httpHeaderString = Arrays.deepToString(restConsumerProperties.getHttpHeaders()); + LOGGER.debug("REST Requestor consumer has http headers ({}): {}", this.name, httpHeaderString); } // Initialize the HTTP client @@ -177,8 +173,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { incomingRestRequestQueue.add(restRequest); } catch (final Exception requestException) { final String errorMessage = - "could not queue request \"" + restRequest + "\" on REST Requestor consumer (" + this.name + ")"; - LOGGER.warn(errorMessage, requestException); + "could not queue request \"" + restRequest + "\" on REST Requestor consumer (" + this.name + ")"; throw new ApexEventRuntimeException(errorMessage); } } @@ -202,7 +197,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { try { // Take the next event from the queue final ApexRestRequest restRequest = - incomingRestRequestQueue.poll(REST_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS); + incomingRestRequestQueue.poll(REST_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS); if (restRequest == null) { // Poll timed out, check for request timeouts timeoutExpiredRequests(); @@ -215,11 +210,11 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { Set<String> names = restConsumerProperties.getKeysFromUrl(); Set<String> inputProperty = inputExecutionProperties.stringPropertyNames(); - names.stream().map(Optional::of).forEach(op -> - op.filter(inputProperty::contains) - .orElseThrow(() -> new ApexEventRuntimeException( - "key\"" + op.get() + "\"specified on url \"" + restConsumerProperties.getUrl() - + "\"not found in execution properties passed by the current policy"))); + names.stream().map(Optional::of) + .forEach(op -> op.filter(inputProperty::contains) + .orElseThrow(() -> new ApexEventRuntimeException( + "key\"" + op.get() + "\"specified on url \"" + restConsumerProperties.getUrl() + + "\"not found in execution properties passed by the current policy"))); untaggedUrl = names.stream().reduce(untaggedUrl, (acc, str) -> acc.replace("{" + str + "}", (String) inputExecutionProperties.get(str))); @@ -264,7 +259,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { // Interrupt timed out requests and remove them from the ongoing map for (final ApexRestRequest timedoutRequest : timedoutRequestList) { final String errorMessage = - "REST Requestor consumer (" + this.name + "), REST request timed out: " + timedoutRequest; + "REST Requestor consumer (" + this.name + "), REST request timed out: " + timedoutRequest; LOGGER.warn(errorMessage); ongoingRestRequestMap.remove(timedoutRequest); @@ -324,8 +319,8 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { // Check that the request worked if (!isPass.matches()) { final String errorMessage = "reception of event from URL \"" + restConsumerProperties.getUrl() - + "\" failed with status code " + response.getStatus() + " and message \"" - + response.readEntity(String.class) + "\""; + + "\" failed with status code " + response.getStatus() + " and message \"" + + response.readEntity(String.class) + "\""; throw new ApexEventRuntimeException(errorMessage); } @@ -335,7 +330,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { // Check there is content if (StringUtils.isBlank(eventJsonString)) { final String errorMessage = - "received an empty response to \"" + request + "\" from URL \"" + untaggedUrl + "\""; + "received an empty response to \"" + request + "\" from URL \"" + untaggedUrl + "\""; throw new ApexEventRuntimeException(errorMessage); } @@ -372,7 +367,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { */ public Response sendEventAsRestRequest(String untaggedUrl) { Builder headers = client.target(untaggedUrl).request(APPLICATION_JSON) - .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()); + .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()); switch (restConsumerProperties.getHttpMethod()) { case GET: return headers.get(); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java index e166bdc1f..fe2f6c53b 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java @@ -30,8 +30,6 @@ import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Concrete implementation of an Apex event requestor that manages the producer side of a REST request. @@ -40,11 +38,6 @@ import org.slf4j.LoggerFactory; * */ public class ApexRestRequestorProducer extends ApexPluginsEventProducer { - private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestRequestorProducer.class); - - // The REST carrier properties - private RestRequestorCarrierTechnologyParameters restProducerProperties; - // The number of events sent private int eventsSent = 0; @@ -53,40 +46,36 @@ public class ApexRestRequestorProducer extends ApexPluginsEventProducer { */ @Override public void init(final String producerName, final EventHandlerParameters producerParameters) - throws ApexEventException { + throws ApexEventException { this.name = producerName; // Check and get the REST Properties if (!(producerParameters - .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) { + .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) { final String errorMessage = - "specified producer properties are not applicable to REST requestor producer (" + this.name + ")"; - LOGGER.warn(errorMessage); + "specified producer properties are not applicable to REST requestor producer (" + this.name + ")"; throw new ApexEventException(errorMessage); } - restProducerProperties = - (RestRequestorCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + RestRequestorCarrierTechnologyParameters restProducerProperties = + (RestRequestorCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); // Check if we are in peered mode if (!producerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { final String errorMessage = "REST Requestor producer (" + this.name - + ") must run in peered requestor mode with a REST Requestor consumer"; - LOGGER.warn(errorMessage); + + ") must run in peered requestor mode with a REST Requestor consumer"; throw new ApexEventException(errorMessage); } // Check if the HTTP URL has been set if (restProducerProperties.getUrl() != null) { final String errorMessage = "URL may not be specified on REST Requestor producer (" + this.name + ")"; - LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } // Check if the HTTP method has been set if (restProducerProperties.getHttpMethod() != null) { final String errorMessage = - "HTTP method may not be specified on REST Requestor producer (" + this.name + ")"; - LOGGER.warn(errorMessage); + "HTTP method may not be specified on REST Requestor producer (" + this.name + ")"; throw new ApexEventException(errorMessage); } } @@ -105,7 +94,7 @@ public class ApexRestRequestorProducer extends ApexPluginsEventProducer { */ @Override public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, - final Object event) { + final Object event) { super.sendEvent(executionId, executionProperties, eventName, event); // Find the peered consumer for this producer @@ -114,23 +103,20 @@ public class ApexRestRequestorProducer extends ApexPluginsEventProducer { // Find the REST Response Consumer that will handle this request final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer(); if (!(consumer instanceof ApexRestRequestorConsumer)) { - final String errorMessage = "send of event to URL \"" + restProducerProperties.getUrl() + "\" failed," - + " REST response consumer is not an instance of ApexRestRequestorConsumer\n" + event; - LOGGER.warn(errorMessage); + final String errorMessage = "send of event failed," + + " REST response consumer is not an instance of ApexRestRequestorConsumer\n" + event; throw new ApexEventRuntimeException(errorMessage); } // Use the consumer to handle this event final ApexRestRequestorConsumer restRequstConsumer = (ApexRestRequestorConsumer) consumer; restRequstConsumer - .processRestRequest(new ApexRestRequest(executionId, executionProperties, eventName, event)); + .processRestRequest(new ApexRestRequest(executionId, executionProperties, eventName, event)); eventsSent++; } else { // No peered consumer defined - final String errorMessage = "send of event to URL \"" + restProducerProperties.getUrl() + "\" failed," - + " REST response consumer is not defined\n" + event; - LOGGER.warn(errorMessage); + final String errorMessage = "send of event failed," + " REST response consumer is not defined\n" + event; throw new ApexEventRuntimeException(errorMessage); } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumerTest.java index 0c6067a75..a1dc0b468 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumerTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumerTest.java @@ -21,10 +21,10 @@ package org.onap.policy.apex.plugins.event.carrier.restrequestor; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -54,41 +54,29 @@ public class ApexRestRequestorConsumerTest { EventHandlerParameters consumerParameters = new EventHandlerParameters(); ApexEventReceiver incomingEventReceiver = null; - try { + assertThatThrownBy(() -> { consumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver); - fail("test should throw an exception here"); - } catch (ApexEventException aee) { - assertEquals("specified consumer properties are not applicable to REST Requestor consumer (ConsumerName)", - aee.getMessage()); - } + }).hasMessage("specified consumer properties are not applicable to REST Requestor consumer (ConsumerName)"); RestRequestorCarrierTechnologyParameters rrctp = new RestRequestorCarrierTechnologyParameters(); consumerParameters.setCarrierTechnologyParameters(rrctp); - try { + assertThatThrownBy(() -> { consumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver); - fail("test should throw an exception here"); - } catch (ApexEventException aee) { - assertEquals("REST Requestor consumer (ConsumerName) must run in peered requestor mode " - + "with a REST Requestor producer", aee.getMessage()); - } + }).hasMessage("REST Requestor consumer (ConsumerName) must run in peered requestor mode " + + "with a REST Requestor producer"); consumerParameters.setPeeredMode(EventHandlerPeeredMode.REQUESTOR, true); rrctp.setHttpMethod(null); - try { + + assertThatThrownBy(() -> { consumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver); - fail("test should throw an exception here"); - } catch (ApexEventException aee) { - assertEquals("no URL has been specified on REST Requestor consumer (ConsumerName)", aee.getMessage()); - } + }).hasMessage("no URL has been specified on REST Requestor consumer (ConsumerName)"); rrctp.setHttpMethod(RestRequestorCarrierTechnologyParameters.HttpMethod.GET); rrctp.setUrl("ZZZZ"); - try { + assertThatThrownBy(() -> { consumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver); - fail("test should throw an exception here"); - } catch (ApexEventException aee) { - assertEquals("invalid URL has been specified on REST Requestor consumer (ConsumerName)", aee.getMessage()); - } + }).hasMessage("invalid URL has been specified on REST Requestor consumer (ConsumerName)"); rrctp.setHttpMethod(RestRequestorCarrierTechnologyParameters.HttpMethod.GET); rrctp.setUrl("http://www.onap.org"); @@ -97,12 +85,9 @@ public class ApexRestRequestorConsumerTest { consumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver); - try { + assertThatThrownBy(() -> { consumer.processRestRequest(null); - fail("test should throw an exception here"); - } catch (Exception ex) { - assertEquals("could not queue request \"null\" on REST Requestor consumer (ConsumerName)", ex.getMessage()); - } + }).hasMessage("could not queue request \"null\" on REST Requestor consumer (ConsumerName)"); assertEquals(CONSUMER_NAME, consumer.getName()); assertEquals(0, consumer.getEventsReceived()); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducerTest.java index d168f2444..450a21f01 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducerTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducerTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +21,7 @@ package org.onap.policy.apex.plugins.event.carrier.restrequestor; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -47,41 +49,30 @@ public class ApexRestRequestorProducerTest { EventHandlerParameters producerParameters = new EventHandlerParameters(); - try { + assertThatThrownBy(() -> { producer.init(PRODUCER_NAME, producerParameters); - } catch (ApexEventException aee) { - assertEquals("specified producer properties are not applicable to REST requestor producer (ProducerName)", - aee.getMessage()); - } + }).hasMessage("specified producer properties are not applicable to REST requestor producer (ProducerName)"); RestRequestorCarrierTechnologyParameters rrctp = new RestRequestorCarrierTechnologyParameters(); producerParameters.setCarrierTechnologyParameters(rrctp); - try { + assertThatThrownBy(() -> { producer.init(PRODUCER_NAME, producerParameters); - fail("test should throw an exception here"); - } catch (ApexEventException aee) { - assertEquals("REST Requestor producer (ProducerName) must run in peered requestor mode " - + "with a REST Requestor consumer", aee.getMessage()); - } + }).hasMessage("REST Requestor producer (ProducerName) must run in peered requestor mode " + + "with a REST Requestor consumer"); producerParameters.setPeeredMode(EventHandlerPeeredMode.REQUESTOR, true); rrctp.setUrl("ZZZZ"); - try { + assertThatThrownBy(() -> { producer.init(PRODUCER_NAME, producerParameters); - fail("test should throw an exception here"); - } catch (ApexEventException aee) { - assertEquals("URL may not be specified on REST Requestor producer (ProducerName)", aee.getMessage()); - } + }).hasMessage("URL may not be specified on REST Requestor producer (ProducerName)"); rrctp.setUrl(null); rrctp.setHttpMethod(RestRequestorCarrierTechnologyParameters.HttpMethod.GET); - try { + + assertThatThrownBy(() -> { producer.init(PRODUCER_NAME, producerParameters); fail("test should throw an exception here"); - } catch (ApexEventException aee) { - assertEquals("HTTP method may not be specified on REST Requestor producer (ProducerName)", - aee.getMessage()); - } + }).hasMessage("HTTP method may not be specified on REST Requestor producer (ProducerName)"); rrctp.setHttpMethod(null); producer.init(PRODUCER_NAME, producerParameters); @@ -109,28 +100,20 @@ public class ApexRestRequestorProducerTest { String eventName = "EventName"; String event = "This is the event"; - try { + assertThatThrownBy(() -> { producer.sendEvent(12345, null, eventName, event); - fail("test should throw an exception here"); - } catch (Exception aee) { - assertEquals("send of event to URL \"null\" failed, REST response consumer is not defined\n" - + "This is the event", aee.getMessage()); - } + }).hasMessage("send of event failed, REST response consumer is not defined\n" + "This is the event"); ApexEventConsumer consumer = new ApexFileEventConsumer(); - SynchronousEventCache eventCache = new SynchronousEventCache(EventHandlerPeeredMode.SYNCHRONOUS, consumer, - producer, 1000); + SynchronousEventCache eventCache = + new SynchronousEventCache(EventHandlerPeeredMode.SYNCHRONOUS, consumer, producer, 1000); producer.setPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS, eventCache); PeeredReference peeredReference = new PeeredReference(EventHandlerPeeredMode.REQUESTOR, consumer, producer); producer.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference); - try { + assertThatThrownBy(() -> { producer.sendEvent(12345, null, eventName, event); - fail("test should throw an exception here"); - } catch (Exception aee) { - assertEquals("send of event to URL \"null\" failed, REST response consumer " - + "is not an instance of ApexRestRequestorConsumer\n" + "This is the event", - aee.getMessage()); - } + }).hasMessage("send of event failed, REST response consumer " + + "is not an instance of ApexRestRequestorConsumer\n" + "This is the event"); } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorCarrierTechnologyParametersTest.java index b9b997f09..1e0006811 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorCarrierTechnologyParametersTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorCarrierTechnologyParametersTest.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +21,10 @@ package org.onap.policy.apex.plugins.event.carrier.restrequestor; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.util.Set; @@ -45,13 +45,9 @@ public class RestRequestorCarrierTechnologyParametersTest { arguments.setConfigurationFilePath("src/test/resources/prodcons/RESTRequestorWithHTTPHeaderBadList.json"); arguments.setRelativeFileRoot("."); - try { + assertThatThrownBy(() -> { new ApexParameterHandler().getParameters(arguments); - fail("test should throw an exception here"); - } catch (ParameterException pe) { - assertTrue(pe.getMessage().contains("HTTP header array entry is null\n parameter")); - assertTrue(pe.getMessage().trim().endsWith("HTTP header array entry is null")); - } + }).hasMessageContaining("HTTP header array entry is null\n parameter"); } @Test @@ -60,15 +56,9 @@ public class RestRequestorCarrierTechnologyParametersTest { arguments.setConfigurationFilePath("src/test/resources/prodcons/RESTRequestorWithHTTPHeaderNotKvPairs.json"); arguments.setRelativeFileRoot("."); - try { + assertThatThrownBy(() -> { new ApexParameterHandler().getParameters(arguments); - fail("test should throw an exception here"); - } catch (ParameterException pe) { - assertTrue(pe.getMessage() - .contains("HTTP header array entries must have one key and one value: [aaa, bbb, ccc]")); - assertTrue(pe.getMessage().trim() - .endsWith("HTTP header array entries must have one key and one value: [aaa]")); - } + }).hasMessageContaining("HTTP header array entries must have one key and one value: [aaa, bbb, ccc]"); } @Test @@ -77,13 +67,9 @@ public class RestRequestorCarrierTechnologyParametersTest { arguments.setConfigurationFilePath("src/test/resources/prodcons/RESTRequestorWithHTTPHeaderNulls.json"); arguments.setRelativeFileRoot("."); - try { + assertThatThrownBy(() -> { new ApexParameterHandler().getParameters(arguments); - fail("test should throw an exception here"); - } catch (ParameterException pe) { - assertTrue(pe.getMessage().contains("HTTP header key is null or blank: [null, bbb]")); - assertTrue(pe.getMessage().trim().endsWith("HTTP header value is null or blank: [ccc, null]")); - } + }).hasMessageContaining("HTTP header key is null or blank: [null, bbb]"); } @Test @@ -95,11 +81,11 @@ public class RestRequestorCarrierTechnologyParametersTest { ApexParameters parameters = new ApexParameterHandler().getParameters(arguments); RestRequestorCarrierTechnologyParameters rrctp0 = (RestRequestorCarrierTechnologyParameters) parameters - .getEventInputParameters().get("RestRequestorConsumer0").getCarrierTechnologyParameters(); + .getEventInputParameters().get("RestRequestorConsumer0").getCarrierTechnologyParameters(); assertEquals(0, rrctp0.getHttpHeaders().length); RestRequestorCarrierTechnologyParameters rrctp1 = (RestRequestorCarrierTechnologyParameters) parameters - .getEventInputParameters().get("RestRequestorConsumer1").getCarrierTechnologyParameters(); + .getEventInputParameters().get("RestRequestorConsumer1").getCarrierTechnologyParameters(); assertEquals(3, rrctp1.getHttpHeaders().length); assertEquals("bbb", rrctp1.getHttpHeadersAsMultivaluedMap().get("aaa").get(0)); assertEquals("ddd", rrctp1.getHttpHeadersAsMultivaluedMap().get("ccc").get(0)); @@ -112,16 +98,13 @@ public class RestRequestorCarrierTechnologyParametersTest { arguments.setConfigurationFilePath("src/test/resources/prodcons/RESTClientWithHTTPFilterInvalid.json"); arguments.setRelativeFileRoot("."); - try { + assertThatThrownBy(() -> { new ApexParameterHandler().getParameters(arguments); ApexParameters parameters = new ApexParameterHandler().getParameters(arguments); parameters.getEventInputParameters().get("RestRequestorConsumer0").getCarrierTechnologyParameters(); - fail("test should throw an exception here"); - } catch (ParameterException pe) { - assertTrue(pe.getMessage().contains( - "Invalid HTTP code filter, the filter must be specified as a three digit regular expression: ")); - } + }).hasMessageContaining( + "Invalid HTTP code filter, the filter must be specified as a three digit regular expression: "); } @Test @@ -163,8 +146,8 @@ public class RestRequestorCarrierTechnologyParametersTest { assertEquals(RestRequestorCarrierTechnologyParameters.HttpMethod.DELETE, rrctp.getHttpMethod()); assertEquals("RESTREQUESTORCarrierTechnologyParameters " - + "[url=http://some.where, httpMethod=DELETE, httpHeaders=[[aaa, bbb], [ccc, ddd]]," - + " httpCodeFilter=[1-5][0][0-5]]", rrctp.toString()); + + "[url=http://some.where, httpMethod=DELETE, httpHeaders=[[aaa, bbb], [ccc, ddd]]," + + " httpCodeFilter=[1-5][0][0-5]]", rrctp.toString()); } @Test diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorTest.java index 326be511c..cbb81f9da 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorTest.java @@ -21,6 +21,7 @@ package org.onap.policy.apex.plugins.event.carrier.restrequestor; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -30,6 +31,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; import java.util.Map; +import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; @@ -40,7 +42,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; -import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.service.engine.main.ApexMain; import org.onap.policy.common.endpoints.http.server.HttpServletServer; @@ -105,47 +106,23 @@ public class RestRequestorTest { * Test rest requestor get. * * @throws MessagingException the messaging exception - * @throws ApexException the apex exception - * @throws IOException Signals that an I/O exception has occurred. + * @throws Exception an exception */ @Test - public void testRestRequestorGet() throws MessagingException, ApexException, IOException { + public void testRestRequestorGet() throws Exception { final Client client = ClientBuilder.newClient(); final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGet.json"}; final ApexMain apexMain = new ApexMain(args); + await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); - Response response = null; - - // Wait for the required amount of events to be received or for 10 seconds - Double getsSoFar = 0.0; - for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - - response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") - .request("application/json").get(); - - if (Response.Status.OK.getStatusCode() != response.getStatus()) { - break; - } - - final String responseString = response.readEntity(String.class); - - @SuppressWarnings("unchecked") - final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class); - getsSoFar = Double.valueOf(jsonMap.get("GET").toString()); - - if (getsSoFar >= 50.0) { - break; - } - } + await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS) + .until(() -> getStatsFromServer(client, "GET") >= 50.0); apexMain.shutdown(); - client.close(); - - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive()); - assertEquals(Double.valueOf(50.0), getsSoFar); + client.close(); } /** @@ -161,14 +138,13 @@ public class RestRequestorTest { final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetEmpty.json"}; final ApexMain apexMain = new ApexMain(args); + await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); Response response = null; // Wait for the required amount of events to be received or for 10 seconds Double getsSoFar = 0.0; for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") .request("application/json").get(); @@ -188,6 +164,8 @@ public class RestRequestorTest { } apexMain.shutdown(); + await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive()); + client.close(); assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); @@ -206,37 +184,15 @@ public class RestRequestorTest { final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FilePut.json"}; final ApexMain apexMain = new ApexMain(args); + await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); - // Wait for the required amount of events to be received or for 10 seconds - Double putsSoFar = 0.0; - - Response response = null; - for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - - response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") - .request("application/json").get(); - - if (Response.Status.OK.getStatusCode() != response.getStatus()) { - break; - } - - final String responseString = response.readEntity(String.class); - - @SuppressWarnings("unchecked") - final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class); - putsSoFar = Double.valueOf(jsonMap.get("PUT").toString()); - - if (putsSoFar >= 50.0) { - break; - } - } + await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS) + .until(() -> getStatsFromServer(client, "PUT") >= 50.0); apexMain.shutdown(); - client.close(); + await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive()); - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - assertEquals(Double.valueOf(50.0), putsSoFar); + client.close(); } /** @@ -252,31 +208,15 @@ public class RestRequestorTest { final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FilePost.json"}; final ApexMain apexMain = new ApexMain(args); + await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); - // Wait for the required amount of events to be received or for 10 seconds - Double postsSoFar = 0.0; - for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - - final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") - .request("application/json").get(); - - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - final String responseString = response.readEntity(String.class); - - @SuppressWarnings("unchecked") - final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class); - postsSoFar = Double.valueOf(jsonMap.get("POST").toString()); - - if (postsSoFar >= 50.0) { - break; - } - } + await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS) + .until(() -> getStatsFromServer(client, "POST") >= 50.0); apexMain.shutdown(); - client.close(); + await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive()); - assertEquals(Double.valueOf(50.0), postsSoFar); + client.close(); } /** @@ -292,31 +232,16 @@ public class RestRequestorTest { final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileDelete.json"}; final ApexMain apexMain = new ApexMain(args); + await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); - // Wait for the required amount of events to be received or for 10 seconds - Double deletesSoFar = 0.0; - for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - - final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") - .request("application/json").get(); - - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - final String responseString = response.readEntity(String.class); - - @SuppressWarnings("unchecked") - final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class); - deletesSoFar = Double.valueOf(jsonMap.get("DELETE").toString()); - - if (deletesSoFar >= 50.0) { - break; - } - } + // Wait for the required amount of events to be received + await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS) + .until(() -> getStatsFromServer(client, "DELETE") >= 50.0); apexMain.shutdown(); - client.close(); + await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive()); - assertEquals(Double.valueOf(50.0), deletesSoFar); + client.close(); } /** @@ -332,31 +257,15 @@ public class RestRequestorTest { final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetMulti.json"}; final ApexMain apexMain = new ApexMain(args); + await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); - // Wait for the required amount of events to be received or for 10 seconds - Double getsSoFar = 0.0; - for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - - final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") - .request("application/json").get(); - - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - final String responseString = response.readEntity(String.class); - - @SuppressWarnings("unchecked") - final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class); - getsSoFar = Double.valueOf(jsonMap.get("GET").toString()); - - if (getsSoFar >= 8.0) { - break; - } - } + await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS) + .until(() -> getStatsFromServer(client, "GET") >= 8.0); apexMain.shutdown(); - client.close(); + await().atMost(2, TimeUnit.SECONDS).until(() -> !apexMain.isAlive()); - assertEquals(Double.valueOf(8.0), getsSoFar); + client.close(); } /** @@ -373,8 +282,7 @@ public class RestRequestorTest { final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetProducerAlone.json"}; - final ApexMain apexMain = new ApexMain(args); - ThreadUtilities.sleep(200); + ApexMain apexMain = new ApexMain(args); apexMain.shutdown(); final String outString = outContent.toString(); @@ -400,8 +308,7 @@ public class RestRequestorTest { final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetConsumerAlone.json"}; - final ApexMain apexMain = new ApexMain(args); - ThreadUtilities.sleep(200); + ApexMain apexMain = new ApexMain(args); apexMain.shutdown(); final String outString = outContent.toString(); @@ -412,4 +319,16 @@ public class RestRequestorTest { assertTrue(outString.contains("peer \"RestRequestorProducer for peered mode REQUESTOR " + "does not exist or is not defined with the same peered mode")); } + + private Double getStatsFromServer(final Client client, final String statToGet) { + final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") + .request("application/json").get(); + + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + final String responseString = response.readEntity(String.class); + + @SuppressWarnings("unchecked") + final Map<String, Object> jsonMap = new Gson().fromJson(responseString, Map.class); + return Double.valueOf(jsonMap.get(statToGet).toString()); + } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/logback-test.xml b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/logback-test.xml new file mode 100644 index 000000000..f0fd0b100 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/logback-test.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ============LICENSE_START======================================================= + Copyright (C) 2016-2018 Ericsson. All rights reserved. + Modifications Copyright (C) 2020 Nordix Foundation. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + SPDX-License-Identifier: Apache-2.0 + ============LICENSE_END========================================================= +--> + +<configuration> + <contextName>Apex</contextName> + <statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener" /> + + <!-- USE FOR STD OUT ONLY --> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <Pattern>%d %contextName [%t] %level %logger{36} - %msg%n</Pattern> + </encoder> + </appender> + + <root level="ERROR"> + <appender-ref ref="STDOUT" /> + </root> + + <logger name="org.onap.policy.apex.plugins.event.carrier.restrequestor" level="ERROR" additivity="false"> + <appender-ref ref="STDOUT" /> + </logger> +</configuration> diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java index 205b865d5..fddbcb79f 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java @@ -106,7 +106,7 @@ public class ApexActivator { ApexParameters apexParameters = apexParametersMap.values().iterator().next(); // totalInstanceCount is the sum of instance counts required as per each policy int totalInstanceCount = apexParametersMap.values().stream() - .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); + .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount); instantiateEngine(apexParameters); setUpModelMarhsallerAndUnmarshaller(apexParameters); @@ -126,12 +126,12 @@ public class ApexActivator { for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) { ApexParameters apexParams = apexParamsEntry.getValue(); boolean duplicateInputParameterExist = - apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey); + apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey); boolean duplicateOutputParameterExist = - apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey); + apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey); if (duplicateInputParameterExist || duplicateOutputParameterExist) { LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.", - apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion()); + apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion()); apexParametersMap.remove(apexParamsEntry.getKey()); continue; } @@ -140,22 +140,29 @@ public class ApexActivator { // Check if a policy model file has been specified if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) { LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .", - apexParams.getEngineServiceParameters().getPolicyModelFileName()); + apexParams.getEngineServiceParameters().getPolicyModelFileName()); - final String policyModelString = TextFileUtils - .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); + final String policyModelString = + TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); AxPolicyModel policyModel = EngineServiceImpl - .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); + .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); policyModelsMap.put(apexParamsEntry.getKey(), policyModel); } } AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap); + // Set the policy model in the engine apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel, - true); + true); + setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, - outputParametersMap); + outputParametersMap); + + // Wire up pairings between marhsallers and unmarshallers setUpMarshalerPairings(inputParametersMap); + + // Start event processing + startUnmarshallers(inputParametersMap); } private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) { @@ -163,42 +170,43 @@ public class ApexActivator { ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey()); AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue()); Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream = - policyModelsMap.entrySet().stream().skip(1); + policyModelsMap.entrySet().stream().skip(1); Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry = - policyModelStream.reduce(firstEntry, ((entry1, entry2) -> { - try { - entry1.setValue(PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), - true, true)); - } catch (ApexModelException exc) { - LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.", - entry2.getKey().getName(), entry2.getKey().getVersion(), exc); - apexParametersMap.remove(entry2.getKey()); - policyModelsMap.remove(entry2.getKey()); - } - return entry1; - })); + policyModelStream.reduce(firstEntry, ((entry1, entry2) -> { + try { + entry1.setValue( + PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true)); + } catch (ApexModelException exc) { + LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.", + entry2.getKey().getName(), entry2.getKey().getVersion(), exc); + apexParametersMap.remove(entry2.getKey()); + policyModelsMap.remove(entry2.getKey()); + } + return entry1; + })); AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue()); policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap return finalPolicyModel; } private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, - Map<String, EventHandlerParameters> inputParametersMap, - Map<String, EventHandlerParameters> outputParametersMap) throws ApexEventException { + Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap) + throws ApexEventException { // Producer parameters specify what event marshalers to handle events leaving Apex are // set up and how they are set up for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) { final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(), - engineServiceParameters, outputParameters.getValue()); + engineServiceParameters, outputParameters.getValue()); marshaller.init(); apexEngineService.registerActionListener(outputParameters.getKey(), marshaller); marshallerMap.put(outputParameters.getKey(), marshaller); } + // Consumer parameters specify what event unmarshalers to handle events coming into Apex // are set up and how they are set up for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) { final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(), - engineServiceParameters, inputParameters.getValue()); + engineServiceParameters, inputParameters.getValue()); unmarshallerMap.put(inputParameters.getKey(), unmarshaller); unmarshaller.init(engineServiceHandler); } @@ -206,7 +214,7 @@ public class ApexActivator { private void instantiateEngine(ApexParameters apexParameters) throws ApexException { if (null != apexEngineService - && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) { + && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) { throw new ApexException("Apex Engine already initialized."); } // Create engine with specified thread count @@ -216,7 +224,7 @@ public class ApexActivator { // Instantiate and start the messaging service for Deployment LOGGER.debug("starting apex deployment service . . ."); final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService, - apexParameters.getEngineServiceParameters().getDeploymentPort()); + apexParameters.getEngineServiceParameters().getDeploymentPort()); engDepService.start(); // Create the engine holder to hold the engine's references and act as an event receiver @@ -240,14 +248,23 @@ public class ApexActivator { if (inputParameters.getValue().isPeeredMode(peeredMode)) { // Find the unmarshaler and marshaler final ApexEventMarshaller peeredMarshaler = - marshallerMap.get(inputParameters.getValue().getPeer(peeredMode)); + marshallerMap.get(inputParameters.getValue().getPeer(peeredMode)); // Connect the unmarshaler and marshaler unmarshaller.connectMarshaler(peeredMode, peeredMarshaler); } } - // Now let's get events flowing - unmarshaller.start(); + } + } + + /** + * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done. + * + * @param inputParametersMap the apex parameters + */ + private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) { + for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) { + unmarshallerMap.get(inputParameters.getKey()).start(); } } |