aboutsummaryrefslogtreecommitdiffstats
path: root/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc
diff options
context:
space:
mode:
Diffstat (limited to 'appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc')
-rw-r--r--appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/AppcOam.java582
-rw-r--r--appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/OAMCommandStatus.java87
-rw-r--r--appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/Converter.java133
-rw-r--r--appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/DmaapOutgoingMessage.java134
-rw-r--r--appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/MessageAdapter.java131
-rw-r--r--appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/OAMContext.java72
6 files changed, 1139 insertions, 0 deletions
diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/AppcOam.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/AppcOam.java
new file mode 100644
index 000000000..d255f5072
--- /dev/null
+++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/AppcOam.java
@@ -0,0 +1,582 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.oam;
+
+import org.openecomp.appc.Constants;
+import org.openecomp.appc.configuration.Configuration;
+import org.openecomp.appc.configuration.ConfigurationFactory;
+import org.openecomp.appc.exceptions.APPCException;
+import org.openecomp.appc.executor.objects.Params;
+import org.openecomp.appc.i18n.Msg;
+import org.openecomp.appc.logging.LoggingConstants;
+import org.openecomp.appc.logging.LoggingUtils;
+import org.openecomp.appc.metricservice.MetricRegistry;
+import org.openecomp.appc.metricservice.MetricService;
+import org.openecomp.appc.metricservice.metric.Metric;
+import org.openecomp.appc.requesthandler.LCMStateManager;
+import org.openecomp.appc.requesthandler.RequestHandler;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.eelf.i18n.EELFResourceManager;
+import com.google.common.util.concurrent.Futures;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.*;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.common.header.CommonHeader;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.get.metrics.output.Metrics;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.get.metrics.output.MetricsBuilder;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.get.metrics.output.metrics.KpiValues;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.get.metrics.output.metrics.KpiValuesBuilder;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.status.Status;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.status.StatusBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.MDC;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.openecomp.appc.oam.messageadapter.*;
+
+
+import static com.att.eelf.configuration.Configuration.*;
+
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+
+public class AppcOam implements AutoCloseable, AppcOamService {
+
+ private Configuration configuration = ConfigurationFactory.getConfiguration();
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(AppcOam.class);
+
+ private boolean isMetricEnabled = false;
+
+
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ private volatile ScheduledFuture<?> outstandingLCMRequestMonitorSheduledFuture;
+
+
+ private MessageAdapter messageAdapter;
+
+
+ /**
+ * The ODL data store broker. Provides access to a conceptual data tree store and also provides the ability to
+ * subscribe for changes to data under a given branch of the tree.
+ */
+ private DataBroker dataBroker;
+
+ /**
+ * ODL Notification Service that provides publish/subscribe capabilities for YANG modeled notifications.
+ */
+ private NotificationProviderService notificationService;
+
+ /**
+ * Provides a registry for Remote Procedure Call (RPC) service implementations. The RPCs are defined in YANG models.
+ */
+ private RpcProviderRegistry rpcRegistry;
+
+ /**
+ * Represents our RPC implementation registration
+ */
+ private BindingAwareBroker.RpcRegistration<AppcOamService> rpcRegistration;
+
+
+ /**
+ * The yang rpc names
+ */
+ public enum RPC {
+ start,
+ stop,
+ ;
+ }
+
+
+ /**
+ * @param dataBroker
+ * @param notificationProviderService
+ * @param rpcProviderRegistry
+ */
+ @SuppressWarnings({
+ "javadoc", "nls"
+ })
+ public AppcOam(DataBroker dataBroker, NotificationProviderService notificationProviderService,
+ RpcProviderRegistry rpcProviderRegistry) {
+
+ String appName = configuration.getProperty(Constants.PROPERTY_APPLICATION_NAME);
+ logger.info(Msg.COMPONENT_INITIALIZING, appName, "oam");
+
+ this.dataBroker = dataBroker;
+ this.notificationService = notificationProviderService;
+ this.rpcRegistry = rpcProviderRegistry;
+
+ if (this.rpcRegistry != null) {
+ rpcRegistration = rpcRegistry.addRpcImplementation(AppcOamService.class, this);
+ }
+
+ Properties properties = configuration.getProperties();
+ if (properties != null && properties.getProperty("metric.enabled") != null) {
+ isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
+ }
+
+
+ messageAdapter = new MessageAdapter();
+ messageAdapter.init();
+
+
+ scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory(){
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
+ return new Thread(runnable,bundle.getSymbolicName() + " scheduledExecutor");
+ }
+ }
+ );
+
+ logger.info(Msg.COMPONENT_INITIALIZED, appName, "oam");
+ }
+
+ /**
+ * Implements the close of the service
+ *
+ * @see AutoCloseable#close()
+ */
+ @SuppressWarnings("nls")
+ @Override
+ public void close() throws Exception {
+ String appName = configuration.getProperty(Constants.PROPERTY_APPLICATION_NAME);
+ logger.info(Msg.COMPONENT_TERMINATING, appName, "oam");
+ scheduledExecutorService.shutdown();
+ if (rpcRegistration != null) {
+ rpcRegistration.close();
+ }
+ logger.info(Msg.COMPONENT_TERMINATED, appName, "oam");
+ }
+
+ @Override
+ public Future<RpcResult<GetMetricsOutput>> getMetrics() {
+
+ GetMetricsOutputBuilder outputBuilder = new GetMetricsOutputBuilder();
+
+ if (!isMetricEnabled){
+ logger.error("Metric Service not enabled returning failure");
+ RpcResult<GetMetricsOutput> result = RpcResultBuilder.<GetMetricsOutput> status(false).withError(RpcError.ErrorType.APPLICATION,"Metric Service not enabled").build();
+ return Futures.immediateFuture(result);
+ }
+
+ MetricService metricService = null;
+ try {
+ metricService = getService(MetricService.class);
+ } catch (APPCException e){
+ logger.error("MetricService not found",e);
+ RpcResult<GetMetricsOutput> result = RpcResultBuilder.<GetMetricsOutput> status(false).withError(RpcError.ErrorType.APPLICATION,"Metric Service not found").build();
+ return Futures.immediateFuture(result);
+ }
+ Map<String,MetricRegistry> allMetricRegitry = metricService.getAllRegistry();
+
+ if(allMetricRegitry == null || allMetricRegitry.isEmpty()){
+ logger.error("No metrics registered returning failure");
+ RpcResult<GetMetricsOutput> result = RpcResultBuilder.<GetMetricsOutput> status(false).withError(RpcError.ErrorType.APPLICATION,"No metrics Registered").build();
+ return Futures.immediateFuture(result);
+ }
+ List<Metrics> metricsList = new ArrayList<>();
+
+ logger.debug("Iterating metric registry list");
+ for (MetricRegistry metricRegistry : allMetricRegitry.values() ) {
+ logger.debug("Iterating metric registry :" + metricRegistry.toString());
+ Metric[] metrics = metricRegistry.metrics() ;
+ if(metrics!= null && metrics.length >0) {
+ logger.debug("Iterating though metrics in registry");
+ for (Metric metric : metrics) {
+ logger.debug("Iterating though metrics: "+ metric.name());
+ MetricsBuilder metricsBuilder = new MetricsBuilder();
+ metricsBuilder.setKpiName(metric.name());
+ metricsBuilder.setLastResetTime(metric.getLastModified());
+ List<KpiValues> kpiList = new ArrayList<>();
+ Map<String, String> metricsOutput = metric.getMetricsOutput();
+ for (Map.Entry<String, String> kpi : metricsOutput.entrySet()) {
+ KpiValuesBuilder kpiValuesBuilder = new KpiValuesBuilder();
+ kpiValuesBuilder.setName(kpi.getKey());
+ kpiValuesBuilder.setValue(kpi.getValue());
+ kpiList.add(kpiValuesBuilder.build());
+ }
+ metricsBuilder.setKpiValues(kpiList);
+ metricsList.add(metricsBuilder.build());
+ }
+ }
+ }
+ outputBuilder.setMetrics(metricsList);
+ RpcResult<GetMetricsOutput> result = RpcResultBuilder.<GetMetricsOutput> status(true).withResult(outputBuilder.build()).build();
+ return Futures.immediateFuture(result);
+ }
+
+ @Override
+ public Future<RpcResult<StopOutput>> stop(StopInput stopInput){
+ logger.debug("Input received : " + stopInput);
+ final Date startTime = new Date();
+ Status status = this.buildStatus(OAMCommandStatus.ACCEPTED);
+ final CommonHeader commonHeader = stopInput.getCommonHeader();
+
+ try {
+ setInitialLogProperties(commonHeader,RPC.stop);
+
+ //Close the gate so that no more new LCM request will be excepted.
+ LCMStateManager lcmStateManager = getService(LCMStateManager.class);
+ lcmStateManager.disableLCMOperations();
+ //Begin monitoring outstanding LCM request
+ scheduleOutstandingLCMRequestMonitor(commonHeader,startTime);
+ } catch(Throwable t) {
+ status = unexpectedOAMError(t,RPC.stop);
+ }
+ finally {
+ LoggingUtils.auditWarn(startTime.toInstant(),
+ new Date(System.currentTimeMillis()).toInstant(),
+ String.valueOf(status.getCode()),
+ status.getMessage(),
+ this.getClass().getCanonicalName(),
+ Msg.OAM_OPERATION_STOPPING,
+ getAppcName()
+ );
+ this.clearRequestLogProperties();
+ }
+
+ StopOutputBuilder stopOutputBuilder = new StopOutputBuilder();
+ stopOutputBuilder.setStatus(status);
+ stopOutputBuilder.setCommonHeader(commonHeader);
+ StopOutput stopOutput = stopOutputBuilder.build();
+ return RpcResultBuilder.success(stopOutput).buildFuture();
+ }
+
+ @Override
+ public Future<RpcResult<StartOutput>> start(StartInput startInput){
+ logger.debug("Input received : " + startInput);
+ final Date startTime = new Date();
+ Status status = this.buildStatus(OAMCommandStatus.ACCEPTED);
+ final CommonHeader commonHeader = startInput.getCommonHeader();
+
+ try {
+
+
+ setInitialLogProperties(commonHeader,RPC.start);
+
+ this.scheduleStartingAPPC(commonHeader,startTime);
+ } catch(Throwable t) {
+ status = unexpectedOAMError(t,RPC.start);
+ }
+ finally {
+ LoggingUtils.auditWarn(startTime.toInstant(),
+ new Date(System.currentTimeMillis()).toInstant(),
+ String.valueOf(status.getCode()),
+ status.getMessage(),
+ this.getClass().getCanonicalName(),
+ Msg.OAM_OPERATION_STARTING,
+ getAppcName()
+ );
+ this.clearRequestLogProperties();
+ }
+
+ StartOutputBuilder startOutputBuilder = new StartOutputBuilder();
+ startOutputBuilder.setStatus(status);
+ startOutputBuilder.setCommonHeader(commonHeader);
+ StartOutput startOutput = startOutputBuilder.build();
+ return RpcResultBuilder.success(startOutput).buildFuture();
+ }
+
+ private <T> T getService(Class<T> _class) throws APPCException {
+ BundleContext bctx = FrameworkUtil.getBundle(_class).getBundleContext();
+ ServiceReference sref = bctx.getServiceReference(_class.getName());
+ if (sref != null) {
+ if(logger.isTraceEnabled()) {
+ logger.debug("Using the BundleContext to fetched the service reference for " + _class.getName());
+
+ }
+ return (T) bctx.getService(sref);
+ } else {
+ throw new APPCException("Using the BundleContext failed to to fetch service reference for " + _class.getName());
+ }
+ }
+
+ private Status buildStatus(OAMCommandStatus osmCommandStatus){
+ StatusBuilder status = new StatusBuilder();
+ status.setCode(osmCommandStatus.getResponseCode());
+ status.setMessage(osmCommandStatus.getResponseMessage());
+ return status.build();
+ }
+
+ private Status buildStatus(OAMCommandStatus osmCommandStatus,Params params){
+ StatusBuilder status = new StatusBuilder();
+ status.setCode(osmCommandStatus.getResponseCode());
+ status.setMessage(osmCommandStatus.getFormattedMessage(params));
+ return status.build();
+ }
+
+
+
+ private void clearRequestLogProperties() {
+ try {
+ MDC.remove(MDC_KEY_REQUEST_ID);
+ MDC.remove(MDC_SERVICE_INSTANCE_ID);
+ MDC.remove(MDC_SERVICE_NAME);
+ MDC.remove(LoggingConstants.MDCKeys.PARTNER_NAME);
+ MDC.remove(LoggingConstants.MDCKeys.TARGET_VIRTUAL_ENTITY);
+ } catch (Exception e) {
+
+ }
+ }
+
+ private void setInitialLogProperties(CommonHeader commonHeader,RPC action) {
+
+ try {
+ MDC.put(MDC_KEY_REQUEST_ID, commonHeader.getRequestId());
+ MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, commonHeader.getOriginatorId());
+ MDC.put(MDC_INSTANCE_UUID, ""); // value should be created in the future
+ try {
+ MDC.put(MDC_SERVER_FQDN, InetAddress.getLocalHost().getCanonicalHostName()); //Don't change it to a .getHostName() again please. It's wrong!
+ MDC.put(MDC_SERVER_IP_ADDRESS, InetAddress.getLocalHost().getHostAddress());
+ MDC.put(LoggingConstants.MDCKeys.SERVER_NAME, InetAddress.getLocalHost().getHostName());
+ MDC.put(MDC_SERVICE_NAME, action.name());
+ } catch (Exception e) {
+ logger.debug("MDC constant error",e);
+ }
+ } catch (RuntimeException e) {
+ //ignore
+ }
+ }
+
+
+ private void storeErrorMessageToLog(Status status, String additionalMessage) {
+ LoggingUtils.logErrorMessage(
+ String.valueOf(status.getCode()),
+ status.getMessage(),
+ LoggingConstants.TargetNames.APPC,
+ LoggingConstants.TargetNames.APPC_OAM_PROVIDER,
+ additionalMessage,
+ this.getClass().getCanonicalName());
+ }
+
+ private String getAppcName(){
+ return configuration.getProperty(Constants.PROPERTY_APPLICATION_NAME);
+ }
+
+ private Status unexpectedOAMError(Throwable t,RPC action){
+ final String appName = getAppcName();
+
+ String exceptionMessage = t.getMessage() != null ? t.getMessage() : t.toString();
+
+ String errorMessage = EELFResourceManager.format(Msg.OAM_OPERATION_EXCEPTION, t, appName, t.getClass().getSimpleName(), action.name(), exceptionMessage);
+
+ Params params = new Params().addParam("errorMsg", exceptionMessage);
+ Status status = buildStatus(
+ OAMCommandStatus.UNEXPECTED_ERROR,
+ params
+ );
+
+ storeErrorMessageToLog(status,errorMessage);
+ return status;
+ }
+
+
+ private int getInprogressLCMRequestCount() throws APPCException {
+ RequestHandler requestHandler = getService(RequestHandler.class);
+
+ if(requestHandler == null) {
+ return 0;
+ }
+
+ int inprogressRequestCount = requestHandler.getInprogressRequestCount();
+ return inprogressRequestCount;
+ }
+
+
+
+ private void scheduleOutstandingLCMRequestMonitor(final CommonHeader commonHeader,final Date startTime){
+
+
+ class MyCommand implements Runnable{
+
+ public ScheduledFuture<?> myScheduledFuture = null;
+
+ @Override
+ public void run() {
+ try {
+ setInitialLogProperties(commonHeader, RPC.stop);
+
+
+ logDebug("Executing stopping task ");
+
+ ScheduledFuture<?> currentScheduledFuture = AppcOam.this.outstandingLCMRequestMonitorSheduledFuture;
+
+ //cancel myself if I am not the current outstandingLCMRequestMonitor
+ if(currentScheduledFuture != myScheduledFuture){
+ myScheduledFuture.cancel(false);
+ return;
+ }
+
+ Status status = buildStatus(OAMCommandStatus.SUCCESS);
+
+
+ try {
+
+ //log status and return if there are still LCM request in progress
+ int inprogressRequestCount = getInprogressLCMRequestCount();
+ if (inprogressRequestCount > 0) {
+ logDebug("The application '%s' has '%s' outstanding LCM request to complete before coming to a complete stop. ",
+ getAppcName(),
+ inprogressRequestCount
+ );
+ return;
+ }
+
+ } catch (Throwable t) {
+ status = unexpectedOAMError(t, RPC.stop);
+ myScheduledFuture.cancel(false);
+ }
+
+ try {
+ OAMContext oamContext = new OAMContext();
+ oamContext.setRpcName(RPC.stop);
+ oamContext.setCommonHeader(commonHeader);
+ oamContext.setStatus(status);
+ messageAdapter.post(oamContext);
+ } catch(Throwable t) {
+ status = unexpectedOAMError(t,RPC.stop);
+ }
+
+ LoggingUtils.auditWarn(startTime.toInstant(),
+ new Date(System.currentTimeMillis()).toInstant(),
+ String.valueOf(status.getCode()),
+ status.getMessage(),
+ this.getClass().getCanonicalName(),
+ Msg.OAM_OPERATION_STOPPED,
+ getAppcName()
+ );
+ myScheduledFuture.cancel(false);
+
+ } finally {
+ clearRequestLogProperties();
+ }
+ }
+ };
+
+ MyCommand command = new MyCommand();
+
+ long initialDelay = 10000;
+ long delay = initialDelay;
+
+
+ command.myScheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(
+ command,
+ initialDelay,
+ delay,
+ TimeUnit.MILLISECONDS
+ );
+ this.outstandingLCMRequestMonitorSheduledFuture = command.myScheduledFuture;
+ }
+
+
+
+
+ private void scheduleStartingAPPC(final CommonHeader commonHeader,final Date startTime){
+
+
+ class MyCommand implements Runnable{
+
+
+ @Override
+ public void run() {
+ try {
+ setInitialLogProperties(commonHeader, RPC.start);
+
+ logDebug("Executing starting task ");
+
+ Status status = buildStatus(OAMCommandStatus.SUCCESS);
+
+ try {
+ LCMStateManager lcmStateManager = getService(LCMStateManager.class);
+ lcmStateManager.enableLCMOperations();
+ //cancel the current outstandingLCMRequestMonitor
+ outstandingLCMRequestMonitorSheduledFuture = null;
+ } catch(Throwable t) {
+ status = unexpectedOAMError(t,RPC.start);
+ }
+
+ try {
+ OAMContext oamContext = new OAMContext();
+ oamContext.setRpcName(RPC.start);
+ oamContext.setCommonHeader(commonHeader);
+ oamContext.setStatus(status);
+ messageAdapter.post(oamContext);
+ } catch(Throwable t) {
+ status = unexpectedOAMError(t,RPC.start);
+ }
+
+ LoggingUtils.auditWarn(startTime.toInstant(),
+ new Date(System.currentTimeMillis()).toInstant(),
+ String.valueOf(status.getCode()),
+ status.getMessage(),
+ this.getClass().getCanonicalName(),
+ Msg.OAM_OPERATION_STARTED,
+ getAppcName()
+ );
+ } finally {
+ clearRequestLogProperties();
+ }
+ }
+ };
+
+ MyCommand command = new MyCommand();
+ long initialDelay = 1000;
+
+ scheduledExecutorService.schedule(
+ command,
+ initialDelay,
+ TimeUnit.MILLISECONDS
+ );
+ }
+
+
+ private void logDebug(String message,Object... args){
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format(message,args));
+ }
+ }
+}
diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/OAMCommandStatus.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/OAMCommandStatus.java
new file mode 100644
index 000000000..aebfaf113
--- /dev/null
+++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/OAMCommandStatus.java
@@ -0,0 +1,87 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.oam;
+
+
+import org.openecomp.appc.executor.objects.Params;
+import org.openecomp.appc.util.MessageFormatter;
+
+import java.util.Map;
+
+public enum OAMCommandStatus {
+
+ ACCEPTED(100,"ACCEPTED - request accepted"),
+
+ //ERROR(2xx) – request can’t be handled due to some technical error
+ UNEXPECTED_ERROR(200,"UNEXPECTED ERROR - ${errorMsg}"),
+
+ SUCCESS(400,"SUCCESS - request has been processed successfully"),
+ ;
+
+
+ public static final String errorDgMessageParamName = "errorDgMessage";
+
+ private int responseCode;
+ private String responseMessage;
+
+
+
+
+ OAMCommandStatus(int responseCode, String responseMessage) {
+ this.responseCode = responseCode;
+ this.responseMessage = responseMessage;
+ }
+
+ public String getResponseMessage() {
+ return responseMessage;
+ }
+
+ public int getResponseCode() {
+ return responseCode;
+ }
+
+
+ /**
+ *
+ * @return messageTemplate
+ */
+
+
+ public String getFormattedMessage(Params params){
+ Map<String,Object> paramsMap = params != null ? params.getParams() : null;
+ return MessageFormatter.format(getResponseMessage(),paramsMap);
+
+ }
+
+ public String getFormattedMessageWithCode(Params params){
+ return getResponseCode()+"-" + getFormattedMessage(params);
+ }
+
+ @Override
+ public String toString() {
+ return "OAMCommandStatus{" +
+ "responseCode=" + responseCode +
+ ", responseMessage='" + responseMessage + '\'' +
+ '}';
+ }
+}
+
diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/Converter.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/Converter.java
new file mode 100644
index 000000000..4895e23fa
--- /dev/null
+++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/Converter.java
@@ -0,0 +1,133 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.oam.messageadapter;
+
+import org.openecomp.appc.oam.AppcOam;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.*;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.common.header.CommonHeader;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.status.Status;
+import org.opendaylight.yangtools.concepts.Builder;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+
+
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+public class Converter {
+ private static final String ISO_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+ private static final SimpleDateFormat isoFormatter = new SimpleDateFormat(ISO_FORMAT);
+ private static final EELFLogger logger = EELFManager.getInstance().getLogger(Converter.class);
+ static {
+ isoFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+ }
+
+
+ private static Builder<?> convAsyncResponseToBuilder1(AppcOam.RPC rpcName, CommonHeader commonHeader, Status status) {
+ Builder<?> outObj = null;
+ if(rpcName == null){
+ throw new IllegalArgumentException("empty asyncResponse.rpcName");
+ }
+ if(commonHeader == null){
+ throw new IllegalArgumentException("empty asyncResponse.commonHeader");
+ }
+ if(status == null){
+ throw new IllegalArgumentException("empty asyncResponse.status");
+ }
+ switch (rpcName){
+ case stop:
+ outObj = new StopOutputBuilder();
+ ((StopOutputBuilder)outObj).setCommonHeader(commonHeader);
+ ((StopOutputBuilder)outObj).setStatus(status);
+ return outObj;
+
+ case start:
+ outObj = new StartOutputBuilder();
+ ((StartOutputBuilder)outObj).setCommonHeader(commonHeader);
+ ((StartOutputBuilder)outObj).setStatus(status);
+ return outObj;
+ default:
+ throw new IllegalArgumentException(rpcName+" action is not supported");
+ }
+ }
+
+ public static String convAsyncResponseToUebOutgoingMessageJsonString(OAMContext oamContext) throws JsonProcessingException {
+ AppcOam.RPC rpcName = oamContext.getRpcName();
+ CommonHeader commonHeader = oamContext.getCommonHeader();
+ Status status = oamContext.getStatus();
+
+ DmaapOutgoingMessage dmaapOutgoingMessage = convAsyncResponseToUebOutgoingMessage(rpcName,commonHeader,status);
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.addMixInAnnotations(dmaapOutgoingMessage.getBody().getOutput().getClass(), MixInFlagsMessage.class);
+ objectMapper.addMixInAnnotations(Status.class, MixIn.class);
+ objectMapper.addMixInAnnotations(CommonHeader.class, MixInCommonHeader.class);
+ ObjectWriter writer = objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL).configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY,true).writer();
+ return writer.writeValueAsString(dmaapOutgoingMessage);
+ }
+
+ private static DmaapOutgoingMessage convAsyncResponseToUebOutgoingMessage(AppcOam.RPC rpcName, CommonHeader commonHeader, Status status) throws JsonProcessingException {
+ DmaapOutgoingMessage outObj = new DmaapOutgoingMessage();
+ String correlationID = commonHeader.getRequestId();
+ outObj.setCorrelationID(correlationID);
+ outObj.setType("response");
+ outObj.setRpcName(rpcName.name());
+ Builder<?> builder = Converter.convAsyncResponseToBuilder1(rpcName,commonHeader,status);
+ Object messageBody = builder.build();
+
+ DmaapOutgoingMessage.Body body = new DmaapOutgoingMessage.Body(messageBody);
+ outObj.setBody(body);
+ return outObj;
+ }
+
+
+ abstract class MixIn {
+ @JsonIgnore
+ abstract Class<? extends DataContainer> getImplementedInterface(); // to be removed during serialization
+
+ @JsonValue
+ abstract java.lang.String getValue();
+ }
+ abstract class MixInCommonHeader extends MixIn {
+ @JsonProperty("request-id")
+ abstract java.lang.String getRequestId();
+
+ @JsonProperty("originator-id")
+ abstract java.lang.String getOriginatorId();
+
+ }
+ abstract class MixInFlagsMessage extends MixIn {
+ @JsonProperty("common-header")
+ abstract CommonHeader getCommonHeader();
+ }
+
+
+
+}
diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/DmaapOutgoingMessage.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/DmaapOutgoingMessage.java
new file mode 100644
index 000000000..351984d13
--- /dev/null
+++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/DmaapOutgoingMessage.java
@@ -0,0 +1,134 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.oam.messageadapter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+/**
+ * This class represents a message being sent out to DMaaP by APPC as async response.
+ * note the structure of this class must be adapted to the sync message sent to DMaaP represented in org.openecomp.appc.listener.LCM.domainmodel.DmaapOutgoingMessage
+ *
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DmaapOutgoingMessage {
+
+ @JsonProperty("type")
+ private String type;
+
+ @JsonProperty("correlation-id")
+ private String correlationID;
+
+ private final static String defaultCambriaPartition = "MSO";
+ @JsonProperty("cambria.partition")
+ private String cambriaPartition = defaultCambriaPartition;
+
+ @JsonProperty("rpc-name")
+ private String rpcName;
+
+ @JsonProperty("body")
+ private Body body;
+
+ public DmaapOutgoingMessage() {
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getCorrelationID() {
+ return correlationID;
+ }
+
+ public void setCorrelationID(String correlationID) {
+ this.correlationID = correlationID;
+ }
+
+ public String getCambriaPartition() {
+ return cambriaPartition;
+ }
+
+ public void setCambriaPartition(String cambriaPartition) {
+ this.cambriaPartition = cambriaPartition;
+ }
+
+ public String getRpcName() {
+ return rpcName;
+ }
+
+ public void setRpcName(String rpcName) {
+ this.rpcName = rpcName;
+ }
+
+ public Body getBody() {
+ return body;
+ }
+
+ public void setBody(Body body) {
+ this.body = body;
+ }
+
+ @Override
+ public String toString() {
+ return "DmaapOutgoingMessage{" +
+ "cambriaPartition='" + cambriaPartition + '\'' +
+ ", rpcName='" + rpcName + '\'' +
+ ", body=" + body +
+ '}';
+ }
+
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class Body {
+ public Body() {
+ }
+
+ public Body(Object output) {
+ this.output = output;
+ }
+
+ @JsonProperty("output")
+ private Object output;
+
+ public Object getOutput() {
+ return output;
+ }
+
+ public void setOutput(Object body) {
+ this.output = body;
+ }
+
+ @Override
+ public String toString() {
+ return "Body{" +
+ "output=" + output +
+ '}';
+ }
+ }
+}
+
diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/MessageAdapter.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/MessageAdapter.java
new file mode 100644
index 000000000..2ba76d46e
--- /dev/null
+++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/MessageAdapter.java
@@ -0,0 +1,131 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.oam.messageadapter;
+
+import org.openecomp.appc.adapter.message.MessageAdapterFactory;
+import org.openecomp.appc.adapter.message.Producer;
+import org.openecomp.appc.configuration.Configuration;
+import org.openecomp.appc.configuration.ConfigurationFactory;
+import org.openecomp.appc.listener.impl.EventHandlerImpl;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang.ObjectUtils;
+
+import java.util.HashSet;
+import java.util.Properties;
+
+public class MessageAdapter {
+
+ private Producer producer;
+ private String partition ;
+ private Configuration configuration;
+ private HashSet<String> pool;
+ private String writeTopic;
+ private String apiKey;
+ private String apiSecret;
+
+ private static final EELFLogger logger = EELFManager.getInstance().getLogger(MessageAdapter.class);
+
+ /**
+ * Initialize producer client to post messages using configuration properties
+ */
+ public void init(){
+ this.producer = getProducer();
+ }
+
+ private Producer getProducer() {
+ configuration = ConfigurationFactory.getConfiguration();
+ Properties properties=configuration.getProperties();
+ updateProperties(properties);
+ Producer localProducer = null;
+
+ BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
+ if (ctx != null) {
+ ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
+ if (svcRef != null) {
+ localProducer = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopic, apiKey, apiSecret);
+ for (String url : pool) {
+ if (url.contains("3905") || url.contains("https")) {
+ localProducer.useHttps(true);
+ break;
+ }
+ }
+ }
+ }
+
+ return localProducer;
+ }
+
+ private void updateProperties(Properties props) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Entering to updateProperties with Properties = "+ ObjectUtils.toString(props));
+ }
+ pool = new HashSet<>();
+ if (props != null) {
+ writeTopic = props.getProperty("appc.OAM.topic.write");
+ apiKey = props.getProperty("appc.OAM.client.key");
+ apiSecret = props.getProperty("appc.OAM.client.secret");
+ String hostnames = props.getProperty("appc.OAM.poolMembers");
+ if (hostnames != null && !hostnames.isEmpty()) {
+ for (String name : hostnames.split(",")) {
+ pool.add(name);
+ }
+ }
+ }
+ }
+
+ /**
+ * Posts message to UEB. As UEB accepts only json messages this method first convert uebMessage to json format and post it to UEB.
+ * @param oamContext response data that based on it a message will be send to UEB (the format of the message that will be sent to UEB based on the action and its YANG domainmodel).
+ * @return True if message is postes successfully else False
+ */
+ public boolean post(OAMContext oamContext){
+ boolean success;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Entering to post with AsyncResponse = " + ObjectUtils.toString(oamContext));
+ }
+
+ String jsonMessage;
+ try {
+ jsonMessage = Converter.convAsyncResponseToUebOutgoingMessageJsonString(oamContext);
+ if (logger.isDebugEnabled()) {
+ logger.debug("UEB Response = " + jsonMessage);
+ }
+ success = producer.post(this.partition, jsonMessage);
+ } catch (JsonProcessingException e1) {
+ logger.error("Error generating Jason from UEB message "+ e1.getMessage());
+ success= false;
+ }catch (Exception e){
+ logger.error("Error sending message to UEB "+e.getMessage());
+ success= false;
+ }
+ if (logger.isTraceEnabled()) {
+ logger.trace("Exiting from post with (success = "+ ObjectUtils.toString(success)+")");
+ }
+ return success;
+ }
+}
diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/OAMContext.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/OAMContext.java
new file mode 100644
index 000000000..68ea95ba7
--- /dev/null
+++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/messageadapter/OAMContext.java
@@ -0,0 +1,72 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.oam.messageadapter;
+
+
+
+
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.*;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.common.header.CommonHeader;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.common.header.CommonHeaderBuilder;
+import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.status.Status;import org.openecomp.appc.oam.AppcOam;
+
+public class OAMContext {
+
+ private AppcOam.RPC rpcName;
+ private CommonHeader commonHeader;
+ private Status status;
+
+
+ public AppcOam.RPC getRpcName() {
+ return rpcName;
+ }
+
+ public void setRpcName(AppcOam.RPC rpcName) {
+ this.rpcName = rpcName;
+ }
+
+ public CommonHeader getCommonHeader() {
+ return commonHeader;
+ }
+
+ public void setCommonHeader(CommonHeader commonHeader) {
+ this.commonHeader = commonHeader;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+
+
+ @Override
+ public String toString() {
+ return "OAMContext {" +
+ "rpcName=" + rpcName +
+ ", commonHeader=" + commonHeader +
+ ", status=" + status +
+ '}';
+ }
+}