From 5f6c112e62c87f0b6aa86df22888b70ad68d2207 Mon Sep 17 00:00:00 2001 From: nancylizi Date: Thu, 27 Oct 2016 16:00:31 +0800 Subject: Comet message to WEB after delete package. Change-Id: I03b04aece09b2943c8a80ef56899b688687ab11c Issue-id:TOSCA-153 Signed-off-by: nancylizi --- .../org/openo/commontosca/catalog/CatalogApp.java | 43 +++++---- .../catalog/cometd/CometdException.java | 34 +++++++ .../commontosca/catalog/cometd/CometdService.java | 104 +++++++++++++++++++++ .../commontosca/catalog/cometd/CometdServlet.java | 95 +++++++++++++++++++ .../commontosca/catalog/cometd/CometdUtil.java | 60 ++++++++++++ .../commontosca/catalog/common/CommonConstant.java | 2 + .../catalog/wrapper/PackageWrapper.java | 59 +++++++----- 7 files changed, 354 insertions(+), 43 deletions(-) create mode 100644 catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdException.java create mode 100644 catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdService.java create mode 100644 catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdServlet.java create mode 100644 catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdUtil.java (limited to 'catalog-core/catalog-mgr/src/main/java') diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/CatalogApp.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/CatalogApp.java index 6396b545..88e3b16f 100644 --- a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/CatalogApp.java +++ b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/CatalogApp.java @@ -28,6 +28,11 @@ import io.dropwizard.setup.Environment; import io.swagger.jaxrs.config.BeanConfig; import io.swagger.jaxrs.listing.ApiListingResource; +import java.util.EnumSet; + +import javax.servlet.DispatcherType; + +import org.eclipse.jetty.servlets.CrossOriginFilter; import org.glassfish.jersey.media.multipart.MultiPartFeature; import org.openo.commontosca.catalog.common.Config; import org.openo.commontosca.catalog.common.HttpServerAddrConfig; @@ -110,7 +115,7 @@ public class CatalogApp extends Application { environment.jersey().register(MultiPartFeature.class); initSwaggerConfig(environment, configuration); - // initCometd(environment); + initCometd(environment); Config.setConfigration(configuration); initService(); LOGGER.info("Initialize catalogue finished."); @@ -149,22 +154,22 @@ public class CatalogApp extends Application { registerCatalogService.start(); } -// /** -// * initialize cometd server. -// * -// * @param environment environment information -// */ -// private void initCometd(Environment environment) { -// // add filter -// environment.getApplicationContext().addFilter(CrossOriginFilter.class, -// "/api/nsoccataloguenotification/v1/*", -// EnumSet.of(DispatcherType.REQUEST, DispatcherType.ERROR)); -// // add servlet -// environment.getApplicationContext() -// .addServlet("org.cometd.server.CometDServlet", "/api/nsoccataloguenotification/v1/*") -// .setInitOrder(1); -// // add servlet -// environment.getApplicationContext() -// .addServlet("CometdServlet", "/api/nsoccataloguenotification/v1").setInitOrder(2); -// } + /** + * initialize cometd server. + * + * @param environment environment information + */ + private void initCometd(Environment environment) { + // add filter + environment.getApplicationContext().addFilter(CrossOriginFilter.class, + "/openoapi/catalog/v1/catalognotification/*", EnumSet.of(DispatcherType.REQUEST, DispatcherType.ERROR)); + // add servlet + environment.getApplicationContext() + .addServlet("org.cometd.server.CometDServlet", "/openoapi/catalog/v1/catalognotification/*") + .setInitOrder(1); + // add servlet + environment.getApplicationContext() + .addServlet("org.openo.commontosca.catalog.cometd.CometdServlet", "/openoapi/catalog/v1/catalognotification") + .setInitOrder(2); + } } diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdException.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdException.java new file mode 100644 index 00000000..7237a37b --- /dev/null +++ b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdException.java @@ -0,0 +1,34 @@ +/** + * Copyright 2016 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openo.commontosca.catalog.cometd; + +public class CometdException extends Exception { + public static int ERROR_CODE_BAYEUX = 0; + public static int ERROR_CODE_PARAM_ERROR = 1; + public static int ERROR_CODE_SESSION_ERROR = 2; + public static int ERROR_CODE_SUBSCRIBE_TIMEOUT = 3; + private int errorCode = -1; + + public CometdException(Throwable e1) { + super(e1); + } + + public CometdException(int code, String message) { + super(message); + this.errorCode = code; + } +} diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdService.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdService.java new file mode 100644 index 00000000..a54b9abf --- /dev/null +++ b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdService.java @@ -0,0 +1,104 @@ +/** + * Copyright 2016 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openo.commontosca.catalog.cometd; + +import org.cometd.bayeux.server.BayeuxServer; +import org.cometd.bayeux.server.ConfigurableServerChannel; +import org.cometd.bayeux.server.LocalSession; +import org.cometd.bayeux.server.ServerChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class CometdService { + private static final Logger LOGGER = LoggerFactory.getLogger(CometdService.class); + public static String DATA_UPLOAD_CHANNEL = "/upload/data"; + public static String SNMPTRAP_CHANNEL = "/upload/snmptrap"; + + private BayeuxServer bayeux; + private LocalSession session; + + private static String bayeuxChannel = "/meta/"; + + private static String serviceChannel = "/service/"; + + private static CometdService service = new CometdService(); + + public static CometdService getInstance() { + return service; + } + + public void publish(String channel, Object message) throws CometdException { + if (bayeux == null) { + this.bayeux = CometdUtil.getBayeuxServer(); + checkBayeuxServer(); + this.session = this.bayeux.newLocalSession("openo_catalog_local_session~"); + this.session.handshake(); + } + String jsonMsg; + try { + // jsonMsg = CometdUtil.convertBean2Json(message); + jsonMsg = CometdUtil.toJson(message); + LOGGER.debug("upload json=" + jsonMsg); + } catch (IOException e) { + throw new CometdException(e); + } + + checkAndInit(channel); + ServerChannel serverChannel = this.bayeux.getChannel(channel); + serverChannel.publish(this.session, jsonMsg); + } + + private void checkAndInit(String channel) throws CometdException { + checkBayeuxServer(); + checkSession(); + checkChannel(channel); + bayeux.createChannelIfAbsent(channel, new ConfigurableServerChannel.Initializer() { + public void configureChannel(ConfigurableServerChannel channel) { + channel.setPersistent(true); + channel.setLazy(true); + } + }); + } + + private void checkBayeuxServer() throws CometdException { + if (bayeux == null) { + throw new CometdException(CometdException.ERROR_CODE_BAYEUX, "bayeux is null."); + } + } + + private void checkSession() throws CometdException { + if (session == null || !session.isConnected()) { + throw new CometdException(CometdException.ERROR_CODE_SESSION_ERROR, "session is invalid."); + } + } + + private void checkChannel(String channel) throws CometdException { + if (channel == null || "".equals(channel)) { + throw new CometdException(CometdException.ERROR_CODE_PARAM_ERROR, "channel is null."); + } + if (channel.startsWith(bayeuxChannel)) { + throw new CometdException(CometdException.ERROR_CODE_PARAM_ERROR, + "channel [" + channel + "] is bayeuxChannel."); + } + if (channel.startsWith(serviceChannel)) { + throw new CometdException(CometdException.ERROR_CODE_PARAM_ERROR, + "channel [" + channel + "] is serviceChannel."); + } + } +} diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdServlet.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdServlet.java new file mode 100644 index 00000000..ace95455 --- /dev/null +++ b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdServlet.java @@ -0,0 +1,95 @@ +/** + * Copyright 2016 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openo.commontosca.catalog.cometd; + +import org.cometd.annotation.Listener; +import org.cometd.annotation.ServerAnnotationProcessor; +import org.cometd.annotation.Service; +import org.cometd.bayeux.Message; +import org.cometd.bayeux.server.BayeuxServer; +import org.cometd.bayeux.server.ServerChannel; +import org.cometd.bayeux.server.ServerMessage; +import org.cometd.bayeux.server.ServerSession; +import org.cometd.server.BayeuxServerImpl; +import org.cometd.server.authorizer.GrantAuthorizer; +import org.cometd.server.ext.AcknowledgedMessagesExtension; +import org.cometd.server.ext.TimesyncExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import javax.servlet.GenericServlet; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.UnavailableException; +import javax.servlet.http.HttpServletResponse; + + +public class CometdServlet extends GenericServlet { + private static final long serialVersionUID = 8807005039926977330L; + + private static final Logger logger = LoggerFactory.getLogger(CometdServlet.class); + + @Override + public void init() throws ServletException { + super.init(); + + final BayeuxServerImpl bayeux = + (BayeuxServerImpl) getServletContext().getAttribute(BayeuxServer.ATTRIBUTE); + if (bayeux == null) { + throw new UnavailableException("No BayeuxServer!"); + } + // Create extensions + bayeux.addExtension(new TimesyncExtension()); + bayeux.addExtension(new AcknowledgedMessagesExtension()); + + // Allow anybody to handshake + bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(GrantAuthorizer.GRANT_PUBLISH); + + ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux); + processor.process(new CatalogComet()); + + CometdUtil.setBayeuxServer(bayeux); + } + + @Service("catalog") + public static class CatalogComet { + @Listener("/meta/subscribe") + public void catalogSubscribe(ServerSession session, ServerMessage message) { + logger.info("Catalog Subscribe from " + session + " for " + + message.get(Message.SUBSCRIPTION_FIELD)); + } + + @Listener("/meta/unsubscribe") + public void catalogUnsubscribe(ServerSession session, ServerMessage message) { + logger.info("Catalog Unsubscribe from " + session + " for " + + message.get(Message.SUBSCRIPTION_FIELD)); + } + + @Listener("/meta/*") + public void catalogMeta(ServerSession session, ServerMessage message) { + logger.debug(message.toString()); + } + } + + @Override + public void service(ServletRequest servletRequest, ServletResponse servletResponse) + throws ServletException, IOException { + ((HttpServletResponse) servletResponse).sendError(503); + } +} diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdUtil.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdUtil.java new file mode 100644 index 00000000..efb2d35d --- /dev/null +++ b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdUtil.java @@ -0,0 +1,60 @@ +/** + * Copyright 2016 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openo.commontosca.catalog.cometd; + +import java.io.IOException; +import java.util.Map; + +import org.cometd.server.BayeuxServerImpl; + +import com.google.gson.Gson; + +public class CometdUtil { + private static BayeuxServerImpl bayeuxServer = null; + + public static BayeuxServerImpl getBayeuxServer() { + return bayeuxServer; + } + + public static void setBayeuxServer(BayeuxServerImpl bayeux) { + bayeuxServer = bayeux; + } + +// public static String convertBean2Json(Object object) throws IOException { +// ObjectMapper mapper = new ObjectMapper(); +// return mapper.writeValueAsString(object); +// } +// +// public static Map convertJson2Map(String jsonStr) throws IOException { +// ObjectMapper objectMapper = new ObjectMapper(); +// return objectMapper.readValue(jsonStr, Map.class); +// } + public static T fromJson(String jsonString, Class templateClass) throws IOException { + Gson gson = new Gson(); + return gson.fromJson(jsonString, templateClass); + } + + /** + * gson to json. + * @param template class name + * @return String + */ + public static String toJson(T template) throws IOException { + Gson gson = new Gson(); + return gson.toJson(template); + } +} diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/common/CommonConstant.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/common/CommonConstant.java index a2d57527..8250c94f 100644 --- a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/common/CommonConstant.java +++ b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/common/CommonConstant.java @@ -45,4 +45,6 @@ public class CommonConstant { public static final String HTTP_HEADER_CONTENT_RANGE = "Content-Range"; public static final String CATALOG_CSAR_DIR_NAME = "/csar"; + + public static final String COMETD_CHANNEL_PACKAGE_DELETE = "/package/delete"; } diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/wrapper/PackageWrapper.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/wrapper/PackageWrapper.java index 8a882533..247446ff 100644 --- a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/wrapper/PackageWrapper.java +++ b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/wrapper/PackageWrapper.java @@ -16,6 +16,9 @@ package org.openo.commontosca.catalog.wrapper; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.openo.commontosca.catalog.cometd.CometdException; +import org.openo.commontosca.catalog.cometd.CometdService; +import org.openo.commontosca.catalog.cometd.CometdUtil; import org.openo.commontosca.catalog.common.CommonConstant; import org.openo.commontosca.catalog.common.HttpServerPathConfig; import org.openo.commontosca.catalog.common.RestUtil; @@ -42,6 +45,8 @@ import java.io.InputStream; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; @@ -197,6 +202,18 @@ public class PackageWrapper { } } + private void publishDeletionPendingStatusCometdMessage(String csarid) { + try { + Map cometdMessage = new HashMap(); + cometdMessage.put("csarid", csarid); + cometdMessage.put("status", "deletionPending"); + CometdService.getInstance().publish(CommonConstant.COMETD_CHANNEL_PACKAGE_DELETE, + cometdMessage); + } catch (CometdException e1) { + LOG.error("publish delfinish cometdmsg fail.", e1); + } + } + class DelCsarThread implements Runnable { private String csarid; private boolean isInstanceTemplate = false; @@ -216,12 +233,12 @@ public class PackageWrapper { LOG.error("del instance csar fail."+ e1.getMessage()); updatePackageStatus(csarid, null, null, null, CommonConstant.PACKAGE_STATUS_DELETE_FAIL, null); - // publishDelFinishCometdMessage(csarid, "false"); + publishDelFinishCometdMessage(csarid, "false"); } } private void delCsarData(String csarId) { - updatePackageStatus(csarid, null, null, null, CommonConstant.PACKAGE_STATUS_DELETING, null); + updatePackageStatus(csarId, null, null, null, CommonConstant.PACKAGE_STATUS_DELETING, null); String packagePath = PackageWrapperUtil.getPackagePath(csarId); if (packagePath == null) { LOG.error("package path is null! "); @@ -236,36 +253,30 @@ public class PackageWrapper { } catch (CatalogResourceException e) { LOG.error("delete template data from db error! csarId = " + csarId, e); } -// PackageData packageData = new PackageData(); -// packageData.setCsarId(csarId); -// try { -// TemplateManager.getInstance().deleteServiceTemplateByCsarPackageInfo(packageData); -// } catch (CatalogResourceException e2) { -// LOG.error("delete template data from db error! csarId = " + csarId); -// } //delete package data from database try { PackageManager.getInstance().deletePackage(csarId); } catch (CatalogResourceException e1) { LOG.error("delete package by csarId from db error ! " + e1.getMessage(), e1); } + publishDelFinishCometdMessage(csarId, "true"); } - // private void publishDelFinishCometdMessage(String csarid, String csarDelStatus) { - // if (isInstanceTemplate) { - // LOG.info("delete instance Template finish. csarid:{}", csarid); - // return; - // } - // try { - // Map cometdMessage = new HashMap(); - // cometdMessage.put("csarid", csarid); - // cometdMessage.put("status", csarDelStatus); - // CometdService.getInstance().publish(CommonConstant.COMETD_CHANNEL_PACKAGE_DELETE, - // cometdMessage); - // } catch (CometdException e) { - // LOG.error("publish delfinish cometdmsg fail.", e); - // } - // } + private void publishDelFinishCometdMessage(String csarId, String csarDelStatus) { + if (isInstanceTemplate) { + LOG.info("delete instance Template finish. csarid:{}", csarId); + return; + } + try { + Map cometdMessage = new HashMap(); + cometdMessage.put("csarid", csarId); + cometdMessage.put("status", csarDelStatus); + CometdService.getInstance().publish(CommonConstant.COMETD_CHANNEL_PACKAGE_DELETE, + cometdMessage); + } catch (CometdException e1) { + LOG.error("publish delfinish cometdmsg fail." + e1.getMessage(), e1); + } + } } /** -- cgit 1.2.3-korg