diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier')
5 files changed, 54 insertions, 67 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml index a2e65c995..d36dd4c5a 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml @@ -2,7 +2,7 @@ ============LICENSE_START======================================================= Copyright (C) 2018 Ericsson. All rights reserved. Modifications Copyright (C) 2022 Bell Canada. All rights reserved. - Modifications Copyright (C) 2023 Nordix Foundation. + Modifications Copyright (C) 2023-2024 Nordix Foundation. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -41,6 +41,16 @@ <artifactId>kafka-avro-serializer</artifactId> <version>${version.kafka-avro-serializer}</version> </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${version.avro}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.25.0</version> + </dependency> </dependencies> <repositories> diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java index bcf76aa51..2aec19d4d 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019-2020, 2023 Nordix Foundation. + * Modifications Copyright (C) 2019-2020, 2023-2024 Nordix Foundation. * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ @@ -90,7 +90,7 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { private final Map<ApexRestRequest, RestRequestRunner> ongoingRestRequestMap = new ConcurrentHashMap<>(); // The number of events received to date - private Object eventsReceivedLock = new Object(); + private final Object eventsReceivedLock = new Object(); @Getter private int eventsReceived = 0; @@ -303,18 +303,20 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { NetLoggerUtil.log(EventType.OUT, CommInfrastructure.REST, url, request.getEvent().toString()); } // Execute the REST request - final var response = sendEventAsRestRequest(url); - // Get the event we received - final var eventJsonString = response.readEntity(String.class); - NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, eventJsonString); - // Match the return code - var isPass = httpCodeFilterPattern.matcher(String.valueOf(response.getStatus())); - - // Check that the request worked - if (!isPass.matches()) { - final String errorMessage = "reception of event from URL \"" + restConsumerProperties.getUrl() - + "\" failed with status code " + response.getStatus(); - throw new ApexEventRuntimeException(errorMessage); + final String eventJsonString; + try (var response = sendEventAsRestRequest(url)) { + // Get the event we received + eventJsonString = response.readEntity(String.class); + NetLoggerUtil.log(EventType.IN, CommInfrastructure.REST, url, eventJsonString); + // Match the return code + var isPass = httpCodeFilterPattern.matcher(String.valueOf(response.getStatus())); + + // Check that the request worked + if (!isPass.matches()) { + final String errorMessage = "reception of event from URL \"" + restConsumerProperties.getUrl() + + "\" failed with status code " + response.getStatus(); + throw new ApexEventRuntimeException(errorMessage); + } } // Check there is content @@ -354,24 +356,14 @@ public class ApexRestRequestorConsumer extends ApexPluginsEventConsumer { public Response sendEventAsRestRequest(String url) { Builder headers = client.target(url).request(APPLICATION_JSON) .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()); - switch (restConsumerProperties.getHttpMethod()) { - case GET: - return headers.get(); - - case PUT: - return headers.put(Entity.json(request.getEvent())); - - case POST: - return headers.post(Entity.json(request.getEvent())); - - case DELETE: - return headers.delete(); - - default: - break; - } + LOGGER.info("event from request: {}", request.getEvent()); + return switch (restConsumerProperties.getHttpMethod()) { + case GET -> headers.get(); + case PUT -> headers.put(Entity.json(request.getEvent())); + case POST -> headers.post(Entity.json(request.getEvent())); + case DELETE -> headers.delete(); + }; - return null; } } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumerTest.java index 66c5f57c6..e72fa9030 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumerTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumerTest.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019-2020 Nordix Foundation. + * Modifications Copyright (C) 2019-2020,2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -90,7 +91,7 @@ public class ApexRestRequestorConsumerTest { assertEquals(CONSUMER_NAME, consumer.getName()); assertEquals(0, consumer.getEventsReceived()); - assertEquals(null, consumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR)); + assertNull(consumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR)); } @Test diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorTest.java index 842999fcd..257e533c4 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorTest.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019-2020,2023 Nordix Foundation. + * Modifications Copyright (C) 2019-2020,2023-2024 Nordix Foundation. * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -31,7 +31,6 @@ import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; import jakarta.ws.rs.core.Response; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.PrintStream; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -84,10 +83,9 @@ public class RestRequestorTest { /** * Tear down. * - * @throws Exception the exception */ @AfterClass - public static void tearDown() throws Exception { + public static void tearDown() { if (server != null) { server.stop(); } @@ -124,7 +122,7 @@ public class RestRequestorTest { final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGet.json"}; final ApexMain apexMain = new ApexMain(args); - await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); + await().atMost(2, TimeUnit.SECONDS).until(apexMain::isAlive); await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS) .until(() -> getStatsFromServer(client, "GET") >= 50.0); @@ -138,17 +136,15 @@ public class RestRequestorTest { /** * Test rest requestor get empty. * - * @throws MessagingException the messaging exception * @throws ApexException the apex exception - * @throws IOException Signals that an I/O exception has occurred. */ @Test - public void testRestRequestorGetEmpty() throws MessagingException, ApexException, IOException { + public void testRestRequestorGetEmpty() throws ApexException { final Client client = ClientBuilder.newClient(); final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetEmpty.json"}; final ApexMain apexMain = new ApexMain(args); - await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); + await().atMost(2, TimeUnit.SECONDS).until(apexMain::isAlive); Response response = null; @@ -184,17 +180,15 @@ public class RestRequestorTest { /** * Test REST requestor put. * - * @throws MessagingException the messaging exception * @throws ApexException the apex exception - * @throws IOException Signals that an I/O exception has occurred. */ @Test - public void testRestRequestorPut() throws MessagingException, ApexException, IOException { + public void testRestRequestorPut() throws ApexException { final Client client = ClientBuilder.newClient(); final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FilePut.json"}; final ApexMain apexMain = new ApexMain(args); - await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); + await().atMost(2, TimeUnit.SECONDS).until(apexMain::isAlive); await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS) .until(() -> getStatsFromServer(client, "PUT") >= 50.0); @@ -208,17 +202,15 @@ public class RestRequestorTest { /** * Test REST requestor post. * - * @throws MessagingException the messaging exception * @throws ApexException the apex exception - * @throws IOException Signals that an I/O exception has occurred. */ @Test - public void testRestRequestorPost() throws MessagingException, ApexException, IOException { + public void testRestRequestorPost() throws ApexException { final Client client = ClientBuilder.newClient(); final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FilePost.json"}; final ApexMain apexMain = new ApexMain(args); - await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); + await().atMost(2, TimeUnit.SECONDS).until(apexMain::isAlive); await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS) .until(() -> getStatsFromServer(client, "POST") >= 50.0); @@ -232,17 +224,15 @@ public class RestRequestorTest { /** * Test REST requestor delete. * - * @throws MessagingException the messaging exception * @throws ApexException the apex exception - * @throws IOException Signals that an I/O exception has occurred. */ @Test - public void testRestRequestorDelete() throws MessagingException, ApexException, IOException { + public void testRestRequestorDelete() throws ApexException { final Client client = ClientBuilder.newClient(); final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileDelete.json"}; final ApexMain apexMain = new ApexMain(args); - await().atMost(2, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); + await().atMost(2, TimeUnit.SECONDS).until(apexMain::isAlive); // Wait for the required amount of events to be received await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS) @@ -257,17 +247,15 @@ public class RestRequestorTest { /** * Test REST requestor multi inputs. * - * @throws MessagingException the messaging exception * @throws ApexException the apex exception - * @throws IOException Signals that an I/O exception has occurred. */ @Test - public void testRestRequestorMultiInputs() throws MessagingException, ApexException, IOException { + public void testRestRequestorMultiInputs() throws ApexException { final Client client = ClientBuilder.newClient(); final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetMulti.json"}; final ApexMain apexMain = new ApexMain(args); - await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); + await().atMost(10, TimeUnit.SECONDS).until(apexMain::isAlive); await().pollInterval(300, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS) .until(() -> getStatsFromServer(client, "GET") >= 8.0); @@ -281,12 +269,10 @@ public class RestRequestorTest { /** * Test REST requestor producer alone. * - * @throws MessagingException the messaging exception * @throws ApexException the apex exception - * @throws IOException Signals that an I/O exception has occurred. */ @Test - public void testRestRequestorProducerAlone() throws MessagingException, ApexException, IOException { + public void testRestRequestorProducerAlone() throws ApexException { final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetProducerAlone.json"}; @@ -302,12 +288,10 @@ public class RestRequestorTest { /** * Test REST requestor consumer alone. * - * @throws MessagingException the messaging exception * @throws ApexException the apex exception - * @throws IOException Signals that an I/O exception has occurred. */ @Test - public void testRestRequestorConsumerAlone() throws MessagingException, ApexException, IOException { + public void testRestRequestorConsumerAlone() throws ApexException { final String[] args = {"src/test/resources/prodcons/File2RESTRequest2FileGetConsumerAlone.json"}; ApexMain apexMain = new ApexMain(args); apexMain.shutdown(); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/SupportRestRequestorEndpoint.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/SupportRestRequestorEndpoint.java index 23aba09a4..b05c24333 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/SupportRestRequestorEndpoint.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/SupportRestRequestorEndpoint.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2020, 2023 Nordix Foundation. + * Modifications Copyright (C) 2020, 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,7 @@ import java.util.Map; @Path("/apex") public class SupportRestRequestorEndpoint { - private static Object counterLock = new Object(); + private static final Object counterLock = new Object(); private static int postMessagesReceived = 0; private static int putMessagesReceived = 0; private static int statMessagesReceived = 0; |