diff options
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms')
17 files changed, 1434 insertions, 0 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/Application.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/Application.java new file mode 100644 index 00000000..f522e008 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/Application.java @@ -0,0 +1,139 @@ + +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.reflect.TypeToken; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.lang.reflect.Type; +import java.time.Duration; +import java.util.Map; + +import javax.sql.DataSource; + +import org.onap.slice.analysis.ms.beans.ConfigPolicy; +import org.onap.slice.analysis.ms.beans.Configuration; +import org.onap.slice.analysis.ms.controller.ConfigFetchFromCbs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.jdbc.DataSourceBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * + * Entry point for the slice analysis service application + * + */ +@EnableScheduling +@SpringBootApplication +public class Application { + + private static Logger log = LoggerFactory.getLogger(Application.class); + + /** + * Main method where initial configuration and context is set + * @param args + */ + public static void main(String[] args) { + getConfig(); + log.info("Starting spring boot application"); + SpringApplication.run(Application.class, args); + } + + private static void getConfig() { + + Boolean standalone = Boolean.parseBoolean(System.getenv("STANDALONE")); + + if (standalone) { + log.info("Running in standalone mode"); + + String configFile = System.getenv("CONFIG_FILE"); + String configAllJson = readFromFile(configFile); + + JsonObject configAll = new Gson().fromJson(configAllJson, JsonObject.class); + + JsonObject config = configAll.getAsJsonObject("config"); + + Configuration.getInstance().updateConfigurationFromJsonObject(config); + + ConfigPolicy configPolicy = ConfigPolicy.getInstance(); + Type mapType = new TypeToken<Map<String, Object>>() { + }.getType(); + if (configAll.getAsJsonObject("policies") != null) { + JsonObject policyJson = configAll.getAsJsonObject("policies").getAsJsonArray("items").get(0) + .getAsJsonObject().getAsJsonObject("config"); + Map<String, Object> policy = new Gson().fromJson(policyJson, mapType); + configPolicy.setConfig(policy); + log.info("Config policy {}", configPolicy); + } + return; + } + + ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs(Duration.ofSeconds(60)); + Thread configFetchThread = new Thread(configFetchFromCbs); + configFetchThread.start(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + log.debug("InterruptedException : {}", e); + Thread.currentThread().interrupt(); + } + log.info("after 10s sleep"); + } + + /** + * DataSource bean. + */ + @Bean + public DataSource dataSource() { + Configuration configuration = Configuration.getInstance(); + + String url = "jdbc:postgresql://" + configuration.getPgHost() + ":" + configuration.getPgPort() + "/sliceanalysisms"; + + return DataSourceBuilder.create().url(url).username(configuration.getPgUsername()) + .password(configuration.getPgPassword()).build(); + } + + private static String readFromFile(String file) { + String content = ""; + try (BufferedReader bufferedReader = new BufferedReader(new FileReader(file))) { + content = bufferedReader.readLine(); + String temp; + while ((temp = bufferedReader.readLine()) != null) { + content = content.concat(temp); + } + content = content.trim(); + } catch (Exception e) { + content = null; + } + return content; + } + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/beans/ConfigPolicy.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/beans/ConfigPolicy.java new file mode 100644 index 00000000..d4cd53b8 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/beans/ConfigPolicy.java @@ -0,0 +1,72 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.beans; + +import java.util.Map; + +/** + * + * Model class for configuration policy + * + */ + +public class ConfigPolicy { + + private static ConfigPolicy instance = null; + private Map<String, Object> config; + + protected ConfigPolicy() { + + } + + /** + * Get instance of class. + */ + public static ConfigPolicy getInstance() { + if (instance == null) { + instance = new ConfigPolicy(); + } + return instance; + } + + /** + * Get config param of ConfigPolicy + */ + public Map<String, Object> getConfig() { + return config; + } + + /** + * set config param of ConfigPolicy + */ + public void setConfig(Map<String, Object> config) { + this.config = config; + } + + /** + * Return ConfigPolicy instance as String + */ + @Override + public String toString() { + return "ConfigPolicy [config=" + config + "]"; + } +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/beans/Configuration.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/beans/Configuration.java new file mode 100644 index 00000000..c7072343 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/beans/Configuration.java @@ -0,0 +1,242 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.beans; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.reflect.TypeToken; + +import java.lang.reflect.Type; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Model class for the application Configuration + */ +public class Configuration { + private static Logger log = LoggerFactory.getLogger(Configuration.class); + + private static Configuration instance = null; + private String pgHost; + private int pgPort; + private String pgUsername; + private String pgPassword; + private List<String> dmaapServers; + private String configDbService; + private String cg; + private String cid; + private int pollingInterval; + private int pollingTimeout; + private String aafUsername; + private String aafPassword; + private Map<String, Object> streamsSubscribes; + private Map<String, Object> streamsPublishes; + + /** + * Check if topic is secure. + */ + public boolean isSecured() { + return (aafUsername != null); + + } + + public String getAafUsername() { + return aafUsername; + } + + public void setAafUsername(String aafUsername) { + this.aafUsername = aafUsername; + } + + public String getAafPassword() { + return aafPassword; + } + + public void setAafPassword(String aafPassword) { + this.aafPassword = aafPassword; + } + + public Map<String, Object> getStreamsSubscribes() { + return streamsSubscribes; + } + + public void setStreamsSubscribes(Map<String, Object> streamsSubscribes) { + this.streamsSubscribes = streamsSubscribes; + } + + public Map<String, Object> getStreamsPublishes() { + return streamsPublishes; + } + + public void setStreamsPublishes(Map<String, Object> streamsPublishes) { + this.streamsPublishes = streamsPublishes; + } + + protected Configuration() { + + } + + /** + * Get instance of class. + */ + public static Configuration getInstance() { + if (instance == null) { + instance = new Configuration(); + } + return instance; + } + + public String getCg() { + return cg; + } + + public void setCg(String cg) { + this.cg = cg; + } + + public String getCid() { + return cid; + } + + public void setCid(String cid) { + this.cid = cid; + } + + public int getPollingInterval() { + return pollingInterval; + } + + public void setPollingInterval(int pollingInterval) { + this.pollingInterval = pollingInterval; + } + + public int getPollingTimeout() { + return pollingTimeout; + } + + public void setPollingTimeout(int pollingTimeout) { + this.pollingTimeout = pollingTimeout; + } + + public String getPgHost() { + return pgHost; + } + + public void setPgHost(String pgHost) { + this.pgHost = pgHost; + } + + public int getPgPort() { + return pgPort; + } + + public void setPgPort(int pgPort) { + this.pgPort = pgPort; + } + + public String getPgUsername() { + return pgUsername; + } + + public void setPgUsername(String pgUsername) { + this.pgUsername = pgUsername; + } + + public String getPgPassword() { + return pgPassword; + } + + public void setPgPassword(String pgPassword) { + this.pgPassword = pgPassword; + } + + public List<String> getDmaapServers() { + return dmaapServers; + } + + public void setDmaapServers(List<String> dmaapServers) { + this.dmaapServers = dmaapServers; + } + + public String getConfigDbService() { + return configDbService; + } + + public void setConfigDbService(String configDbService) { + this.configDbService = configDbService; + } + + + + @Override + public String toString() { + return "Configuration [pgHost=" + pgHost + ", pgPort=" + pgPort + ", pgUsername=" + pgUsername + ", pgPassword=" + + pgPassword + ", dmaapServers=" + dmaapServers + ", configDbService=" + configDbService + ", cg=" + cg + + ", cid=" + cid + ", pollingInterval=" + pollingInterval + ", pollingTimeout=" + pollingTimeout + + ", aafUsername=" + aafUsername + ", aafPassword=" + aafPassword + ", streamsSubscribes=" + + streamsSubscribes + ", streamsPublishes=" + streamsPublishes + "]"; + } + + /** + * updates application configuration. + */ + public void updateConfigurationFromJsonObject(JsonObject jsonObject) { + + log.info("Updating configuration from CBS"); + + Type mapType = new TypeToken<Map<String, Object>>() { + }.getType(); + + JsonObject subscribes = jsonObject.getAsJsonObject("streams_subscribes"); + streamsSubscribes = new Gson().fromJson(subscribes, mapType); + + JsonObject publishes = jsonObject.getAsJsonObject("streams_publishes"); + streamsPublishes = new Gson().fromJson(publishes, mapType); + + pgPort = jsonObject.get("postgres.port").getAsInt(); + pollingInterval = jsonObject.get("sliceanalysisms.pollingInterval").getAsInt(); + pgPassword = jsonObject.get("postgres.password").getAsString(); + pgUsername = jsonObject.get("postgres.username").getAsString(); + pgHost = jsonObject.get("postgres.host").getAsString(); + + JsonArray servers = jsonObject.getAsJsonArray("sliceanalysisms.dmaap.server"); + Type listType = new TypeToken<List<String>>() { + }.getType(); + dmaapServers = new Gson().fromJson(servers, listType); + + cg = jsonObject.get("sliceanalysisms.cg").getAsString(); + cid = jsonObject.get("sliceanalysisms.cid").getAsString(); + configDbService = jsonObject.get("sliceanalysisms.configDb.service").getAsString(); + + pollingTimeout = jsonObject.get("sliceanalysisms.pollingInterval").getAsInt(); + + log.info("configuration from CBS {}", this); + + } + + + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/controller/ConfigFetchFromCbs.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/controller/ConfigFetchFromCbs.java new file mode 100644 index 00000000..e8e4e11b --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/controller/ConfigFetchFromCbs.java @@ -0,0 +1,127 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.controller; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.reflect.TypeToken; + +import java.lang.reflect.Type; +import java.time.Duration; +import java.util.Map; + +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.onap.slice.analysis.ms.beans.ConfigPolicy; +import org.onap.slice.analysis.ms.beans.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.Disposable; + +/** + * This class provides method to fetch application Configuration + * from CBS + */ +public class ConfigFetchFromCbs implements Runnable { + + private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class); + + private Duration interval; + + public ConfigFetchFromCbs() { + + } + + public ConfigFetchFromCbs(Duration interval) { + this.interval = interval; + } + + /** + * Gets app config from CBS. + */ + private Disposable getAppConfig() { + + // Generate RequestID and InvocationID which will be used when logging and in + // HTTP requests + log.info("getAppconfig start .."); + RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + // Read necessary properties from the environment + final EnvProperties env = EnvProperties.fromEnvironment(); + log.debug("environments {}", env); + ConfigPolicy configPolicy = ConfigPolicy.getInstance(); + + // Polling properties + final Duration initialDelay = Duration.ofSeconds(5); + final Duration period = interval; + + // Create the client and use it to get the configuration + final CbsRequest request = CbsRequests.getAll(diagnosticContext); + return CbsClientFactory.createCbsClient(env) + .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> { + log.info("configuration and policy from CBS {}", jsonObject); + JsonObject config = jsonObject.getAsJsonObject("config"); + Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt()); + if (!newPeriod.equals(period)) { + interval = newPeriod; + synchronized (this) { + this.notifyAll(); + } + + } + Configuration.getInstance().updateConfigurationFromJsonObject(config); + + Type mapType = new TypeToken<Map<String, Object>>() { + }.getType(); + if (jsonObject.getAsJsonObject("policies") != null) { + JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0) + .getAsJsonObject().getAsJsonObject("config"); + Map<String, Object> policy = new Gson().fromJson(policyJson, mapType); + configPolicy.setConfig(policy); + log.info("Config policy {}", configPolicy); + } + }, throwable -> log.warn("Ooops", throwable)); + } + + + + @Override + public void run() { + Boolean done = false; + while (!done) { + try { + Disposable disp = getAppConfig(); + synchronized (this) { + this.wait(); + } + log.info("Polling interval changed"); + disp.dispose(); + } catch (Exception e) { + done = true; + } + } + } + +}
\ No newline at end of file diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/controller/HealthCheck.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/controller/HealthCheck.java new file mode 100644 index 00000000..854bdf4f --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/controller/HealthCheck.java @@ -0,0 +1,40 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.controller; + +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +/** + * This Controller provides the slice-analysis-ms + * application's health + */ +@RestController +public class HealthCheck { + @RequestMapping(value = "/healthcheck", method = RequestMethod.GET) + public ResponseEntity<HttpStatus> healthCheck() { + return new ResponseEntity<>(HttpStatus.OK); + } +}
\ No newline at end of file diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/data/beans/PerformanceNotifications.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/data/beans/PerformanceNotifications.java new file mode 100644 index 00000000..2fa06d9f --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/data/beans/PerformanceNotifications.java @@ -0,0 +1,80 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ +package org.onap.slice.analysis.ms.data.beans; + +import java.sql.Timestamp; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; + +import org.hibernate.annotations.CreationTimestamp; + +/** + * Entity for PERFORMANCE_NOTIFICATIONS table + */ +@Entity +@Table(name = "PERFORMANCE_NOTIFICATIONS") +public class PerformanceNotifications { + + @Id + @Column(name = "notification", columnDefinition = "text") + private String notification; + + @CreationTimestamp + @Column(name = "created_at", columnDefinition = "timestamp") + private Timestamp createdAt; + + /** + * default constructor + */ + public PerformanceNotifications() { + + } + + /** + * Constructs PerformanceNotifications instance + * @param notification + * @param createdAt + */ + public PerformanceNotifications(String notification, Timestamp createdAt) { + this.notification = notification; + this.createdAt = createdAt; + } + + public String getNotification() { + return notification; + } + + public void setNotification(String notification) { + this.notification = notification; + } + + public Timestamp getCreatedAt() { + return createdAt; + } + + public void setCreatedAt(Timestamp createdAt) { + this.createdAt = createdAt; + } + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/data/repository/PerformanceNotificationsRepository.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/data/repository/PerformanceNotificationsRepository.java new file mode 100644 index 00000000..55338c9e --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/data/repository/PerformanceNotificationsRepository.java @@ -0,0 +1,45 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.data.repository; + +import org.onap.slice.analysis.ms.data.beans.PerformanceNotifications; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.CrudRepository; +import org.springframework.stereotype.Repository; + +/** + * This interface provides methods to perform CRUD operations + * on performance_notifications table + */ +@Repository +public interface PerformanceNotificationsRepository extends CrudRepository<PerformanceNotifications, String> { + + /** + * read performanceNotifications from db + * @return + */ + @Query(nativeQuery = true, value = "DELETE FROM performance_notifications " + + "WHERE notification = ( SELECT notification FROM performance_notifications ORDER BY " + + "created_at FOR UPDATE SKIP LOCKED LIMIT 1 ) RETURNING notification;") + public String getPerformanceNotificationFromQueue(); + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java new file mode 100644 index 00000000..6e0ea401 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java @@ -0,0 +1,112 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.dmaap; + +import com.att.nsa.cambria.client.CambriaConsumer; + +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + +import org.onap.slice.analysis.ms.beans.Configuration; +import org.onap.slice.analysis.ms.utils.DmaapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * This class initializes and starts the dmaap client + * to listen on application required dmaap events + */ +@Component +public class DmaapClient { + + private Configuration configuration; + private static Logger log = LoggerFactory.getLogger(DmaapClient.class); + + private DmaapUtils dmaapUtils; + + /** + * init dmaap client. + */ + @PostConstruct + public void initClient() { + log.debug("initializing client"); + dmaapUtils = new DmaapUtils(); + configuration = Configuration.getInstance(); + if (log.isDebugEnabled()) { + log.debug(configuration.toString()); + } + + startClient(); + } + + /** + * start dmaap client. + */ + @SuppressWarnings("unchecked") + public synchronized void startClient() { + + Map<String, Object> streamSubscribes = Configuration.getInstance().getStreamsSubscribes(); + + String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes + .get("performance_management_topic")).get("dmaap_info")).get("topic_url"); + String[] pmTopicSplit = pmTopicUrl.split("\\/"); + String pmTopic = pmTopicSplit[pmTopicSplit.length - 1]; + log.debug("pm topic : {}", pmTopic); + String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes + .get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url"); + String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/"); + String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1]; + log.debug("policyResponse Topic : {}", policyResponseTopic); + CambriaConsumer pmNotifCambriaConsumer = null; + CambriaConsumer policyResponseCambriaConsumer = null; + + pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); + policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic); + + ScheduledExecutorService executorPool; + + // create notification consumers for PM + NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer, + new PmNotificationCallback()); + // start pm notification consumer threads + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(), + TimeUnit.SECONDS); + + // create notification consumers for Policy + NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer, + new PolicyNotificationCallback()); + // start policy notification consumer threads + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(), + TimeUnit.SECONDS); + + + + } + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NewPmNotification.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NewPmNotification.java new file mode 100644 index 00000000..5c1f496b --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NewPmNotification.java @@ -0,0 +1,65 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.dmaap; + +import javax.annotation.PostConstruct; + +import org.springframework.stereotype.Component; + +/** + * This class indicates whether new pm notification + * is set for the slice-analysis-ms + */ +@Component +public class NewPmNotification { + + private Boolean newNotif; + + /** + * Initialize new pm Notification flag + */ + @PostConstruct + public void init() { + newNotif = false; + } + + public Boolean getNewNotif() { + return newNotif; + } + + public void setNewNotif(Boolean newNotif) { + this.newNotif = newNotif; + } + + public NewPmNotification(Boolean newNotif) { + super(); + this.newNotif = newNotif; + } + + /** + * Default constructor + */ + public NewPmNotification() { + + } + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationCallback.java new file mode 100644 index 00000000..427b4048 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationCallback.java @@ -0,0 +1,28 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.dmaap; + +public abstract class NotificationCallback { + + public abstract void activateCallBack(String msg); + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java new file mode 100644 index 00000000..b605264c --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java @@ -0,0 +1,63 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.dmaap; + +import com.att.nsa.cambria.client.CambriaConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Consume Notifications from DMAAP events + */ +public class NotificationConsumer implements Runnable { + + private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class); + private CambriaConsumer cambriaConsumer; + private NotificationCallback notificationCallback; + + /** + * Parameterized Constructor. + */ + public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) { + super(); + this.cambriaConsumer = cambriaConsumer; + this.notificationCallback = notificationCallback; + } + + /** + * starts fetching msgs from dmaap events + */ + @Override + public void run() { + try { + Iterable<String> msgs = cambriaConsumer.fetch(); + for (String msg : msgs) { + log.debug(msg); + notificationCallback.activateCallBack(msg); + } + } catch (Exception e) { + log.debug("exception when fetching msgs from dmaap", e); + } + + } +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java new file mode 100644 index 00000000..03e1c238 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java @@ -0,0 +1,53 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.dmaap; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; + +import java.io.IOException; + +/** + * Produces Notification on DMAAP events + */ +public class NotificationProducer { + + private CambriaBatchingPublisher cambriaBatchingPublisher; + + + /** + * Parameterized constructor. + */ + public NotificationProducer(CambriaBatchingPublisher cambriaBatchingPublisher) { + super(); + this.cambriaBatchingPublisher = cambriaBatchingPublisher; + } + + /** + * sends notification to dmaap. + */ + public int sendNotification(String msg) throws IOException { + + return cambriaBatchingPublisher.send("", msg); + + } + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PmNotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PmNotificationCallback.java new file mode 100644 index 00000000..17e50aca --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PmNotificationCallback.java @@ -0,0 +1,58 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.dmaap; + +import org.onap.slice.analysis.ms.data.beans.PerformanceNotifications; +import org.onap.slice.analysis.ms.data.repository.PerformanceNotificationsRepository; +import org.onap.slice.analysis.ms.utils.BeanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles Notification on dmaap for Performance events + */ +public class PmNotificationCallback extends NotificationCallback { + + private static Logger log = LoggerFactory.getLogger(PmNotificationCallback.class); + + @Override + public void activateCallBack(String msg) { + handleNotification(msg); + } + + /** + * Parse Performance dmaap notification and save to DB + * @param msg + */ + private void handleNotification(String msg) { + + PerformanceNotificationsRepository performanceNotificationsRepository = BeanUtil + .getBean(PerformanceNotificationsRepository.class); + PerformanceNotifications performanceNotification = new PerformanceNotifications(); + performanceNotification.setNotification(msg); + log.info("Performance notification {}", performanceNotification); + NewPmNotification newNotification = BeanUtil.getBean(NewPmNotification.class); + performanceNotificationsRepository.save(performanceNotification); + newNotification.setNewNotif(true); + } + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java new file mode 100644 index 00000000..81ca9ef1 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java @@ -0,0 +1,68 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.dmaap; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import java.io.IOException; +import java.util.Map; + +import org.onap.slice.analysis.ms.beans.Configuration; +import org.onap.slice.analysis.ms.utils.DmaapUtils; + +/** + * Client class to handle Policy interactions + */ +public class PolicyDmaapClient { + + private DmaapUtils dmaapUtils; + + private Configuration configuration; + + public PolicyDmaapClient(DmaapUtils dmaapUtils, Configuration configuration) { + this.dmaapUtils = dmaapUtils; + this.configuration = configuration; + } + + /** + * Method stub for sending notification to policy. + */ + @SuppressWarnings("unchecked") + public boolean sendNotificationToPolicy(String msg) { + Map<String, Object> streamsPublishes = configuration.getStreamsPublishes(); + String policyTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamsPublishes.get("CL_topic")) + .get("dmaap_info")).get("topic_url"); + String[] policyTopicSplit = policyTopicUrl.split("\\/"); + String policyTopic = policyTopicSplit[policyTopicSplit.length - 1]; + CambriaBatchingPublisher cambriaBatchingPublisher; + try { + + cambriaBatchingPublisher = dmaapUtils.buildPublisher(configuration, policyTopic); + + NotificationProducer notificationProducer = new NotificationProducer(cambriaBatchingPublisher); + notificationProducer.sendNotification(msg); + } catch (IOException e) { + return false; + } + return true; + } + +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyNotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyNotificationCallback.java new file mode 100644 index 00000000..57aadd18 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyNotificationCallback.java @@ -0,0 +1,49 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.dmaap; + +import org.slf4j.Logger; + +/** + * Handles Notification on dmaap for Policy events + */ +public class PolicyNotificationCallback extends NotificationCallback { + + private static final Logger log = org.slf4j.LoggerFactory.getLogger(PolicyNotificationCallback.class); + + /** + * Trigger on Notification from policy component + */ + @Override + public void activateCallBack(String msg) { + handlePolicyNotification(msg); + } + + /** + * Parse and take actions on reception of Notification from Policy + * @param msg + */ + private void handlePolicyNotification(String msg) { + log.info("Message received from policy: " +msg); + //TBD - actions to perform on reception of notification from policy + } +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/BeanUtil.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/BeanUtil.java new file mode 100644 index 00000000..5f4006c9 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/BeanUtil.java @@ -0,0 +1,52 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.utils; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Service; + +/** + * This class is used to get/set bean references + */ +@Service +public class BeanUtil implements ApplicationContextAware { + private static ApplicationContext context; + + /** + * set ApplicationContext + */ + @Override + public void setApplicationContext(ApplicationContext applicationContext) { + context = applicationContext; + } + + /** + * Get bean class + * @param <T> + * @param beanClass + * @return + */ + public static <T> T getBean(Class<T> beanClass) { + return context.getBean(beanClass); + } +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java new file mode 100644 index 00000000..0952f754 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java @@ -0,0 +1,141 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2020 Wipro Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ + +package org.onap.slice.analysis.ms.utils; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClient; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; +import com.att.nsa.cambria.client.CambriaTopicManager; + +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; + +import org.onap.slice.analysis.ms.beans.Configuration; + +/** + * Utility class to perform actions related to Dmaap + */ +public class DmaapUtils { + + /** + * Build publisher. + */ + public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) { + try { + return builder(config, topic).build(); + } catch (MalformedURLException | GeneralSecurityException e) { + return null; + + } + } + + /** + * Build consumer. + */ + public CambriaConsumer buildConsumer(Configuration config, String topic) { + + try { + return builderConsumer(config, topic).build(); + } catch (MalformedURLException | GeneralSecurityException e) { + return null; + } + + } + + private static PublisherBuilder builder(Configuration config, String topic) { + if (config.isSecured()) { + return authenticatedBuilder(config, topic); + } else { + return unAuthenticatedBuilder(config, topic); + } + } + + private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) { + return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(), + config.getAafPassword()); + } + + private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) { + return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic) + .logSendFailuresAfter(5); + } + + private static ConsumerBuilder builderConsumer(Configuration config, String topic) { + if (config.isSecured()) { + return authenticatedConsumerBuilder(config, topic); + } else { + return unAuthenticatedConsumerBuilder(config, topic); + } + } + + private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) { + return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic) + .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000); + } + + private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) { + return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(), + config.getAafPassword()); + } + + /** + * Build cambriaClient. + */ + public CambriaTopicManager cambriaCLientBuilder(Configuration configuration) { + if (configuration.isSecured()) { + return authenticatedCambriaCLientBuilder(configuration); + } else { + return unAuthenticatedCambriaCLientBuilder(configuration); + + } + } + + private static CambriaTopicManager authenticatedCambriaCLientBuilder(Configuration config) { + try { + return buildCambriaClient(new TopicManagerBuilder().usingHosts(config.getDmaapServers()) + .authenticatedByHttp(config.getAafUsername(), config.getAafPassword())); + } catch (MalformedURLException | GeneralSecurityException e) { + return null; + } + } + + private static CambriaTopicManager unAuthenticatedCambriaCLientBuilder(Configuration config) { + try { + return buildCambriaClient(new TopicManagerBuilder().usingHosts(config.getDmaapServers())); + } catch (MalformedURLException | GeneralSecurityException e) { + return null; + + } + } + + @SuppressWarnings("unchecked") + private static <T extends CambriaClient> T buildCambriaClient( + CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) + throws MalformedURLException, GeneralSecurityException { + return (T) client.build(); + } + +} |