summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/onap/dcae/ApplicationSettings.java2
-rw-r--r--src/main/java/org/onap/dcae/RestConfCollector.java35
-rw-r--r--src/main/java/org/onap/dcae/common/ControllerActivationState.java25
-rw-r--r--src/main/java/org/onap/dcae/common/EventProcessor.java4
-rw-r--r--src/main/java/org/onap/dcae/controller/AccessController.java80
-rw-r--r--src/main/java/org/onap/dcae/controller/PersistentEventConnection.java2
-rw-r--r--src/test/java/org/onap/dcae/AccessControllerTest.java3
-rw-r--r--src/test/java/org/onap/dcae/ApplicationSettingsTest.java4
-rw-r--r--src/test/java/org/onap/dcae/common/EventProcessorTest.java2
-rw-r--r--src/test/java/org/onap/dcae/controller/ConfigCBSSourceTest.java2
-rw-r--r--src/test/java/org/onap/dcae/controller/ConfigLoaderIntegrationE2ETest.java30
11 files changed, 137 insertions, 52 deletions
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java
index 45f01ce..8bf32c4 100644
--- a/src/main/java/org/onap/dcae/ApplicationSettings.java
+++ b/src/main/java/org/onap/dcae/ApplicationSettings.java
@@ -108,7 +108,7 @@ public class ApplicationSettings {
}
public int httpPort() {
- return properties.getInt("collector.rcc.service.port", 8686);
+ return properties.getInt("collector.rcc.service.port", 8080);
}
public int httpsPort() {
diff --git a/src/main/java/org/onap/dcae/RestConfCollector.java b/src/main/java/org/onap/dcae/RestConfCollector.java
index 27fccd6..05c299e 100644
--- a/src/main/java/org/onap/dcae/RestConfCollector.java
+++ b/src/main/java/org/onap/dcae/RestConfCollector.java
@@ -24,6 +24,7 @@ package org.onap.dcae;
import io.vavr.collection.Map;
import org.json.JSONArray;
import org.json.JSONObject;
+import org.onap.dcae.common.ControllerActivationState;
import org.onap.dcae.common.EventData;
import org.onap.dcae.common.EventProcessor;
import org.onap.dcae.common.publishing.DMaaPConfigurationParser;
@@ -43,6 +44,7 @@ import org.springframework.context.annotation.Lazy;
import java.nio.file.Paths;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.concurrent.*;
@SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class})
@@ -56,8 +58,10 @@ public class RestConfCollector {
private static ConfigLoader configLoader;
private static SpringApplication app;
private static ScheduledFuture<?> scheduleFeatures;
+ private static ScheduledFuture<?> scheduleCtrlActivation;
private static ExecutorService executor;
private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+ private static ScheduledThreadPoolExecutor scheduledThExController;
private static EventPublisher eventPublisher;
private static EventProcessor eventProcessor;
@@ -70,6 +74,7 @@ public class RestConfCollector {
app = new SpringApplication(RestConfCollector.class);
properties = new ApplicationSettings(args, CLIUtils::processCmdLine);
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
+ scheduledThExController = new ScheduledThreadPoolExecutor(1);
init();
app.setAddCommandLineProperties(true);
context = app.run();
@@ -85,6 +90,7 @@ public class RestConfCollector {
context.close();
properties.reloadProperties();
scheduleFeatures.cancel(true);
+ scheduleCtrlActivation.cancel(true);
init();
controllerConfig(properties);
context = SpringApplication.run(RestConfCollector.class);
@@ -174,6 +180,11 @@ public class RestConfCollector {
10,
10,
TimeUnit.MINUTES);
+ ControllerActivationTask task = new ControllerActivationTask();
+ scheduleCtrlActivation = scheduledThExController.scheduleAtFixedRate(task,
+ 10,
+ 10,
+ TimeUnit.SECONDS);
}
private static void createExecutors() {
@@ -186,4 +197,28 @@ public class RestConfCollector {
executor.execute(eventProcessor);
}
}
+
+ private static class ControllerActivationTask implements Runnable
+ {
+ public ControllerActivationTask() {
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+ Iterator<String> it1 = controllerStore.keySet().iterator();
+ while(it1.hasNext()){
+ String key = it1.next();
+ AccessController ctlr = controllerStore.get(key);
+ if(ctlr.getState() == ControllerActivationState.INIT) {
+ log.info("Activating controller " + key);
+ ctlr.activate();
+ }
+ }
+ } catch (Exception e) {
+ log.info("Activation failed");
+ }
+ }
+ }
}
diff --git a/src/main/java/org/onap/dcae/common/ControllerActivationState.java b/src/main/java/org/onap/dcae/common/ControllerActivationState.java
new file mode 100644
index 0000000..deb2b1f
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/ControllerActivationState.java
@@ -0,0 +1,25 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+public enum ControllerActivationState {
+ INIT, ACTIVE, NONE;
+}
diff --git a/src/main/java/org/onap/dcae/common/EventProcessor.java b/src/main/java/org/onap/dcae/common/EventProcessor.java
index 164aed3..cf44921 100644
--- a/src/main/java/org/onap/dcae/common/EventProcessor.java
+++ b/src/main/java/org/onap/dcae/common/EventProcessor.java
@@ -76,9 +76,9 @@ public class EventProcessor implements Runnable {
private void sendEventsToStreams(String[] streamIdList, EventData ev) {
for (String aStreamIdList : streamIdList) {
log.info("Invoking publisher for streamId:" + aStreamIdList);
- if (!ev.getConn().getEvent_ruleId().equals("")) {
+ if (!ev.getConn().getEventRuleId().equals("")) {
JSONObject customHeader = new JSONObject();
- customHeader.put("rule-id", ev.getConn().getEvent_ruleId());
+ customHeader.put("rule-id", ev.getConn().getEventRuleId());
eventPublisher.sendEvent(overrideEvent(customHeader, ev.getEventObj()), aStreamIdList);
} else {
eventPublisher.sendEvent(ev.getEventObj(), aStreamIdList);
diff --git a/src/main/java/org/onap/dcae/controller/AccessController.java b/src/main/java/org/onap/dcae/controller/AccessController.java
index 165c468..226ead5 100644
--- a/src/main/java/org/onap/dcae/controller/AccessController.java
+++ b/src/main/java/org/onap/dcae/controller/AccessController.java
@@ -24,6 +24,7 @@ import org.json.JSONObject;
import org.onap.dcae.ApplicationException;
import org.onap.dcae.ApplicationSettings;
import org.onap.dcae.common.Constants;
+import org.onap.dcae.common.ControllerActivationState;
import org.onap.dcae.common.RestConfContext;
import org.onap.dcae.common.RestapiCallNode;
import org.slf4j.Logger;
@@ -56,6 +57,8 @@ public class AccessController {
private ExecutorService executor = Executors.newCachedThreadPool();
private Map<String, String> paraMap;
+ ControllerActivationState state;
+
public AccessController(JSONObject controller,
ApplicationSettings properties) {
this.cfgInfo = new ControllerConfigInfo(controller.get("controller_name").toString(),
@@ -71,7 +74,7 @@ public class AccessController {
this.ctx = new RestConfContext();
this.restApiCallNode = new RestapiCallNode();
this.paraMap = new HashMap<>();
-
+ this.state = ControllerActivationState.INIT;
prepareControllerParamMap();
log.info("AccesController Created {} {} {} {} {} {}",
@@ -85,12 +88,22 @@ public class AccessController {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof AccessController)) return false;
+ if (this == o)
+ return true;
+ if (!(o instanceof AccessController))
+ return false;
AccessController that = (AccessController) o;
return that.cfgInfo.getController_name().equals(that.cfgInfo.getController_name());
}
+ public ControllerActivationState getState() {
+ return state;
+ }
+
+ public void setState(ControllerActivationState state) {
+ this.state = state;
+ }
+
@Override
public int hashCode() {
return Objects.hash(this.cfgInfo.getController_name());
@@ -107,8 +120,7 @@ public class AccessController {
this.executor = executor;
}
- private void fetch_TokenId() {
-
+ private void fetchTokenId() {
modifyControllerParamMap(Constants.KSETTING_REST_API_URL, getUriMethod(this.properties.authorizationEnabled()) + cfgInfo.getController_restapiUrl() + cfgInfo.getController_accessTokenUrl());
modifyControllerParamMap(Constants.KDEFAULT_TEMP_FILENAME, cfgInfo.getController_accessTokenFile());
@@ -117,6 +129,7 @@ public class AccessController {
modifyControllerParamMap(Constants.KSETTING_HTTP_METHOD, cfgInfo.getController_accessTokenMethod());
String httpResponse = null;
+
try {
getRestApiCallNode().sendRequest(this.paraMap, ctx, null);
@@ -125,43 +138,46 @@ public class AccessController {
log.info("httpResponse ", httpResponse + " key " + key);
JSONObject jsonObj = new JSONObject(httpResponse);
log.info("jsonObj ", jsonObj.toString());
- //JSONObject data = jsonObj.getJSONObject("data");
- //String tokenId = data.get("accessSession").toString();
//@TODO: Make return field dynamic
String tokenId = jsonObj.get("accessSession").toString();
- log.info("token 1" + tokenId);
modifyControllerParamMap(Constants.KSETTING_TOKENID, tokenId);
modifyControllerParamMap(Constants.KSETTING_CUSTOMHTTP_HEADER, "X-ACCESS-TOKEN=" + tokenId);
+ setState(ControllerActivationState.ACTIVE);
+
} catch (Exception e) {
- log.info("Access token is not supported" + e.getMessage());
+ log.info("Access token is not supported " + e.getMessage());
log.info("http response " + httpResponse);
}
}
+
public void activate() {
- fetch_TokenId();
- printControllerParamMap();
- /* Create eventlist from properties */
- JSONArray contollers = new JSONArray(properties.rccPolicy());
- for (int i = 0; i < contollers.length(); i++) {
- JSONObject controller = contollers.getJSONObject(i);
- if (controller.get("controller_name").equals(this.getCfgInfo().getController_name())) {
- JSONArray eventlists = controller.getJSONArray("event_details");
- for (int j = 0; j < eventlists.length(); j++) {
- JSONObject event = eventlists.getJSONObject(j);
- String name = event.get("event_name").toString();
- PersistentEventConnection conn = new PersistentEventConnection(name,
- event.get("event_description").toString(),
- Boolean.parseBoolean(event.get("event_sseventUrlEmbed").toString()),
- event.get("event_sseventsField").toString(),
- event.get("event_sseventsUrl").toString(),
- event.get("event_subscriptionTemplate").toString(),
- event.get("event_unSubscriptionTemplate").toString(),
- event.get("event_ruleId").toString(),
- this);
-
- eventList.put(name, conn);
- executor.execute(conn);
+ fetchTokenId();
+
+ if (getState() == ControllerActivationState.ACTIVE) {
+ printControllerParamMap();
+ /* Create eventlist from properties */
+ JSONArray contollers = new JSONArray(properties.rccPolicy());
+ for (int i = 0; i < contollers.length(); i++) {
+ JSONObject controller = contollers.getJSONObject(i);
+ if (controller.get("controller_name").equals(this.getCfgInfo().getController_name())) {
+ JSONArray eventlists = controller.getJSONArray("event_details");
+ for (int j = 0; j < eventlists.length(); j++) {
+ JSONObject event = eventlists.getJSONObject(j);
+ String name = event.get("event_name").toString();
+ PersistentEventConnection conn = new PersistentEventConnection(name,
+ event.get("event_description").toString(),
+ Boolean.parseBoolean(event.get("event_sseventUrlEmbed").toString()),
+ event.get("event_sseventsField").toString(),
+ event.get("event_sseventsUrl").toString(),
+ event.get("event_subscriptionTemplate").toString(),
+ event.get("event_unSubscriptionTemplate").toString(),
+ event.get("event_ruleId").toString(),
+ this);
+
+ eventList.put(name, conn);
+ executor.execute(conn);
+ }
}
}
}
diff --git a/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java
index 99ad1af..047936d 100644
--- a/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java
+++ b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java
@@ -185,7 +185,7 @@ public class PersistentEventConnection implements Runnable {
return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier((s1, s2) -> true).build();
}
- public String getEvent_ruleId() {
+ public String getEventRuleId() {
return event_ruleId;
}
diff --git a/src/test/java/org/onap/dcae/AccessControllerTest.java b/src/test/java/org/onap/dcae/AccessControllerTest.java
index b93e190..5469d89 100644
--- a/src/test/java/org/onap/dcae/AccessControllerTest.java
+++ b/src/test/java/org/onap/dcae/AccessControllerTest.java
@@ -106,6 +106,9 @@ public class AccessControllerTest {
JSONObject controller = new JSONObject("{\"controller_name\":\"AccessM&C\",\"controller_restapiUrl\":\"10.118.191.43:26335\",\"controller_restapiUser\":\"access\",\"controller_restapiPassword\":\"Huawei@123\",\"controller_accessTokenUrl\":\"/rest/plat/smapp/v1/oauth/token\",\"controller_accessTokenFile\":\"./etc/access-token.json\",\"controller_accessTokenMethod\":\"put\",\"controller_subsMethod\":\"post\",\"controller_subscriptionUrl\":\"/restconf/v1/operations/huawei-nce-notification-action:establish-subscription\",\"event_details\":[{\"event_name\":\"ONT_registration\",\"event_description\":\"ONTregistartionevent\",\"event_sseventUrlEmbed\":\"true\",\"event_sseventsField\":\"output.url\",\"event_sseventsUrl\":\"null\",\"event_subscriptionTemplate\":\"./etc/ont_registartion_subscription_template.json\",\"event_unSubscriptionTemplate\":\"./etc/ont_registartion_unsubscription_template.json\",\"event_ruleId\":\"777777777\"}]}");
AccessController acClr = new AccessController(controller,
properties);
+ AccessController acClr2 = new AccessController(controller,
+ properties);
+ acClr.equals(acClr2);
acClr.setRestApiCallNode(restApiCallNode);
acClr.setExecutor(executor);
acClr.getCtx().setAttribute("responsePrefix.httpResponse","{\"accessSession\" : \"12dsaf4-2323-1231131232323\"}");
diff --git a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
index bc78a24..3a9549f 100644
--- a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
+++ b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
@@ -119,7 +119,7 @@ public class ApplicationSettingsTest {
int applicationPort = fromTemporaryConfiguration().httpPort();
// then
- assertEquals(8686, applicationPort);
+ assertEquals(8080, applicationPort);
}
@Test
@@ -135,7 +135,7 @@ public class ApplicationSettingsTest {
@Test
public void shouldReturnIfHTTPIsEnabled() throws IOException {
// when
- boolean httpsEnabled = fromTemporaryConfiguration("collector.service.port=8686").httpsEnabled();
+ boolean httpsEnabled = fromTemporaryConfiguration("collector.service.port=8080").httpsEnabled();
// then
assertTrue(httpsEnabled);
}
diff --git a/src/test/java/org/onap/dcae/common/EventProcessorTest.java b/src/test/java/org/onap/dcae/common/EventProcessorTest.java
index e23a3df..406a0fe 100644
--- a/src/test/java/org/onap/dcae/common/EventProcessorTest.java
+++ b/src/test/java/org/onap/dcae/common/EventProcessorTest.java
@@ -92,6 +92,8 @@ public class EventProcessorTest {
PersistentEventConnection p = new PersistentEventConnection("","",true, "",
"","","","1234646346", acClr);
+ p.getEventParamMapValue("restapiUrl");
+ p.modifyEventParamMap("restapiUrl", "10.118.191.43:26335");
RestConfCollector.fProcessingInputQueue = new LinkedBlockingQueue<>(4);
RestConfCollector.fProcessingInputQueue.offer(new EventData(p, new JSONObject("{}")));
RestConfCollector.fProcessingInputQueue.offer(new EventData(null, null));
diff --git a/src/test/java/org/onap/dcae/controller/ConfigCBSSourceTest.java b/src/test/java/org/onap/dcae/controller/ConfigCBSSourceTest.java
index 5c8593e..b34d393 100644
--- a/src/test/java/org/onap/dcae/controller/ConfigCBSSourceTest.java
+++ b/src/test/java/org/onap/dcae/controller/ConfigCBSSourceTest.java
@@ -38,7 +38,7 @@ public class ConfigCBSSourceTest extends WiremockBasedTest {
@Test
public void shouldReturnValidAppConfiguration() {
// given
- String sampleConfigForVES = "{\"collector.rcc.service.port\": 8686}";
+ String sampleConfigForVES = "{\"collector.rcc.service.port\": 8080}";
stubConsulToReturnLocalAddressOfCBS();
stubCBSToReturnAppConfig(sampleConfigForVES);
diff --git a/src/test/java/org/onap/dcae/controller/ConfigLoaderIntegrationE2ETest.java b/src/test/java/org/onap/dcae/controller/ConfigLoaderIntegrationE2ETest.java
index 730a98d..bcba7fb 100644
--- a/src/test/java/org/onap/dcae/controller/ConfigLoaderIntegrationE2ETest.java
+++ b/src/test/java/org/onap/dcae/controller/ConfigLoaderIntegrationE2ETest.java
@@ -23,11 +23,7 @@ package org.onap.dcae.controller;
import static io.vavr.API.Map;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import static org.onap.dcae.TestingUtilities.createTemporaryFile;
import static org.onap.dcae.TestingUtilities.readFile;
import static org.onap.dcae.TestingUtilities.readJSONFromFile;
@@ -51,24 +47,26 @@ public class ConfigLoaderIntegrationE2ETest extends WiremockBasedTest {
@Test
public void testSuccessfulE2EFlow() {
- // given
+// // given
// Path dMaaPConfigFile = createTemporaryFile("{}");
// Path collectorPropertiesFile = createTemporaryFile("");
// Path dMaaPConfigSource = Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json");
// JSONObject dMaaPConf = readJSONFromFile(dMaaPConfigSource);
// stubConsulToReturnLocalAddressOfCBS();
-// stubCBSToReturnAppConfig(f("{\"collector.port\": 8686, \"streams_publishes\": %s}}", dMaaPConf));
+// stubCBSToReturnAppConfig(f("{\"collector.port\": 8080, \"streams_publishes\": %s}}", dMaaPConf));
//
// EventPublisher eventPublisherMock = mock(EventPublisher.class);
//
-// Mockito.mock(RestConfCollector.class);
+// RestConfCollector rs = Mockito.mock(RestConfCollector.class);
+//
+//
// ConfigFilesFacade configFilesFacade = new ConfigFilesFacade(dMaaPConfigFile, collectorPropertiesFile);
// ConfigLoader configLoader = new ConfigLoader(eventPublisherMock::reconfigure, configFilesFacade, ConfigSource::getAppConfig, () -> wiremockBasedEnvProps());
// configLoader.updateConfig();
//
// // then
// assertThat(readJSONFromFile(dMaaPConfigSource).toString()).isEqualTo(dMaaPConf.toString());
-// assertThat(readFile(collectorPropertiesFile).trim()).isEqualTo("collector.port = 8686");
+// assertThat(readFile(collectorPropertiesFile).trim()).isEqualTo("collector.port = 8080");
// verify(eventPublisherMock, times(1)).reconfigure(
// DMaaPConfigurationParser.parseToDomainMapping(dMaaPConf).get()
// );
@@ -81,18 +79,24 @@ public class ConfigLoaderIntegrationE2ETest extends WiremockBasedTest {
Path collectorPropertiesFile = createTemporaryFile("");
JSONObject dMaaPConf = readJSONFromFile(Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json"));
stubConsulToReturnLocalAddressOfCBS();
- stubCBSToReturnAppConfig(f("{\"collector.port\": 8686, \"streams_publishes\": %s}}", dMaaPConf));
+ stubCBSToReturnAppConfig(f("{\"collector.port\": 8080, \"streams_publishes\": %s}}", dMaaPConf));
EventPublisher eventPublisherMock = mock(EventPublisher.class);
ConfigFilesFacade configFilesFacade = new ConfigFilesFacade(dMaaPConfigFile, collectorPropertiesFile);
- configFilesFacade.writeProperties(Map("collector.port", "8686"));
+ configFilesFacade.writeProperties(Map("collector.port", "8080"));
configFilesFacade.writeDMaaPConfiguration(dMaaPConf);
- // when
ConfigLoader configLoader = new ConfigLoader(eventPublisherMock::reconfigure, configFilesFacade, ConfigSource::getAppConfig, () -> wiremockBasedEnvProps());
configLoader.updateConfig();
-
// then
+
verifyZeroInteractions(eventPublisherMock);
+
+ // when
+ JSONObject dMaaPConf2 = readJSONFromFile(Paths.get("src/test/resources/testParseDMaaPCredentialsGen2Temp.json"));
+ configFilesFacade.writeDMaaPConfiguration(dMaaPConf2);
+ configFilesFacade.writeProperties(Map("collector.port", "8081"));
+ configLoader.updateConfig();
+
}
} \ No newline at end of file