diff options
Diffstat (limited to 'src')
11 files changed, 137 insertions, 52 deletions
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java index 45f01ce..8bf32c4 100644 --- a/src/main/java/org/onap/dcae/ApplicationSettings.java +++ b/src/main/java/org/onap/dcae/ApplicationSettings.java @@ -108,7 +108,7 @@ public class ApplicationSettings { } public int httpPort() { - return properties.getInt("collector.rcc.service.port", 8686); + return properties.getInt("collector.rcc.service.port", 8080); } public int httpsPort() { diff --git a/src/main/java/org/onap/dcae/RestConfCollector.java b/src/main/java/org/onap/dcae/RestConfCollector.java index 27fccd6..05c299e 100644 --- a/src/main/java/org/onap/dcae/RestConfCollector.java +++ b/src/main/java/org/onap/dcae/RestConfCollector.java @@ -24,6 +24,7 @@ package org.onap.dcae; import io.vavr.collection.Map; import org.json.JSONArray; import org.json.JSONObject; +import org.onap.dcae.common.ControllerActivationState; import org.onap.dcae.common.EventData; import org.onap.dcae.common.EventProcessor; import org.onap.dcae.common.publishing.DMaaPConfigurationParser; @@ -43,6 +44,7 @@ import org.springframework.context.annotation.Lazy; import java.nio.file.Paths; import java.util.HashMap; +import java.util.Iterator; import java.util.concurrent.*; @SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class}) @@ -56,8 +58,10 @@ public class RestConfCollector { private static ConfigLoader configLoader; private static SpringApplication app; private static ScheduledFuture<?> scheduleFeatures; + private static ScheduledFuture<?> scheduleCtrlActivation; private static ExecutorService executor; private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + private static ScheduledThreadPoolExecutor scheduledThExController; private static EventPublisher eventPublisher; private static EventProcessor eventProcessor; @@ -70,6 +74,7 @@ public class RestConfCollector { app = new SpringApplication(RestConfCollector.class); properties = new ApplicationSettings(args, CLIUtils::processCmdLine); scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); + scheduledThExController = new ScheduledThreadPoolExecutor(1); init(); app.setAddCommandLineProperties(true); context = app.run(); @@ -85,6 +90,7 @@ public class RestConfCollector { context.close(); properties.reloadProperties(); scheduleFeatures.cancel(true); + scheduleCtrlActivation.cancel(true); init(); controllerConfig(properties); context = SpringApplication.run(RestConfCollector.class); @@ -174,6 +180,11 @@ public class RestConfCollector { 10, 10, TimeUnit.MINUTES); + ControllerActivationTask task = new ControllerActivationTask(); + scheduleCtrlActivation = scheduledThExController.scheduleAtFixedRate(task, + 10, + 10, + TimeUnit.SECONDS); } private static void createExecutors() { @@ -186,4 +197,28 @@ public class RestConfCollector { executor.execute(eventProcessor); } } + + private static class ControllerActivationTask implements Runnable + { + public ControllerActivationTask() { + } + + @Override + public void run() + { + try { + Iterator<String> it1 = controllerStore.keySet().iterator(); + while(it1.hasNext()){ + String key = it1.next(); + AccessController ctlr = controllerStore.get(key); + if(ctlr.getState() == ControllerActivationState.INIT) { + log.info("Activating controller " + key); + ctlr.activate(); + } + } + } catch (Exception e) { + log.info("Activation failed"); + } + } + } } diff --git a/src/main/java/org/onap/dcae/common/ControllerActivationState.java b/src/main/java/org/onap/dcae/common/ControllerActivationState.java new file mode 100644 index 0000000..deb2b1f --- /dev/null +++ b/src/main/java/org/onap/dcae/common/ControllerActivationState.java @@ -0,0 +1,25 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018-2019 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.common; + +public enum ControllerActivationState { + INIT, ACTIVE, NONE; +} diff --git a/src/main/java/org/onap/dcae/common/EventProcessor.java b/src/main/java/org/onap/dcae/common/EventProcessor.java index 164aed3..cf44921 100644 --- a/src/main/java/org/onap/dcae/common/EventProcessor.java +++ b/src/main/java/org/onap/dcae/common/EventProcessor.java @@ -76,9 +76,9 @@ public class EventProcessor implements Runnable { private void sendEventsToStreams(String[] streamIdList, EventData ev) {
for (String aStreamIdList : streamIdList) {
log.info("Invoking publisher for streamId:" + aStreamIdList);
- if (!ev.getConn().getEvent_ruleId().equals("")) {
+ if (!ev.getConn().getEventRuleId().equals("")) {
JSONObject customHeader = new JSONObject();
- customHeader.put("rule-id", ev.getConn().getEvent_ruleId());
+ customHeader.put("rule-id", ev.getConn().getEventRuleId());
eventPublisher.sendEvent(overrideEvent(customHeader, ev.getEventObj()), aStreamIdList);
} else {
eventPublisher.sendEvent(ev.getEventObj(), aStreamIdList);
diff --git a/src/main/java/org/onap/dcae/controller/AccessController.java b/src/main/java/org/onap/dcae/controller/AccessController.java index 165c468..226ead5 100644 --- a/src/main/java/org/onap/dcae/controller/AccessController.java +++ b/src/main/java/org/onap/dcae/controller/AccessController.java @@ -24,6 +24,7 @@ import org.json.JSONObject; import org.onap.dcae.ApplicationException; import org.onap.dcae.ApplicationSettings; import org.onap.dcae.common.Constants; +import org.onap.dcae.common.ControllerActivationState; import org.onap.dcae.common.RestConfContext; import org.onap.dcae.common.RestapiCallNode; import org.slf4j.Logger; @@ -56,6 +57,8 @@ public class AccessController { private ExecutorService executor = Executors.newCachedThreadPool(); private Map<String, String> paraMap; + ControllerActivationState state; + public AccessController(JSONObject controller, ApplicationSettings properties) { this.cfgInfo = new ControllerConfigInfo(controller.get("controller_name").toString(), @@ -71,7 +74,7 @@ public class AccessController { this.ctx = new RestConfContext(); this.restApiCallNode = new RestapiCallNode(); this.paraMap = new HashMap<>(); - + this.state = ControllerActivationState.INIT; prepareControllerParamMap(); log.info("AccesController Created {} {} {} {} {} {}", @@ -85,12 +88,22 @@ public class AccessController { @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof AccessController)) return false; + if (this == o) + return true; + if (!(o instanceof AccessController)) + return false; AccessController that = (AccessController) o; return that.cfgInfo.getController_name().equals(that.cfgInfo.getController_name()); } + public ControllerActivationState getState() { + return state; + } + + public void setState(ControllerActivationState state) { + this.state = state; + } + @Override public int hashCode() { return Objects.hash(this.cfgInfo.getController_name()); @@ -107,8 +120,7 @@ public class AccessController { this.executor = executor; } - private void fetch_TokenId() { - + private void fetchTokenId() { modifyControllerParamMap(Constants.KSETTING_REST_API_URL, getUriMethod(this.properties.authorizationEnabled()) + cfgInfo.getController_restapiUrl() + cfgInfo.getController_accessTokenUrl()); modifyControllerParamMap(Constants.KDEFAULT_TEMP_FILENAME, cfgInfo.getController_accessTokenFile()); @@ -117,6 +129,7 @@ public class AccessController { modifyControllerParamMap(Constants.KSETTING_HTTP_METHOD, cfgInfo.getController_accessTokenMethod()); String httpResponse = null; + try { getRestApiCallNode().sendRequest(this.paraMap, ctx, null); @@ -125,43 +138,46 @@ public class AccessController { log.info("httpResponse ", httpResponse + " key " + key); JSONObject jsonObj = new JSONObject(httpResponse); log.info("jsonObj ", jsonObj.toString()); - //JSONObject data = jsonObj.getJSONObject("data"); - //String tokenId = data.get("accessSession").toString(); //@TODO: Make return field dynamic String tokenId = jsonObj.get("accessSession").toString(); - log.info("token 1" + tokenId); modifyControllerParamMap(Constants.KSETTING_TOKENID, tokenId); modifyControllerParamMap(Constants.KSETTING_CUSTOMHTTP_HEADER, "X-ACCESS-TOKEN=" + tokenId); + setState(ControllerActivationState.ACTIVE); + } catch (Exception e) { - log.info("Access token is not supported" + e.getMessage()); + log.info("Access token is not supported " + e.getMessage()); log.info("http response " + httpResponse); } } + public void activate() { - fetch_TokenId(); - printControllerParamMap(); - /* Create eventlist from properties */ - JSONArray contollers = new JSONArray(properties.rccPolicy()); - for (int i = 0; i < contollers.length(); i++) { - JSONObject controller = contollers.getJSONObject(i); - if (controller.get("controller_name").equals(this.getCfgInfo().getController_name())) { - JSONArray eventlists = controller.getJSONArray("event_details"); - for (int j = 0; j < eventlists.length(); j++) { - JSONObject event = eventlists.getJSONObject(j); - String name = event.get("event_name").toString(); - PersistentEventConnection conn = new PersistentEventConnection(name, - event.get("event_description").toString(), - Boolean.parseBoolean(event.get("event_sseventUrlEmbed").toString()), - event.get("event_sseventsField").toString(), - event.get("event_sseventsUrl").toString(), - event.get("event_subscriptionTemplate").toString(), - event.get("event_unSubscriptionTemplate").toString(), - event.get("event_ruleId").toString(), - this); - - eventList.put(name, conn); - executor.execute(conn); + fetchTokenId(); + + if (getState() == ControllerActivationState.ACTIVE) { + printControllerParamMap(); + /* Create eventlist from properties */ + JSONArray contollers = new JSONArray(properties.rccPolicy()); + for (int i = 0; i < contollers.length(); i++) { + JSONObject controller = contollers.getJSONObject(i); + if (controller.get("controller_name").equals(this.getCfgInfo().getController_name())) { + JSONArray eventlists = controller.getJSONArray("event_details"); + for (int j = 0; j < eventlists.length(); j++) { + JSONObject event = eventlists.getJSONObject(j); + String name = event.get("event_name").toString(); + PersistentEventConnection conn = new PersistentEventConnection(name, + event.get("event_description").toString(), + Boolean.parseBoolean(event.get("event_sseventUrlEmbed").toString()), + event.get("event_sseventsField").toString(), + event.get("event_sseventsUrl").toString(), + event.get("event_subscriptionTemplate").toString(), + event.get("event_unSubscriptionTemplate").toString(), + event.get("event_ruleId").toString(), + this); + + eventList.put(name, conn); + executor.execute(conn); + } } } } diff --git a/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java index 99ad1af..047936d 100644 --- a/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java +++ b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java @@ -185,7 +185,7 @@ public class PersistentEventConnection implements Runnable { return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier((s1, s2) -> true).build(); } - public String getEvent_ruleId() { + public String getEventRuleId() { return event_ruleId; } diff --git a/src/test/java/org/onap/dcae/AccessControllerTest.java b/src/test/java/org/onap/dcae/AccessControllerTest.java index b93e190..5469d89 100644 --- a/src/test/java/org/onap/dcae/AccessControllerTest.java +++ b/src/test/java/org/onap/dcae/AccessControllerTest.java @@ -106,6 +106,9 @@ public class AccessControllerTest { JSONObject controller = new JSONObject("{\"controller_name\":\"AccessM&C\",\"controller_restapiUrl\":\"10.118.191.43:26335\",\"controller_restapiUser\":\"access\",\"controller_restapiPassword\":\"Huawei@123\",\"controller_accessTokenUrl\":\"/rest/plat/smapp/v1/oauth/token\",\"controller_accessTokenFile\":\"./etc/access-token.json\",\"controller_accessTokenMethod\":\"put\",\"controller_subsMethod\":\"post\",\"controller_subscriptionUrl\":\"/restconf/v1/operations/huawei-nce-notification-action:establish-subscription\",\"event_details\":[{\"event_name\":\"ONT_registration\",\"event_description\":\"ONTregistartionevent\",\"event_sseventUrlEmbed\":\"true\",\"event_sseventsField\":\"output.url\",\"event_sseventsUrl\":\"null\",\"event_subscriptionTemplate\":\"./etc/ont_registartion_subscription_template.json\",\"event_unSubscriptionTemplate\":\"./etc/ont_registartion_unsubscription_template.json\",\"event_ruleId\":\"777777777\"}]}"); AccessController acClr = new AccessController(controller, properties); + AccessController acClr2 = new AccessController(controller, + properties); + acClr.equals(acClr2); acClr.setRestApiCallNode(restApiCallNode); acClr.setExecutor(executor); acClr.getCtx().setAttribute("responsePrefix.httpResponse","{\"accessSession\" : \"12dsaf4-2323-1231131232323\"}"); diff --git a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java index bc78a24..3a9549f 100644 --- a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java +++ b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java @@ -119,7 +119,7 @@ public class ApplicationSettingsTest { int applicationPort = fromTemporaryConfiguration().httpPort(); // then - assertEquals(8686, applicationPort); + assertEquals(8080, applicationPort); } @Test @@ -135,7 +135,7 @@ public class ApplicationSettingsTest { @Test public void shouldReturnIfHTTPIsEnabled() throws IOException { // when - boolean httpsEnabled = fromTemporaryConfiguration("collector.service.port=8686").httpsEnabled(); + boolean httpsEnabled = fromTemporaryConfiguration("collector.service.port=8080").httpsEnabled(); // then assertTrue(httpsEnabled); } diff --git a/src/test/java/org/onap/dcae/common/EventProcessorTest.java b/src/test/java/org/onap/dcae/common/EventProcessorTest.java index e23a3df..406a0fe 100644 --- a/src/test/java/org/onap/dcae/common/EventProcessorTest.java +++ b/src/test/java/org/onap/dcae/common/EventProcessorTest.java @@ -92,6 +92,8 @@ public class EventProcessorTest { PersistentEventConnection p = new PersistentEventConnection("","",true, "", "","","","1234646346", acClr); + p.getEventParamMapValue("restapiUrl"); + p.modifyEventParamMap("restapiUrl", "10.118.191.43:26335"); RestConfCollector.fProcessingInputQueue = new LinkedBlockingQueue<>(4); RestConfCollector.fProcessingInputQueue.offer(new EventData(p, new JSONObject("{}"))); RestConfCollector.fProcessingInputQueue.offer(new EventData(null, null)); diff --git a/src/test/java/org/onap/dcae/controller/ConfigCBSSourceTest.java b/src/test/java/org/onap/dcae/controller/ConfigCBSSourceTest.java index 5c8593e..b34d393 100644 --- a/src/test/java/org/onap/dcae/controller/ConfigCBSSourceTest.java +++ b/src/test/java/org/onap/dcae/controller/ConfigCBSSourceTest.java @@ -38,7 +38,7 @@ public class ConfigCBSSourceTest extends WiremockBasedTest { @Test public void shouldReturnValidAppConfiguration() { // given - String sampleConfigForVES = "{\"collector.rcc.service.port\": 8686}"; + String sampleConfigForVES = "{\"collector.rcc.service.port\": 8080}"; stubConsulToReturnLocalAddressOfCBS(); stubCBSToReturnAppConfig(sampleConfigForVES); diff --git a/src/test/java/org/onap/dcae/controller/ConfigLoaderIntegrationE2ETest.java b/src/test/java/org/onap/dcae/controller/ConfigLoaderIntegrationE2ETest.java index 730a98d..bcba7fb 100644 --- a/src/test/java/org/onap/dcae/controller/ConfigLoaderIntegrationE2ETest.java +++ b/src/test/java/org/onap/dcae/controller/ConfigLoaderIntegrationE2ETest.java @@ -23,11 +23,7 @@ package org.onap.dcae.controller; import static io.vavr.API.Map; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.onap.dcae.TestingUtilities.createTemporaryFile; import static org.onap.dcae.TestingUtilities.readFile; import static org.onap.dcae.TestingUtilities.readJSONFromFile; @@ -51,24 +47,26 @@ public class ConfigLoaderIntegrationE2ETest extends WiremockBasedTest { @Test public void testSuccessfulE2EFlow() { - // given +// // given // Path dMaaPConfigFile = createTemporaryFile("{}"); // Path collectorPropertiesFile = createTemporaryFile(""); // Path dMaaPConfigSource = Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json"); // JSONObject dMaaPConf = readJSONFromFile(dMaaPConfigSource); // stubConsulToReturnLocalAddressOfCBS(); -// stubCBSToReturnAppConfig(f("{\"collector.port\": 8686, \"streams_publishes\": %s}}", dMaaPConf)); +// stubCBSToReturnAppConfig(f("{\"collector.port\": 8080, \"streams_publishes\": %s}}", dMaaPConf)); // // EventPublisher eventPublisherMock = mock(EventPublisher.class); // -// Mockito.mock(RestConfCollector.class); +// RestConfCollector rs = Mockito.mock(RestConfCollector.class); +// +// // ConfigFilesFacade configFilesFacade = new ConfigFilesFacade(dMaaPConfigFile, collectorPropertiesFile); // ConfigLoader configLoader = new ConfigLoader(eventPublisherMock::reconfigure, configFilesFacade, ConfigSource::getAppConfig, () -> wiremockBasedEnvProps()); // configLoader.updateConfig(); // // // then // assertThat(readJSONFromFile(dMaaPConfigSource).toString()).isEqualTo(dMaaPConf.toString()); -// assertThat(readFile(collectorPropertiesFile).trim()).isEqualTo("collector.port = 8686"); +// assertThat(readFile(collectorPropertiesFile).trim()).isEqualTo("collector.port = 8080"); // verify(eventPublisherMock, times(1)).reconfigure( // DMaaPConfigurationParser.parseToDomainMapping(dMaaPConf).get() // ); @@ -81,18 +79,24 @@ public class ConfigLoaderIntegrationE2ETest extends WiremockBasedTest { Path collectorPropertiesFile = createTemporaryFile(""); JSONObject dMaaPConf = readJSONFromFile(Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json")); stubConsulToReturnLocalAddressOfCBS(); - stubCBSToReturnAppConfig(f("{\"collector.port\": 8686, \"streams_publishes\": %s}}", dMaaPConf)); + stubCBSToReturnAppConfig(f("{\"collector.port\": 8080, \"streams_publishes\": %s}}", dMaaPConf)); EventPublisher eventPublisherMock = mock(EventPublisher.class); ConfigFilesFacade configFilesFacade = new ConfigFilesFacade(dMaaPConfigFile, collectorPropertiesFile); - configFilesFacade.writeProperties(Map("collector.port", "8686")); + configFilesFacade.writeProperties(Map("collector.port", "8080")); configFilesFacade.writeDMaaPConfiguration(dMaaPConf); - // when ConfigLoader configLoader = new ConfigLoader(eventPublisherMock::reconfigure, configFilesFacade, ConfigSource::getAppConfig, () -> wiremockBasedEnvProps()); configLoader.updateConfig(); - // then + verifyZeroInteractions(eventPublisherMock); + + // when + JSONObject dMaaPConf2 = readJSONFromFile(Paths.get("src/test/resources/testParseDMaaPCredentialsGen2Temp.json")); + configFilesFacade.writeDMaaPConfiguration(dMaaPConf2); + configFilesFacade.writeProperties(Map("collector.port", "8081")); + configLoader.updateConfig(); + } }
\ No newline at end of file |