summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorying.yunlong <ying.yunlong@zte.com.cn>2017-07-12 10:05:10 +0800
committerying.yunlong <ying.yunlong@zte.com.cn>2017-07-12 10:05:10 +0800
commit9998c9d1ce976a2c040e694d25e53b4c2b527de8 (patch)
tree039049da0d740104fa7b851bdf4694e6b1d840bb
parent4deecf0e472d21294ffd6c4ffcbb44c4ec3779e5 (diff)
Create seed code of ems driver
Upload the seed code to gerrit for the first time. Change-Id: I2e3bd269d9a0daca63faef838356b5a113bdae9b Issue-ID: VFC-59 Signed-off-by: ying.yunlong <ying.yunlong@zte.com.cn>
-rw-r--r--ems/sems/boco/ems-driver/cfg/EMSInfo.xml52
-rw-r--r--ems/sems/boco/ems-driver/cfg/emsdriver.yml20
-rw-r--r--ems/sems/boco/ems-driver/cfg/ftpconfig.properties28
-rw-r--r--ems/sems/boco/ems-driver/cfg/log4j.properties40
-rw-r--r--ems/sems/boco/ems-driver/cfg/spring.xml50
-rw-r--r--ems/sems/boco/ems-driver/pom.xml98
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/EMSDriverApp.java75
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/CollectMsgReceiverThread.java91
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/TaskThread.java734
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/TaskThreadService.java108
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/AlarmManager.java80
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java297
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/HeartBeat.java60
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/MessageUtil.java164
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/Msg.java108
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/MsgType.java63
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/constant/Constant.java46
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/AFtpRemoteFile.java81
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/ExtendsDefaultFTPFileEntryParserFactory.java224
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/FTPInterface.java45
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/FTPSrv.java199
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/RemoteFile.java20
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/SFTPSrv.java58
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/CollectMsg.java69
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/CollectVo.java281
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/EMSInfo.java59
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/DriverThread.java91
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/StringUtil.java62
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/UnZip.java94
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/XmlUtil.java42
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/Zip.java108
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationImp.java57
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationInterface.java31
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationManager.java184
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/messagemgr/MessageChannel.java60
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/messagemgr/MessageChannelFactory.java61
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/HttpClientFactory.java50
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/HttpClientUtil.java154
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/NorthMessageMgr.java125
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/CommandResource.java39
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/EmsDriverApplication.java93
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/EmsDriverConfiguration.java51
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/MsbConfiguration.java29
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/MsbRestServiceProxy.java61
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/model/MsbRegisterVo.java105
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/model/ServiceNodeVo.java62
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/CollectManager.java107
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/CollectOderJob.java57
-rw-r--r--ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/QuartzManager.java162
49 files changed, 5035 insertions, 0 deletions
diff --git a/ems/sems/boco/ems-driver/cfg/EMSInfo.xml b/ems/sems/boco/ems-driver/cfg/EMSInfo.xml
new file mode 100644
index 0000000..d1add3e
--- /dev/null
+++ b/ems/sems/boco/ems-driver/cfg/EMSInfo.xml
@@ -0,0 +1,52 @@
+<?xml version='1.0' encoding='GBK'?>
+<!--
+
+ Copyright 2017 BOCO Corporation.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+-->
+<collectSources>
+ <ems name="1234">
+ <collect type ="cm" crontab="0 0/1 * * * ?">
+ <ip>192.168.6.48</ip>
+ <port>21</port>
+ <user>gcp</user>
+ <password>gcp</password>
+ <remotepath>/opt/Gcp/data/</remotepath>
+ <match>*</match>
+ <passive>true</passive>
+ <ftptype>ftp</ftptype>
+ <granularity>15</granularity>
+ </collect>
+ <collect type ="pm" crontab="">
+ <ip>192.168.1.16</ip>
+ <port>21</port>
+ <user>gcp</user>
+ <password>gcp</password>
+ <remotepath>/var/vsftp/gcp/ftproot/GD/WX/HW/JS_OMC2/</remotepath>
+ <match>*</match>
+ <passive>true</passive>
+ <ftptype>ftp</ftptype>
+ <granularity>15</granularity>
+ </collect>
+ <collect type ="alarm" iscollect = "false">
+ <ip>127.0.0.1</ip>
+ <port>9997</port>
+ <user>yiyang</user>
+ <password>123456</password>
+ <readtimeout>6</readtimeout>;
+ </collect>
+ </ems>
+</collectSources> \ No newline at end of file
diff --git a/ems/sems/boco/ems-driver/cfg/emsdriver.yml b/ems/sems/boco/ems-driver/cfg/emsdriver.yml
new file mode 100644
index 0000000..2ad5dd4
--- /dev/null
+++ b/ems/sems/boco/ems-driver/cfg/emsdriver.yml
@@ -0,0 +1,20 @@
+
+# Copyright 2017 BOCO Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+msbAddress: 10.74.205.123:80
+defaultName: EmsDriver-Stranger
+
+
diff --git a/ems/sems/boco/ems-driver/cfg/ftpconfig.properties b/ems/sems/boco/ems-driver/cfg/ftpconfig.properties
new file mode 100644
index 0000000..96e2f43
--- /dev/null
+++ b/ems/sems/boco/ems-driver/cfg/ftpconfig.properties
@@ -0,0 +1,28 @@
+
+# Copyright 2017 BOCO Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ftp_ip = 192.168.6.48
+ftp_port = 21
+ftp_user = gcp
+ftp_password = gcp
+ftp_remote_path = /opt/Gcp/
+ftp_passive = true
+ftp_type = ftp
+
+#
+msbAddress: 10.74.205.123:80
+dataNotifyUrl:/dataNotify
+alarmUrl:/alarm \ No newline at end of file
diff --git a/ems/sems/boco/ems-driver/cfg/log4j.properties b/ems/sems/boco/ems-driver/cfg/log4j.properties
new file mode 100644
index 0000000..455d401
--- /dev/null
+++ b/ems/sems/boco/ems-driver/cfg/log4j.properties
@@ -0,0 +1,40 @@
+# Copyright 2017 BOCO Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=debug,R,E,stdout
+
+#### \u7b2c\u4e00\u4e2a appender\u5c06log\u5199\u5230\u5c4f\u5e55
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%p] [%d] [%c] - [%m]%n
+log4j.appender.stdout.Threshold=INFO
+
+#### \u7b2c\u4e8c\u4e2a appender \u5c06\u5168\u90e8log\u5199\u5230\u6587\u4ef6
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.File=../log/monitor.log
+log4j.appender.R.MaxFileSize=20MB
+log4j.appender.R.MaxBackupIndex=50
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=[%p] [%d] [%t] [%c] - [%m]%n
+log4j.appender.R.Threshold=INFO
+
+#### \u7b2c\u4e09\u4e2a appender \u5c06\u9519\u8beflog\u5199\u5230\u6587\u4ef6
+log4j.appender.E=org.apache.log4j.RollingFileAppender
+log4j.appender.E.File=../log/error.log
+log4j.appender.E.MaxFileSize=20MB
+log4j.appender.E.MaxBackupIndex=10
+log4j.appender.E.layout=org.apache.log4j.PatternLayout
+log4j.appender.E.layout.ConversionPattern=[%p] [%d] [%t] [%c] - [%m]%n
+log4j.appender.E.Threshold=ERROR
diff --git a/ems/sems/boco/ems-driver/cfg/spring.xml b/ems/sems/boco/ems-driver/cfg/spring.xml
new file mode 100644
index 0000000..1531c8d
--- /dev/null
+++ b/ems/sems/boco/ems-driver/cfg/spring.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="GBK"?>
+<!--
+
+ Copyright 2017 BOCO Corporation.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"
+ default-lazy-init="true">
+
+ <bean id="configurationManager" class="org.openo.nfvo.emsdriver.configmgr.ConfigurationManager">
+ <property name="run" value="true" />
+
+ </bean>
+ <bean id="collectMsgReceiverThread" class="org.openo.nfvo.emsdriver.collector.CollectMsgReceiverThread">
+ <property name="run" value="true" />
+ <property name="thread_max_num" value="100" />
+ </bean>
+
+ <bean id="configurationImp" class="org.openo.nfvo.emsdriver.configmgr.ConfigurationImp">
+ </bean>
+ <bean id="alarmManager" class="org.openo.nfvo.emsdriver.collector.alarm.AlarmManager">
+ <property name="run" value="true" />
+ <property name="configurationInterface" ref="configurationImp" />
+ </bean>
+ <bean id="collectManager" class="org.openo.nfvo.emsdriver.taskscheduler.CollectManager">
+ <property name="run" value="true" />
+ <property name="configurationInterface" ref="configurationImp" />
+ </bean>
+ <bean id="northMessageMgr" class="org.openo.nfvo.emsdriver.northbound.client.NorthMessageMgr">
+ <property name="run" value="true" />
+ <property name="configurationInterface" ref="configurationImp" />
+ </bean>
+
+
+</beans>
diff --git a/ems/sems/boco/ems-driver/pom.xml b/ems/sems/boco/ems-driver/pom.xml
new file mode 100644
index 0000000..ba35471
--- /dev/null
+++ b/ems/sems/boco/ems-driver/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2017, BOCO.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.openo.nfvo</groupId>
+ <artifactId>nfvo-root</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../../../..</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.openo.nfvo</groupId>
+ <artifactId>ems-driver</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <name>nfvo/drivers/ems/sems/boco/ems-driver</name>
+ <description>nfvo ems-driver</description>
+ <url>http://maven.apache.org</url>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.6</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.24</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ant</groupId>
+ <artifactId>ant</artifactId>
+ <version>1.10.1</version>
+ </dependency>
+ <dependency>
+ <groupId>jdom</groupId>
+ <artifactId>jdom</artifactId>
+ <version>1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>3.6</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>4.3.6.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ <version>1.7.3</version>
+ </dependency>
+ <dependency>
+ <groupId>io.dropwizard</groupId>
+ <artifactId>dropwizard-core</artifactId>
+ <version>1.0.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.2</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/EMSDriverApp.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/EMSDriverApp.java
new file mode 100644
index 0000000..c9d551c
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/EMSDriverApp.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.commons.utils.DriverThread;
+import org.openo.nfvo.emsdriver.northbound.service.EmsDriverApplication;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.FileSystemXmlApplicationContext;
+
+
+
+public class EMSDriverApp {
+
+ private static Log log = LogFactory.getLog(DriverThread.class);
+ public static ApplicationContext context = null;
+ static {
+ try {
+ /** spring bean applicationContext **/
+ context = new FileSystemXmlApplicationContext("file:" + Constant.SYS_CFG+ "spring.xml");
+
+ } catch (BeansException e) {
+ log.error("spring.xml is fail ", e);
+ System.exit(1);
+ } catch (Exception e) {
+ log.error("spring.xml is fail", e);
+ System.exit(1);
+ }
+ }
+
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ String[] allThreadName = context.getBeanNamesForType(DriverThread.class);
+
+ log.info("worker num :" + allThreadName.length);
+
+ for (String threadName : allThreadName) {
+ DriverThread thread = (DriverThread) context.getBean(threadName);
+ if (thread == null) {
+ log.error(threadName + "Thread start error,system exit");
+ System.exit(1);
+ }
+ thread.setName(threadName);
+ thread.start();
+ }
+
+ try {
+ new EmsDriverApplication().run(args);
+ } catch (Exception e) {
+ log.error("EmsDriverApplication.run is fail", e);
+ System.exit(1);
+ }
+ log.info("the workerThreads start sucess" );
+ }
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/CollectMsgReceiverThread.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/CollectMsgReceiverThread.java
new file mode 100644
index 0000000..7bbd2e0
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/CollectMsgReceiverThread.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.collector;
+
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.commons.model.CollectMsg;
+import org.openo.nfvo.emsdriver.commons.utils.DriverThread;
+import org.openo.nfvo.emsdriver.messagemgr.MessageChannel;
+import org.openo.nfvo.emsdriver.messagemgr.MessageChannelFactory;
+
+public class CollectMsgReceiverThread extends DriverThread{
+
+ private long timeStamp = System.currentTimeMillis();
+
+ private MessageChannel collectChannel;
+
+ private TaskThreadService taskService;
+
+ private int thread_max_num = 100;
+
+
+
+ @Override
+ public void dispose() {
+ collectChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_CHANNEL_KEY);
+
+ taskService = TaskThreadService.getInstance(thread_max_num);
+ taskService.start();
+
+ while (isRun()) {
+
+ try {
+ if(System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE){
+ timeStamp = System.currentTimeMillis();
+
+ log.debug("COLLECT_CHANNEL Msg size :"+collectChannel.size());
+ }
+
+ Object obj = collectChannel.poll();
+ if(obj == null){
+ continue;
+ }
+ if(obj != null && obj instanceof CollectMsg){
+ CollectMsg collectMsg = (CollectMsg)obj;
+ taskService.add(collectMsg);
+ log.debug("receive a CollectMsg id = "+collectMsg.getId());
+ }else{
+ log.error("receive Objcet not CollectMsg "+obj);
+ }
+
+ } catch (Exception e) {
+ log.error("dispatch alarm exception",e);
+
+ }
+ }
+
+ }
+
+
+
+ /**
+ * @return the thread_max_num
+ */
+ public int getThread_max_num() {
+ return thread_max_num;
+ }
+
+
+
+ /**
+ * @param thread_max_num the thread_max_num to set
+ */
+ public void setThread_max_num(int thread_max_num) {
+ this.thread_max_num = thread_max_num;
+ }
+
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/TaskThread.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/TaskThread.java
new file mode 100644
index 0000000..c88469b
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/TaskThread.java
@@ -0,0 +1,734 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.collector;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.commons.ftp.AFtpRemoteFile;
+import org.openo.nfvo.emsdriver.commons.ftp.FTPInterface;
+import org.openo.nfvo.emsdriver.commons.ftp.FTPSrv;
+import org.openo.nfvo.emsdriver.commons.ftp.SFTPSrv;
+import org.openo.nfvo.emsdriver.commons.model.CollectMsg;
+import org.openo.nfvo.emsdriver.commons.model.CollectVo;
+import org.openo.nfvo.emsdriver.commons.utils.StringUtil;
+import org.openo.nfvo.emsdriver.commons.utils.UnZip;
+import org.openo.nfvo.emsdriver.commons.utils.Zip;
+import org.openo.nfvo.emsdriver.configmgr.ConfigurationImp;
+import org.openo.nfvo.emsdriver.configmgr.ConfigurationInterface;
+import org.openo.nfvo.emsdriver.messagemgr.MessageChannel;
+import org.openo.nfvo.emsdriver.messagemgr.MessageChannelFactory;
+
+
+public class TaskThread implements Runnable{
+
+ public Log log = LogFactory.getLog(TaskThread.class);
+
+ private MessageChannel collectResultChannel;
+
+ private CollectMsg data;
+
+ private ConfigurationInterface configurationInterface = new ConfigurationImp();
+
+ private String localPath = Constant.SYS_DATA_TEMP;
+ private String resultPath = Constant.SYS_DATA_RESULT;
+
+ private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+
+ private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ public TaskThread(CollectMsg data) {
+ this.data = data;
+ }
+
+ @Override
+ public void run(){
+
+ collectResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
+
+ try {
+ collectMsgHandle(data);
+ } catch (Exception e) {
+ log.error("",e);
+ }
+ }
+
+ private void collectMsgHandle(CollectMsg collectMsg) {
+ String emsName = collectMsg.getEmsName();
+ String type = collectMsg.getType();
+ CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type);
+
+ //ftp download
+ List<String> downloadfiles = this.ftpDownload(collectVo);
+ //paser ftp update message send
+ for(String fileName :downloadfiles){
+ this.parseFtpAndSendMessage(fileName,collectVo);
+ }
+ }
+
+ private void parseFtpAndSendMessage(String fileName, CollectVo collectVo) {
+ //
+ List<File> filelist = decompressed(fileName);
+
+ for (File tempfile : filelist) {
+
+ String unfileName = tempfile.getName();
+
+ Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-");
+ Matcher ma = pa.matcher(unfileName);
+ if (!ma.find())
+ continue;
+ String nename = ma.group(1);
+ boolean parseResult = false;
+ if("CM".equalsIgnoreCase(collectVo.getType())){
+ parseResult = processCMXml(tempfile, nename,"CM");
+ }else{
+ parseResult = processPMCsv(tempfile, nename,"PM");
+ }
+
+ if (parseResult){
+ log.info("parser "+tempfile+" sucess");
+ }else {
+ log.info("parser "+tempfile+" fail");
+ }
+
+ }
+ }
+
+ private boolean processPMCsv(File tempfile, String nename,String type) {
+
+ String csvpath = localPath+nename+"/"+type+"/";
+ File csvpathfile = new File(csvpath);
+ if(!csvpathfile.exists()){
+ csvpathfile.mkdirs();
+ }
+ String csvFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
+ String csvpathAndFileName = csvpath+csvFileName;
+ BufferedOutputStream bos = null;
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(csvpathAndFileName,false);
+ bos = new BufferedOutputStream(fos, 10240);
+ } catch (FileNotFoundException e1) {
+ log.error("FileNotFoundException "+StringUtil.getStackTrace(e1));
+ }
+
+ FileInputStream brs = null;
+ InputStreamReader isr = null;
+ BufferedReader br = null;
+
+ List<String> columnNames = new ArrayList<String>();
+ List<String> commonValues = new ArrayList<String>();
+ try {
+
+ brs = new FileInputStream(tempfile);
+ isr = new InputStreamReader(brs, Constant.ENCODING_UTF8);
+ br = new BufferedReader(isr);
+ //common field
+ String commonField = br.readLine();
+ String[] fields = commonField.split("|",-1);
+ for(String com : fields){
+ String[] comNameAndValue = com.split("=",2);
+ columnNames.add(comNameAndValue[0].trim());
+ commonValues.add(comNameAndValue[1]);
+ }
+ //column names
+ String columnName = br.readLine();
+ String[] names = columnName.split("|",-1);
+ for(String name : names){
+ columnNames.add(name);
+ }
+
+ String xmlPathAndFileName = this.setColumnNames(nename, columnNames,type);
+
+ String valueLine = "";
+ List<String> valuelist = new ArrayList<String>();
+ int countNum = 0 ;
+ while (br.readLine() != null) {
+
+ if (valueLine.trim().equals("")) {
+ continue;
+ }
+ countNum ++;
+ String [] values = valueLine.split("|",-1);
+
+ valuelist.addAll(commonValues);
+ for(String value : values){
+ valuelist.add(value);
+ }
+ this.appendLine(valuelist, bos);
+
+ valuelist.clear();
+ }
+
+ if(bos != null){
+ bos.close();
+ bos = null;
+ }
+ if(fos != null){
+ fos.close();
+ fos = null;
+ }
+
+ String[] fileKeys = this.createZipFile(csvpathAndFileName,xmlPathAndFileName,nename);
+ //ftp store
+ Properties ftpPro = configurationInterface.getProperties();
+ String ip = ftpPro.getProperty("ftp_ip");
+ String port = ftpPro.getProperty("ftp_port");
+ String ftp_user = ftpPro.getProperty("ftp_user");
+ String ftp_password = ftpPro.getProperty("ftp_password");
+
+ String ftp_passive = ftpPro.getProperty("ftp_passive");
+ String ftp_type = ftpPro.getProperty("ftp_type");
+ String remoteFile = ftpPro.getProperty("ftp_remote_path");
+ this.ftpStore(fileKeys,ip,port,ftp_user,ftp_password,ftp_passive,ftp_type,remoteFile);
+ //create Message
+ String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum,nename);
+
+ //set message
+ this.setMessage(message);
+ } catch (IOException e) {
+ log.error("processPMCsv is fail ",e);
+ return false;
+ }finally{
+ try{
+ if (br != null)
+ br.close();
+ if (isr != null)
+ isr.close();
+ if (brs != null)
+ brs.close();
+ if(bos != null){
+ bos.close();
+ }
+
+ if(fos != null){
+ fos.close();
+ }
+ } catch (Exception e){
+ log.error(e);
+ }
+ }
+ return true;
+
+ }
+
+ private boolean processCMXml(File tempfile, String nename, String type) {
+
+ String csvpath = localPath+nename+"/"+type+"/";
+ File csvpathfile = new File(csvpath);
+ if(!csvpathfile.exists()){
+ csvpathfile.mkdirs();
+ }
+ String csvFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
+ String csvpathAndFileName = csvpath+csvFileName+".csv";
+ BufferedOutputStream bos = null;
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(csvpathAndFileName,false);
+ bos = new BufferedOutputStream(fos, 10240);
+ } catch (FileNotFoundException e1) {
+ log.error("FileNotFoundException "+StringUtil.getStackTrace(e1));
+ }
+
+ boolean FieldNameFlag = false;
+ boolean FieldValueFlag = false;
+ //line num
+ int countNum = 0;
+ String xmlPathAndFileName = null;
+ String localName = null;
+ String endLocalName = null;
+ String rmUID = null;
+ int index = -1;
+ ArrayList<String> names = new ArrayList<String>();// colname
+ LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<String, String>();
+
+
+ FileInputStream fis = null;
+ InputStreamReader isr = null;
+ XMLStreamReader reader = null;
+ try{
+ fis = new FileInputStream(tempfile);
+ isr = new InputStreamReader(fis, Constant.ENCODING_UTF8);
+ XMLInputFactory fac = XMLInputFactory.newInstance();
+ reader = fac.createXMLStreamReader(isr);
+ int event = -1;
+ boolean setcolum = true;
+ while (reader.hasNext()){
+ try{
+ event = reader.next();
+ switch (event){
+ case XMLStreamConstants.START_ELEMENT:
+ localName = reader.getLocalName();
+ if ("FieldName".equalsIgnoreCase(localName)){
+ FieldNameFlag = true;
+ }
+ if (FieldNameFlag){
+ if ("N".equalsIgnoreCase(localName)){
+ String colName = reader.getElementText().trim();
+ names.add(colName);
+ }
+ }
+ if ("FieldValue".equalsIgnoreCase(localName)){
+ FieldValueFlag = true;
+
+ }
+ if (FieldValueFlag){
+ if(setcolum){
+ xmlPathAndFileName = this.setColumnNames(nename, names,type);
+ setcolum = false;
+ }
+
+ if ("Object".equalsIgnoreCase(localName)){
+ int ac = reader.getAttributeCount();
+ for (int i = 0; i < ac; i++){
+ if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))){
+ rmUID = reader.getAttributeValue(i).trim();
+ }
+ }
+ nameAndValue.put("rmUID", rmUID);
+ }
+ if ("V".equalsIgnoreCase(localName)) {
+ index = Integer.parseInt(reader
+ .getAttributeValue(0)) - 1;
+ String currentName = names.get(index);
+ String v = reader.getElementText().trim();
+ nameAndValue.put(currentName, v);
+ }
+ }
+ break;
+ case XMLStreamConstants.CHARACTERS:
+ break;
+ case XMLStreamConstants.END_ELEMENT:
+ endLocalName = reader.getLocalName();
+
+ if ("FieldName".equalsIgnoreCase(endLocalName)){
+ FieldNameFlag = false;
+ }
+ if ("FieldValue".equalsIgnoreCase(endLocalName)){
+ FieldValueFlag = false;
+ }
+ if ("Object".equalsIgnoreCase(endLocalName)){
+ countNum ++;
+ this.appendLine(nameAndValue,bos);
+ nameAndValue.clear();
+ }
+ break;
+ }
+ } catch (Exception e)
+ {
+ log.error(""+StringUtil.getStackTrace(e));
+ event = reader.next();
+ }
+ }
+
+
+ if(bos != null){
+ bos.close();
+ bos = null;
+ }
+ if(fos != null){
+ fos.close();
+ fos = null;
+ }
+
+ String[] fileKeys = this.createZipFile(csvpathAndFileName,xmlPathAndFileName,nename);
+ //ftp store
+ Properties ftpPro = configurationInterface.getProperties();
+ String ip = ftpPro.getProperty("ftp_ip");
+ String port = ftpPro.getProperty("ftp_port");
+ String ftp_user = ftpPro.getProperty("ftp_user");
+ String ftp_password = ftpPro.getProperty("ftp_password");
+
+ String ftp_passive = ftpPro.getProperty("ftp_passive");
+ String ftp_type = ftpPro.getProperty("ftp_type");
+ String remoteFile = ftpPro.getProperty("ftp_remote_path");
+ this.ftpStore(fileKeys,ip,port,ftp_user,ftp_password,ftp_passive,ftp_type,remoteFile);
+ //create Message
+ String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum,nename);
+
+ //set message
+ this.setMessage(message);
+ } catch (Exception e){
+ log.error(""+StringUtil.getStackTrace(e));
+ return false;
+ } finally{
+ try{
+ if (reader != null){
+ reader.close();
+ }
+ if (isr != null){
+ isr.close();
+ }
+ if (fis != null){
+ fis.close();
+ }
+ if(bos != null){
+ bos.close();
+ }
+
+ if(fos != null){
+ fos.close();
+ }
+ } catch (Exception e){
+ log.error(e);
+ }
+ }
+ return true;
+ }
+
+ private void setMessage(String message) {
+
+ try {
+ collectResultChannel.put(message);
+ } catch (Exception e) {
+ log.error("collectResultChannel.put(message) is error "+StringUtil.getStackTrace(e));
+ }
+ }
+
+ private String createMessage(String zipName,String user,String pwd,String ip, String port,int countNum, String nename) {
+
+ StringBuffer strBuffer = new StringBuffer();
+ strBuffer
+ .append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+ + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
+ + "<Header SessionID=\"");
+ strBuffer.append("");
+ strBuffer.append("\" LicenceID=\"");
+ strBuffer.append("");
+ strBuffer.append("\" SystemID=\"");
+ strBuffer.append("");
+ strBuffer.append("\" Time=\"");
+ strBuffer.append( dateFormat2.format(new Date()));
+ strBuffer.append("\" PolicyID=\"");
+ strBuffer.append("");
+ strBuffer.append("\"/><Body>");
+ strBuffer.append("<DataCatalog>");
+ strBuffer.append("");
+ strBuffer.append("</DataCatalog><GroupID>");
+ strBuffer.append(nename);
+ strBuffer.append("</GroupID><DataSourceName>");
+ strBuffer.append("");
+ strBuffer.append("</DataSourceName><InstanceID>");
+ strBuffer.append("");
+ strBuffer.append("</InstanceID><FileFormat>");
+ strBuffer.append("csv");
+ strBuffer.append("</FileFormat><CharSet>");
+ strBuffer.append("gbk");
+ strBuffer.append("</CharSet><FieldSeparator>");
+ strBuffer.append("|");
+ strBuffer.append("</FieldSeparator><IsCompressed>");
+ strBuffer.append("true");
+ strBuffer.append("</IsCompressed><StartTime>");
+ strBuffer.append(dateFormat2.format(new Date()));
+ strBuffer.append("</StartTime><EndTime>");
+ strBuffer.append("");
+ strBuffer.append("</EndTime><FileList>");
+ strBuffer.append(zipName);
+ strBuffer.append("</FileList><ConnectionString>");
+ strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port);
+ strBuffer.append("</ConnectionString>");
+ strBuffer.append("<DataCount>");
+ strBuffer.append(countNum);
+ strBuffer.append("</DataCount>");
+
+ strBuffer.append("<FileSize>").append("").append("</FileSize>");
+ strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>");
+
+
+ strBuffer.append("</Body></FILE_DATA_READY_UL>");
+ return strBuffer.toString();
+
+ }
+
+ private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password,
+ String ftp_passive, String ftp_type, String remoteFile) {
+ String zipFilePath = fileKeys[0];
+
+
+ FTPInterface ftpClient;
+ if("ftp".equalsIgnoreCase(ftp_type)){
+ ftpClient = new FTPSrv();
+ }else{
+ ftpClient = new SFTPSrv();
+ }
+
+ //login
+ try {
+ ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK", Boolean.parseBoolean(ftp_passive), 5*60*1000);
+ } catch (Exception e) {
+ log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+ftp_user+"]pwd=["+ftp_password+"]"+StringUtil.getStackTrace(e));
+ return;
+ }
+ ftpClient.store(zipFilePath, remoteFile);
+ log.debug("store ["+zipFilePath+"]to["+remoteFile+"]");
+
+ FileUtils.deleteQuietly(new File(zipFilePath));
+
+
+ }
+
+ private String[] createZipFile(String csvpathAndFileName,String xmlPathAndFileName,String nename) {
+
+ String zipPath = resultPath+nename +dateFormat.format(new Date())+"_"+System.nanoTime();
+
+ File destDir = new File(zipPath);
+ destDir.mkdirs();
+
+ try {
+ FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir);
+ FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir);
+ } catch (IOException e) {
+
+ }
+
+ String destFilePath = zipPath + ".zip";
+ try {
+ Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath);
+ zip.setCompressLevel(9);
+ zip.compress();
+
+ FileUtils.deleteDirectory(destDir);
+ } catch (IOException e) {
+ log.error("zip.compress() is fail "+StringUtil.getStackTrace(e));
+ }
+ return new String[] { destFilePath, zipPath + ".zip"};
+ }
+
+
+ private String setColumnNames(String nename, List<String> names,String type) {
+ //write xml
+ String xmlpath = localPath+nename +"/"+type+"/";
+ File xmlpathfile = new File(xmlpath);
+ if(!xmlpathfile.exists()){
+ xmlpathfile.mkdirs();
+ }
+ String xmlFileName = nename +dateFormat.format(new Date())+ System.nanoTime();
+ String fieldLine = "";
+ for (int i = 0; i < names.size(); i++) {
+ String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i
+ + "</FieldNo>\r\n" + "\t\t<FieldName>"
+ + names.get(i) + "</FieldName>\r\n"
+ + "\t\t<FieldType>" + names.get(i)
+ + "</FieldType>\r\n"
+ + "\t\t<FieldNameOther>" + names.get(i)
+ + "</FieldNameOther>\r\n" +
+ "\t</Field>\r\n";
+ fieldLine = fieldLine + field;
+ }
+
+ String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n"
+ + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
+ + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n";
+ String xmlPathAndFileName = xmlpath+xmlFileName+".xml";
+ try {
+ this.writeDetail(xmlPathAndFileName,str);
+ } catch (Exception e) {
+ log.error("writeDetail is fail ,xmlFileName="+xmlFileName +StringUtil.getStackTrace(e));
+ }
+
+ return xmlPathAndFileName;
+ }
+
+ private void writeDetail(String detailFileName,String str) throws Exception {
+ OutputStreamWriter writer = null;
+ OutputStream readOut = null;
+ try {
+ readOut = new FileOutputStream(new File(detailFileName), false);
+ writer = new OutputStreamWriter(readOut);
+ writer.write(str);
+ writer.flush();
+ } finally {
+
+ if(null != writer){
+ writer.close();
+ }
+ if(readOut != null){
+ readOut.close();
+ }
+
+ }
+
+ }
+
+
+ private void appendLine(LinkedHashMap<String, String> nameAndValue,BufferedOutputStream bos) {
+ StringBuilder lineDatas = new StringBuilder();
+
+ for (String key : nameAndValue.keySet()) {
+ lineDatas.append(nameAndValue.get(key)).append("|");
+ }
+ try {
+ bos.write(lineDatas.toString().getBytes());
+ bos.write("\n".getBytes());
+ } catch (IOException e) {
+ log.error("appendLine error "+StringUtil.getStackTrace(e));
+ }
+ }
+
+ private void appendLine(List<String> values,BufferedOutputStream bos) {
+ StringBuilder lineDatas = new StringBuilder();
+
+ for (String value : values) {
+ lineDatas.append(value).append("|");
+ }
+ try {
+ bos.write(lineDatas.toString().getBytes());
+ bos.write("\n".getBytes());
+ } catch (IOException e) {
+ log.error("appendLine error "+StringUtil.getStackTrace(e));
+ }
+ }
+
+ public List<File> decompressed(String fileName){
+ List<File> filelist = new ArrayList<File>();
+
+ if (fileName.indexOf(".gz") > 1)
+ {
+// decompressFile = deGz(file);
+ } else if (fileName.indexOf(".zip") > 1)
+ {
+ try {
+ File[] files = deZip(new File(fileName));
+ for(File temp :files){
+ filelist.add(temp);
+ }
+ } catch (Exception e) {
+ log.error("decompressed is fail "+StringUtil.getStackTrace(e));
+ }
+ }
+ else {
+ filelist.add(new File(fileName));
+ }
+
+ return filelist;
+ }
+
+ public File[] deZip(File file) throws Exception{
+
+ String regx = "(.*).zip";
+ Pattern p = Pattern.compile(regx);
+ Matcher m = p.matcher(file.getName());
+ if (m.find())
+ {
+ String orgFile = localPath + m.group(1) + "/";
+ UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile);
+ unzip.deCompress();
+ file = new File(orgFile);
+ }
+ File[] files = file.listFiles();
+
+ return files;
+
+ }
+
+ private List<String> ftpDownload(CollectVo collectVo) {
+
+ List<String> fileList = new ArrayList<String>();
+ //IP
+ String ip = collectVo.getIP();
+ //port
+ String port = collectVo.getPort();
+ //user
+ String user = collectVo.getUser();
+ //password
+ String password = collectVo.getPassword();
+ //isPassiveMode
+ String passivemode = collectVo.getPassive();
+
+ String ftpType = collectVo.getFtptype();
+ FTPInterface ftpClient = null;
+ if("ftp".equalsIgnoreCase(ftpType)){
+ ftpClient = new FTPSrv();
+ }else{
+ ftpClient = new SFTPSrv();
+ }
+
+ //login
+ try {
+ ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), 5*60*1000);
+ } catch (Exception e) {
+ log.error("login fail,ip=["+ip+"] port=["+port+"] user=["+user+"]password=["+password+"]"+StringUtil.getStackTrace(e));
+ return fileList;
+ }
+
+ //download
+ String dir = collectVo.getRemotepath();
+ boolean cdsucess = ftpClient.chdir(dir);
+ if(cdsucess){
+ AFtpRemoteFile[] remoteFiles = (AFtpRemoteFile[]) ftpClient.list();
+
+ for(AFtpRemoteFile ftpRemoteFile: remoteFiles){
+ if(!new File(localPath).exists()){
+ try {
+ new File(localPath).mkdir();
+ } catch (Exception e) {
+ log.error("create localPath is fail localPath="+localPath+" "+StringUtil.getStackTrace(e));
+ }
+ }
+
+ if(!new File(localPath).exists()){
+ new File(localPath).mkdirs();
+ }
+
+ String localFileName = localPath + ftpRemoteFile.getFileName();
+ File loaclFile = new File(localFileName);
+ if (loaclFile.exists()) {
+ loaclFile.delete();
+ }
+
+ boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName);
+
+ if(flag){
+ fileList.add(localFileName);
+ }else{
+ log.error("download file fail fileName="+ftpRemoteFile.getAbsFileName());
+ }
+ }
+
+ }else{
+ log.error("chdir is faill dir =["+dir+"]");
+ }
+
+ return fileList;
+ }
+
+
+} \ No newline at end of file
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/TaskThreadService.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/TaskThreadService.java
new file mode 100644
index 0000000..d75a723
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/TaskThreadService.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.collector;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.commons.model.CollectMsg;
+import org.openo.nfvo.emsdriver.commons.utils.StringUtil;
+
+
+
+public class TaskThreadService extends Thread {
+
+ public Log log = LogFactory.getLog(TaskThreadService.class);
+ private final ExecutorService pool;
+
+ private BlockingQueue<CollectMsg> queue = new LinkedBlockingQueue<CollectMsg>();
+ private boolean startFlag = true;
+
+ public static TaskThreadService getInstance(int poolSize) {
+ return new TaskThreadService(poolSize);
+ }
+ private TaskThreadService(int poolSize) {
+ pool = Executors.newFixedThreadPool(poolSize);
+ }
+
+ private long timeStamp = System.currentTimeMillis();
+ public void run() { // run the service
+ try {
+ while(startFlag) {
+
+ try {
+ if(System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE){
+ timeStamp = System.currentTimeMillis();
+ log.debug("task queue size " + queue.size());
+ }
+
+ CollectMsg data = receive();
+ if(data == null){
+ continue;
+ }
+
+ pool.execute(new TaskThread(data));
+
+
+ } catch (Exception e) {
+ log.error("collect task process fail!"+StringUtil.getStackTrace(e));
+ }
+
+ }
+
+ } catch (Exception ex) {
+ log.error("task ThreadService error "+StringUtil.getStackTrace(ex));
+ pool.shutdown();
+ }
+ log.error("Task ThreadService exit");
+ }
+
+
+
+ public CollectMsg receive() {
+ try {
+ return queue.poll(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.error("queue.poll is error"+StringUtil.getStackTrace(e));
+ }
+ return null;
+ }
+
+
+
+ public void add(CollectMsg data){
+ try {
+ queue.put(data);
+ } catch (InterruptedException e) {
+ log.error("queue.put is error"+StringUtil.getStackTrace(e));
+ }
+ }
+
+
+ public int size(){
+ return queue.size();
+ }
+
+ public void stopTask(){
+ startFlag = false;
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/AlarmManager.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/AlarmManager.java
new file mode 100644
index 0000000..95cdad8
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/AlarmManager.java
@@ -0,0 +1,80 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.collector.alarm;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.commons.model.CollectVo;
+import org.openo.nfvo.emsdriver.commons.model.EMSInfo;
+import org.openo.nfvo.emsdriver.commons.utils.DriverThread;
+import org.openo.nfvo.emsdriver.configmgr.ConfigurationInterface;
+
+public class AlarmManager extends DriverThread{
+
+ private ConfigurationInterface configurationInterface;
+
+ @Override
+ public void dispose() {
+ log.debug("AlarmManager is start");
+ //get alarm config
+ List<EMSInfo> emsInfos = configurationInterface.getAllEMSInfo();
+ while(isRun() && emsInfos.size() == 0){
+ emsInfos = configurationInterface.getAllEMSInfo();
+ if(emsInfos.size() == 0){
+ try {
+ Thread.sleep(1000);
+ log.debug("config is not load");
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ List<CollectVo> collectVos = new ArrayList<CollectVo>();
+ for(EMSInfo emsInfo : emsInfos){
+ //alarm
+ CollectVo CollectVo = emsInfo.getCollectVoByType(Constant.COLLECT_TYPE_ALARM);
+ if(CollectVo != null){
+ CollectVo.setEmsName(emsInfo.getName());
+ collectVos.add(CollectVo);
+ }
+ }
+
+ for(CollectVo collectVo : collectVos){
+ AlarmTaskThread alarm = new AlarmTaskThread(collectVo);
+ alarm.setName(collectVo.getIP()+collectVo.getPort());
+ alarm.start();
+ }
+
+ }
+
+ /**
+ * @return the configurationInterface
+ */
+ public ConfigurationInterface getConfigurationInterface() {
+ return configurationInterface;
+ }
+
+ /**
+ * @param configurationInterface the configurationInterface to set
+ */
+ public void setConfigurationInterface(
+ ConfigurationInterface configurationInterface) {
+ this.configurationInterface = configurationInterface;
+ }
+
+
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java
new file mode 100644
index 0000000..603e22b
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java
@@ -0,0 +1,297 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.collector.alarm;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.commons.model.CollectVo;
+import org.openo.nfvo.emsdriver.commons.utils.StringUtil;
+import org.openo.nfvo.emsdriver.messagemgr.MessageChannel;
+import org.openo.nfvo.emsdriver.messagemgr.MessageChannelFactory;
+
+import com.alibaba.fastjson.JSONObject;
+
+
+public class AlarmTaskThread extends Thread{
+ public Log log = LogFactory.getLog(AlarmTaskThread.class);
+
+ private HeartBeat heartBeat = null;
+
+ private boolean isStop = false;
+ private CollectVo collectVo = null;
+ private int read_timeout = Constant.READ_TIMEOUT_MILLISECOND;
+ private int reqId;
+
+ private Socket socket = null;
+ private BufferedInputStream is = null;
+ private BufferedOutputStream dos = null;
+
+ private MessageChannel alarmChannel;
+
+ public AlarmTaskThread(CollectVo collectVo) {
+
+ this.collectVo = collectVo;
+ }
+
+ public void run() {
+ alarmChannel = MessageChannelFactory.getMessageChannel(Constant.ALARM_CHANNEL_KEY);
+ try {
+ this.init();
+ while(!this.isStop){
+ String body;
+ try {
+ body = this.receive();
+ String alarm120 = this.build120Alarm(body);
+
+ this.send120Alarm(alarm120);
+ } catch (Exception e) {
+ reinit();
+ }
+ }
+ } catch (Exception e) {
+ log.error(StringUtil.getStackTrace(e));
+ }
+ }
+
+ private void send120Alarm(String alarm120) {
+
+ try {
+ alarmChannel.put(alarm120);
+ } catch (InterruptedException e) {
+ log.error(StringUtil.getStackTrace(e));
+ }
+ }
+
+ private String build120Alarm(String body) {
+ StringBuilder content = new StringBuilder(
+ "<?xml version='1.0' encoding='iso-8859-1'?>\n")
+ .append("<WholeMsg MsgMark='120' Priority='2' FieldNum='5'><FM_ALARM_MSG>\n");
+
+
+ JSONObject reagobj = JSONObject.parseObject(body);
+
+ Set<String> keys = reagobj.keySet();
+
+ for (String key : keys) {
+
+ String value = (String)reagobj.get(key);
+ content.append("<").append(key).append(">");
+ content.append(value);
+ content.append("</").append(key).append(">\n");
+ }
+ content.append("</FM_ALARM_MSG></WholeMsg>");
+
+ return content.toString();
+
+ }
+
+ public String receive() throws Exception {
+
+ Msg msg =null;
+ String retString = null;
+
+ while (retString == null && !this.isStop) {
+
+ msg = MessageUtil.readOneMsg(is);
+
+ if("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)){
+ log.debug("receive login ack");
+ boolean suc = this.ackLoginAlarm(msg);
+ if(suc){
+
+ if(reqId == Integer.MAX_VALUE){
+ reqId = 0;
+ }
+ reqId ++;
+ Msg msgheart = MessageUtil.putHeartBeatMsg(reqId);
+ heartBeat = new HeartBeat(socket,msgheart);
+ heartBeat.setName("CMCC_JT_HeartBeat");
+ // start heartBeat
+ heartBeat.start();
+ }
+ retString = null;
+ }
+
+ if("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)){
+ log.debug("received heartBeat��"+msg.getBody());
+ retString = null;
+ }
+
+
+
+ if("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)){
+ log.debug("received alarm message");
+ retString = msg.getBody();
+ }
+ }
+ return retString;
+ }
+
+ public void init() throws Exception {
+ isStop = false;
+ //host
+ String host = collectVo.getIP();
+ //port
+ String port = collectVo.getPort();
+ //user
+ String user = collectVo.getUser();
+ //password
+ String password = collectVo.getPassword();
+
+ String read_timeout = collectVo.getRead_timeout();
+ if ((read_timeout != null) && (read_timeout.trim().length() > 0)) {
+ try {
+ this.read_timeout = Integer.parseInt(read_timeout);
+ } catch (NumberFormatException e) {
+ log.error(StringUtil.getStackTrace(e));
+ }
+ }
+ log.debug("socket connect host=" + host + ", port=" + port);
+ try {
+ int portInt = Integer.parseInt(port);
+ socket = new Socket(host, portInt);
+
+ } catch (UnknownHostException e) {
+ throw new Exception("remote host [" + host + "]connect fail" + StringUtil.getStackTrace(e));
+ } catch (IOException e) {
+ throw new Exception("create socket IOException " + StringUtil.getStackTrace(e));
+ }
+ try {
+ socket.setSoTimeout(this.read_timeout);
+ socket.setTcpNoDelay(true);
+ socket.setKeepAlive(true);
+ } catch (SocketException e) {
+ throw new Exception(" SocketException " + StringUtil.getStackTrace(e));
+ }
+ try {
+ dos = new BufferedOutputStream(socket.getOutputStream());
+
+ Msg msg = MessageUtil.putLoginMsg(user,password);
+
+ try {
+ log.debug("send login message "+msg.toString(false));
+ MessageUtil.writeMsg(msg,dos);
+
+ } catch (Exception e) {
+ log.error("send login message is fail "+StringUtil.getStackTrace(e));
+ }
+
+ is = new BufferedInputStream(socket.getInputStream());
+
+ } catch (SocketException e) {
+ throw new Exception(StringUtil.getStackTrace(e));
+ }
+ }
+
+ private boolean ackLoginAlarm(Msg msg) throws Exception {
+
+ boolean is_success = false;
+ try {
+ String loginres = msg.getBody();
+ //ackLoginAlarm; result=fail(succ); resDesc=username-error
+ String [] loginbody = loginres.split(";");
+ if(loginbody.length > 1){
+ for(String str :loginbody){
+ if(str.contains("=")){
+ String [] paras1 = str.split("=",-1);
+ if("result".equalsIgnoreCase(paras1[0].trim())){
+ if("succ".equalsIgnoreCase(paras1[1].trim())){
+ is_success = true;
+ }else{
+ is_success = false;
+ }
+ }
+ }
+ }
+ }else {
+ log.error("login ack body Incorrect formatbody=" + loginres);
+ }
+
+
+ } catch (Exception e) {
+ log.error("pocess login ack fail"+StringUtil.getStackTrace(e));
+ }
+ if (is_success) {
+ log.info("login sucess receive login ack " + msg.getBody());
+ } else {
+ log.error("login fail receive login ack " + msg.getBody());
+ this.close();
+ this.isStop = true;
+ throw new Exception("login fail quit");
+ }
+ return is_success;
+ }
+
+ public void close() {
+
+ if(heartBeat != null){
+ heartBeat.setStop(true);
+ }
+
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ } finally {
+ is = null;
+ }
+ }
+
+ if (dos != null) {
+ try {
+ dos.close();
+ } catch (IOException e) {
+ } finally {
+ dos = null;
+ }
+ }
+
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ } finally {
+ socket = null;
+ }
+
+ }
+ }
+
+ public void reinit() {
+ int time = 0;
+ close();
+ while(!this.isStop) {
+ close();
+ time++;
+ try {
+ Thread.sleep(1000 * 10);
+ init();
+ return;
+ } catch (Exception e) {
+ log.error("Number ["+time+"]reconnect ["+collectVo.getIP()+"]fail" );
+ }
+ }
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/HeartBeat.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/HeartBeat.java
new file mode 100644
index 0000000..169295b
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/HeartBeat.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.collector.alarm;
+
+import java.io.BufferedOutputStream;
+import java.net.Socket;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+
+public class HeartBeat extends Thread{
+ public Log log = LogFactory.getLog(HeartBeat.class);
+ private BufferedOutputStream out = null;
+ private Socket socket=null;
+ private Msg heartStr ;
+ private boolean stop = false;
+ public boolean isStop(){
+ return this.stop;
+ }
+ public void setStop(boolean stop){
+ this.stop = stop;
+ }
+
+ public HeartBeat( Socket socket,Msg heatMessage){
+ this.socket=socket;
+ this.heartStr = heatMessage;
+ }
+
+ public void run(){
+ log.debug("HeartBeat start heartStr:"+heartStr.toString(false));
+ this.stop=false;
+ try {
+ while(!this.isStop()){
+ out = new BufferedOutputStream(socket.getOutputStream());
+ MessageUtil.writeMsg(heartStr,out);
+ log.debug("send HeartBeat heartStr:"+heartStr.toString(false));
+ Thread.sleep(Constant.ONEMINUTE);
+ }
+ } catch (Exception e) {
+ log.error("send HeartBeat fail ",e);
+ }
+ log.debug("HeartBeat thread stop");
+ }
+
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/MessageUtil.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/MessageUtil.java
new file mode 100644
index 0000000..dbe239e
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/MessageUtil.java
@@ -0,0 +1,164 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.collector.alarm;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+
+
+public class MessageUtil
+{
+ public static String MSG_BODY_ENCODE_CHARSET="UTF-8";
+ public static int MSG_BUF_SIZE=8096 ;
+
+ public static Msg putLoginMsg(String user,String passwd)
+ {
+ String body = String.format(Msg.reqLoginAlarm, user,passwd,"msg");
+ Msg msg = new Msg(body,MsgType.reqLoginAlarm);
+
+
+ return msg;
+
+ }
+ public static Msg putLoginFtp(String user,String passwd)
+ {
+ String body = String.format(Msg.reqLoginAlarm, user,passwd,"ftp");
+ Msg msg = new Msg(body,MsgType.reqLoginAlarm);
+
+
+ return msg;
+
+ }
+
+ public static Msg putSyncMsg(int reqId,int alarmSeq)
+ {
+ String body = String.format(Msg.syncAlarmMessageMsg, reqId,alarmSeq);
+ Msg msg = new Msg(body,MsgType.reqSyncAlarmMsg);
+
+
+ return msg;
+
+ }
+
+ public static Msg putHeartBeatMsg(int reqId)
+ {
+ String body = String.format(Msg.reqHeartBeat, reqId);
+ Msg msg = new Msg(body,MsgType.reqHeartBeat);
+ return msg;
+
+ }
+
+ public static Msg reqSyncAlarmFile(int reqId, String startTime,String endTime) {
+ String body = String.format(Msg.syncActiveAlarmFileMsg, reqId,startTime,endTime);
+ Msg msg = new Msg(body,MsgType.reqSyncAlarmFile);
+ return msg;
+ }
+
+ public static Msg reqSyncAlarmFileByAlarmSeq(int reqId, int alarmSeq) {
+ String body = String.format(Msg.syncAlarmMessageByalarmSeq, reqId,alarmSeq);
+ Msg msg = new Msg(body,MsgType.reqSyncAlarmFile);
+ return msg;
+ }
+
+ public static Msg reqSyncAlarmFileByTime(int reqId, String startTime,String endTime) {
+ String body = String.format(Msg.syncAlarmFileMsg, reqId,startTime,endTime);
+ Msg msg = new Msg(body,MsgType.reqSyncAlarmFile);
+ return msg;
+ }
+
+ public static Msg closeConnAlarmMsg()
+ {
+ String body = String.format(Msg.disconnectMsg);
+ Msg msg = new Msg(body,MsgType.closeConnAlarm);
+ return msg;
+ }
+
+ public static Msg readOneMsg(BufferedInputStream is) throws Exception
+ {
+ byte[] inputB = new byte[9];
+
+ ByteArrayInputStream bais = null;
+ DataInputStream ois = null;
+
+ Msg msg = new Msg();
+ try {
+ DataInputStream dis = new DataInputStream(is);
+ dis.readFully(inputB);
+ bais = new ByteArrayInputStream(inputB);
+ ois = new DataInputStream(bais);
+ short StartSign = ois.readShort();
+ if (StartSign != Msg.StartSign) {
+ throw new Exception("start sign is [" + Msg.StartSign
+ + "],not is [" + StartSign + "]");
+ }
+ int msgType = ois.readByte();
+ msg.setMsgType(MsgType.getMsgTypeValue(msgType));
+ int timeStamp = ois.readInt();
+ msg.setTimeStamp(timeStamp);
+ int bodylength = ois.readShort();
+ msg.setLenOfBody(bodylength);
+ byte b[] = new byte[bodylength];
+ dis.readFully(b);
+ msg.newBodyfromBytes(b);
+ } catch (Exception e) {
+ throw new Exception(e);
+ }finally{
+ if(bais != null){
+ bais.close();
+ }
+ if(ois != null){
+ ois.close();
+ }
+ }
+
+ return msg;
+ }
+
+ public static void writeMsg(Msg msg,BufferedOutputStream dout) throws Exception{
+
+ ByteArrayOutputStream byteOutStream = null;
+ DataOutputStream oos = null;
+ try {
+ byteOutStream = new ByteArrayOutputStream(9);
+ oos = new DataOutputStream(byteOutStream);
+ oos.writeShort(Msg.StartSign);
+ oos.writeByte(msg.getMsgType().value);
+ oos.writeInt(Msg.creatMsgTimeStamp());
+ oos.writeShort(msg.getBodyLenNow());
+
+ dout.write(byteOutStream.toByteArray());
+
+ dout.write(msg.getBodyBytes());
+ dout.flush();
+ } catch (Exception e) {
+ throw new Exception(e);
+ }finally{
+ if(oos != null){
+ oos.close();
+ }
+ if(byteOutStream != null){
+ byteOutStream.close();
+ }
+ }
+
+ }
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/Msg.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/Msg.java
new file mode 100644
index 0000000..55b15eb
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/Msg.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.collector.alarm;
+
+import java.io.UnsupportedEncodingException;
+
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+
+
+public class Msg {
+
+ public static short StartSign = (short)0xffff;
+
+ public final static String reqLoginAlarm = "reqLoginAlarm;user=%s;key=%s;type=%s";
+
+ public final static String reqHeartBeat = "reqHeartBeat;reqId=%s";
+ public final static String disconnectMsg = "closeConnAlarm";
+
+ public final static String syncAlarmMessageMsg = "reqSyncAlarmMsg;reqID=%s;alarmSeq=%s";
+ public final static String syncAlarmMessageByalarmSeq = "reqSyncAlarmFile;reqID=%s;alarmSeq=%s;syncSource=1";
+ public final static String syncActiveAlarmFileMsg = "reqSyncAlarmFile;reqID=%s;startTime=%s;endTime=%s;syncSource=0";
+ public final static String syncAlarmFileMsg = "reqSyncAlarmFile;reqID=%s;startTime=%s;endTime=%s;syncSource=1";
+
+
+
+ private MsgType msgType;
+ private int timeStamp = 0;
+ private int lenOfBody = 0;
+ private String body = null;
+ public Msg(){}
+ public Msg(String body,MsgType msgType ){
+ this.body = body;
+ this.setMsgType(msgType);
+ }
+
+ public void newBodyfromBytes(byte b[]) throws UnsupportedEncodingException{
+ this.body = new String(b,Constant.ENCODING_UTF8);
+ }
+ public static int creatMsgTimeStamp(){
+ return (int)System.currentTimeMillis()/1000;
+ }
+
+ public int getBodyLenNow(){
+ return getBody().getBytes().length;
+ }
+
+
+ public void setTimeStamp(int timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ public int getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setLenOfBody(int lenOfBody) {
+ this.lenOfBody = lenOfBody;
+ }
+
+ public int getLenOfBody() {
+ return lenOfBody;
+ }
+
+ public byte[] getBodyBytes() throws UnsupportedEncodingException {
+ return getBody().getBytes(Constant.ENCODING_UTF8);
+ }
+
+ public void setBody(String body) {
+ this.body = body;
+ }
+
+ public String getBody() {
+ return body;
+ }
+
+ public void setMsgType(MsgType msgType) {
+ this.msgType = msgType;
+ }
+
+ public MsgType getMsgType() {
+ return msgType;
+ }
+
+ public String toString(boolean isRead){
+ StringBuilder sb = new StringBuilder();
+ sb.append("StartSign[").append(StartSign).append("]msgType[").append(msgType.value).append("]timeStamp[");
+ if(isRead){
+ sb.append(timeStamp).append("]lenOfBody[").append(lenOfBody);
+ }else{
+ sb.append(creatMsgTimeStamp()).append("]lenOfBody[").append(getBodyLenNow());
+ }
+ sb.append("]body[").append(body).append("]");
+ return sb.toString();
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/MsgType.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/MsgType.java
new file mode 100644
index 0000000..26aa855
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/collector/alarm/MsgType.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.collector.alarm;
+
+public enum MsgType {
+
+ reqLoginAlarm("reqLoginAlarm",1,"all"),
+ ackLoginAlarm("ackLoginAlarm",2,"all"),
+ reqSyncAlarmMsg("reqSyncAlarmMsg",3,"msg"),
+ ackSyncAlarmMsg("ackSyncAlarmMsg",4,"msg"),
+ reqSyncAlarmFile("reqSyncAlarmFile",5,"file"),
+ ackSyncAlarmFile("ackSyncAlarmFile",6,"file"),
+ ackSyncAlarmFileResult("ackSyncAlarmFileResult",7,"file"),
+ reqHeartBeat("reqHeartBeat",8,"all"),
+ ackHeartBeat("ackHeartBeat",9,"all"),
+ closeConnAlarm("closeConnAlarm",10,"all"),
+ realTimeAlarm("realTimeAlarm",0,"all"),
+ undefined("undefined",-1,"all");
+
+ public int value = -1;
+ public String name = null;
+ public String type = null;
+
+ MsgType(String name,int value,String type){this.name = name;this.value = value;this.type = type;}
+
+ public static MsgType getMsgTypeValue(int msgTypeValue){
+
+ for(MsgType msgType : MsgType.values()){
+ if(msgType.value == msgTypeValue){
+ return msgType;
+ }
+ }
+ return undefined;
+ }
+
+ public static MsgType getMsgTypeName(String msgTypeName){
+
+ for(MsgType msgType : MsgType.values()){
+ if(msgType.name.toLowerCase().equals(msgTypeName.toLowerCase())){
+ return msgType;
+ }
+ }
+ return undefined;
+ }
+
+ public String toString(){
+ return this.name;
+ }
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/constant/Constant.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/constant/Constant.java
new file mode 100644
index 0000000..5a77c73
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/constant/Constant.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.constant;
+
+import java.io.File;
+
+public class Constant {
+
+ public static String SYS_HOME = System.getenv("EMSDRIVER_HOME")==null?"E:/workspace/ems-driver/":System.getenv("EMSDRIVER_HOME");
+
+ public static String SYS_CFG = SYS_HOME + File.separator + "cfg" + File.separator;
+ public static String SYS_DATA = SYS_HOME + "data" + File.separator;
+ public static String SYS_DATA_TEMP = SYS_DATA + File.separator + "temp" + File.separator;
+ public static String SYS_DATA_RESULT = SYS_DATA + File.separator + "RESULT" + File.separator;
+ public static String COLLECT_TYPE_CM = "cm";
+ public static String COLLECT_TYPE_PM = "pm";
+ public static String COLLECT_TYPE_ALARM = "alarm";
+
+ public static String ENCODING_UTF8 = "UTF-8";
+ public static String ENCODING_GBK = "GBK";
+
+ public static final String COLLECT_CHANNEL_KEY = "COLLECT_CHANNEL_KEY";
+ public static final String COLLECT_RESULT_CHANNEL_KEY = "COLLECT_RESULT_CHANNEL_KEY";
+ public static final String ALARM_CHANNEL_KEY = "ALARM_CHANNEL_KEY";
+
+
+ public static final String MSBAPIROOTDOMAIN = "/openoapi/microservices/v1/services";
+
+ //alarm
+ public static final int READ_TIMEOUT_MILLISECOND = 180000;
+ public static final long ONEMINUTE = 60000;
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/AFtpRemoteFile.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/AFtpRemoteFile.java
new file mode 100644
index 0000000..63aa3c4
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/AFtpRemoteFile.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.ftp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Date;
+
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+
+public class AFtpRemoteFile implements RemoteFile{
+ protected FTPClient ftpClient = null;
+ protected FTPFile ftpFile = null;
+ protected String currDir = null;
+
+ public AFtpRemoteFile(FTPFile rfile, FTPClient ftpClient, String currDir)
+ throws IOException {
+ this.ftpClient = ftpClient;
+ this.ftpFile = rfile;
+ this.currDir = currDir;
+ }
+
+ public long getSize() {
+ return ftpFile.getSize();
+ }
+
+ public String getFileName() {
+ return ftpFile.getName();
+ }
+
+ public String getAbsFileName() {
+ return currDir.concat(getFileName());
+ }
+
+ public boolean isDirectory() {
+ return ftpFile.isDirectory();
+ }
+ public boolean isFile() {
+ return ftpFile.isFile();
+ }
+
+ public String getOwner() {
+ return ftpFile.getUser();
+ }
+
+ public Date getModifyDate() {
+ return ftpFile.getTimestamp().getTime();
+ }
+ public boolean renameTo(String newName) throws IOException {
+ return ftpClient.rename(
+ currDir.concat(getFileName()), newName);
+ }
+ public boolean remove() throws IOException {
+ return ftpClient.deleteFile(
+ currDir.concat(getFileName()));
+ }
+
+ public InputStream getInputStream() throws IOException {
+ return ftpClient.retrieveFileStream(this.getAbsFileName());
+ }
+
+ public void release() {
+ ftpClient = null;
+ ftpFile = null;
+ currDir = null;
+ }
+} \ No newline at end of file
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/ExtendsDefaultFTPFileEntryParserFactory.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/ExtendsDefaultFTPFileEntryParserFactory.java
new file mode 100644
index 0000000..51f68f4
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/ExtendsDefaultFTPFileEntryParserFactory.java
@@ -0,0 +1,224 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.ftp;
+
+import org.apache.commons.net.ftp.Configurable;
+import org.apache.commons.net.ftp.FTPClientConfig;
+import org.apache.commons.net.ftp.FTPFileEntryParser;
+import org.apache.commons.net.ftp.parser.CompositeFileEntryParser;
+import org.apache.commons.net.ftp.parser.DefaultFTPFileEntryParserFactory;
+import org.apache.commons.net.ftp.parser.MVSFTPEntryParser;
+import org.apache.commons.net.ftp.parser.NTFTPEntryParser;
+import org.apache.commons.net.ftp.parser.OS2FTPEntryParser;
+import org.apache.commons.net.ftp.parser.OS400FTPEntryParser;
+import org.apache.commons.net.ftp.parser.ParserInitializationException;
+import org.apache.commons.net.ftp.parser.UnixFTPEntryParser;
+import org.apache.commons.net.ftp.parser.VMSVersioningFTPEntryParser;
+ /**
+ * This is the default implementation of the
+ * FTPFileEntryParserFactory interface. This is the
+ * implementation that will be used by
+ * org.apache.commons.net.ftp.FTPClient.listFiles()
+ * if no other implementation has been specified.
+ *
+ * @see org.apache.commons.net.ftp.FTPClient#listFiles
+ * @see org.apache.commons.net.ftp.FTPClient#setParserFactory
+ */
+public class ExtendsDefaultFTPFileEntryParserFactory extends
+ DefaultFTPFileEntryParserFactory {
+
+ private FTPClientConfig config = null;
+
+ /**
+ * This default implementation of the FTPFileEntryParserFactory
+ * interface works according to the following logic:
+ * First it attempts to interpret the supplied key as a fully
+ * qualified classname of a class implementing the
+ * FTPFileEntryParser interface. If that succeeds, a parser
+ * object of this class is instantiated and is returned;
+ * otherwise it attempts to interpret the key as an identirier
+ * commonly used by the FTP SYST command to identify systems.
+ * <p/>
+ * If <code>key</code> is not recognized as a fully qualified
+ * classname known to the system, this method will then attempt
+ * to see whether it <b>contains</b> a string identifying one of
+ * the known parsers. This comparison is <b>case-insensitive</b>.
+ * The intent here is where possible, to select as keys strings
+ * which are returned by the SYST command on the systems which
+ * the corresponding parser successfully parses. This enables
+ * this factory to be used in the auto-detection system.
+ * <p/>
+ *
+ * @param key should be a fully qualified classname corresponding to
+ * a class implementing the FTPFileEntryParser interface<br/>
+ * OR<br/>
+ * a string containing (case-insensitively) one of the
+ * following keywords:
+ * <ul>
+ * <li>{@link FTPClientConfig#SYST_UNIX UNIX}</li>
+ * <li>{@link FTPClientConfig#SYST_NT WINDOWS}</li>
+ * <li>{@link FTPClientConfig#SYST_OS2 OS/2}</li>
+ * <li>{@link FTPClientConfig#SYST_OS400 OS/400}</li>
+ * <li>{@link FTPClientConfig#SYST_VMS VMS}</li>
+ * <li>{@link FTPClientConfig#SYST_MVS MVS}</li>
+ * </ul>
+ * @return the FTPFileEntryParser corresponding to the supplied key.
+ * @throws ParserInitializationException thrown if for any reason the factory cannot resolve
+ * the supplied key into an FTPFileEntryParser.
+ * @see FTPFileEntryParser
+ */
+ public FTPFileEntryParser createFileEntryParser(String key)
+ {
+ @SuppressWarnings("rawtypes")
+ Class parserClass = null;
+ FTPFileEntryParser parser = null;
+ try
+ {
+ parserClass = Class.forName(key);
+ parser = (FTPFileEntryParser) parserClass.newInstance();
+ }
+ catch (ClassNotFoundException e)
+ {
+ String ukey = null;
+ if (null != key)
+ {
+ ukey = key.toUpperCase();
+ }
+ if (ukey.indexOf(FTPClientConfig.SYST_UNIX) >= 0)
+ {
+ parser = createUnixFTPEntryParser();
+ }
+ else if (ukey.indexOf(FTPClientConfig.SYST_VMS) >= 0)
+ {
+ parser = createVMSVersioningFTPEntryParser();
+ }
+ else if (ukey.indexOf(FTPClientConfig.SYST_NT) >= 0 || ukey.indexOf("DOPRA") >= 0 || ukey.indexOf("MSDOS") >= 0)
+ {
+ parser = createNTFTPEntryParser();
+ }
+ else if (ukey.indexOf(FTPClientConfig.SYST_OS2) >= 0)
+ {
+ parser = createOS2FTPEntryParser();
+ }
+ else if (ukey.indexOf(FTPClientConfig.SYST_OS400) >= 0)
+ {
+ parser = createOS400FTPEntryParser();
+ }
+ else if (ukey.indexOf(FTPClientConfig.SYST_MVS) >= 0)
+ {
+ parser = createMVSEntryParser();
+ }
+ else
+ {
+ throw new ParserInitializationException("Unknown parser type: " + key);
+ }
+ }
+ catch (ClassCastException e)
+ {
+ throw new ParserInitializationException(parserClass.getName()
+ + " does not implement the interface "
+ + "org.apache.commons.net.ftp.FTPFileEntryParser.", e);
+ }
+ catch (Throwable e)
+ {
+ throw new ParserInitializationException("Error initializing parser", e);
+ }
+
+ if (parser instanceof Configurable) {
+ ((Configurable)parser).configure(this.config);
+ }
+ return parser;
+ }
+
+ /**
+ * <p>Implementation extracts a key from the supplied
+ * {@link FTPClientConfig FTPClientConfig}
+ * parameter and creates an object implementing the
+ * interface FTPFileEntryParser and uses the supplied configuration
+ * to configure it.
+ * </p><p>
+ * Note that this method will generally not be called in scenarios
+ * that call for autodetection of parser type but rather, for situations
+ * where the user knows that the server uses a non-default configuration
+ * and knows what that configuration is.
+ * </p>
+ * @param config A {@link FTPClientConfig FTPClientConfig}
+ * used to configure the parser created
+ *
+ * @return the @link FTPFileEntryParser FTPFileEntryParser} so created.
+ * @exception ParserInitializationException
+ * Thrown on any exception in instantiation
+ * @since 1.4
+ */
+ public FTPFileEntryParser createFileEntryParser(FTPClientConfig config)
+ throws ParserInitializationException
+ {
+ this.config = config;
+ String key = config.getServerSystemKey();
+ return createFileEntryParser(key);
+ }
+
+
+ public FTPFileEntryParser createUnixFTPEntryParser()
+ {
+ return (FTPFileEntryParser) new UnixFTPEntryParser();
+ }
+
+ public FTPFileEntryParser createVMSVersioningFTPEntryParser()
+ {
+ return (FTPFileEntryParser) new VMSVersioningFTPEntryParser();
+ }
+
+ public FTPFileEntryParser createNTFTPEntryParser()
+ {
+ if (config != null && FTPClientConfig.SYST_NT.equals(
+ config.getServerSystemKey()))
+ {
+ return new NTFTPEntryParser();
+ } else {
+ return new CompositeFileEntryParser(new FTPFileEntryParser[]
+ {
+ new NTFTPEntryParser(),
+ new UnixFTPEntryParser()
+ });
+ }
+ }
+
+ public FTPFileEntryParser createOS2FTPEntryParser()
+ {
+ return (FTPFileEntryParser) new OS2FTPEntryParser();
+ }
+
+ public FTPFileEntryParser createOS400FTPEntryParser()
+ {
+ if (config != null &&
+ FTPClientConfig.SYST_OS400.equals(config.getServerSystemKey()))
+ {
+ return new OS400FTPEntryParser();
+ } else {
+ return new CompositeFileEntryParser(new FTPFileEntryParser[]
+ {
+ new OS400FTPEntryParser(),
+ new UnixFTPEntryParser()
+ });
+ }
+ }
+
+ public FTPFileEntryParser createMVSEntryParser()
+ {
+ return new MVSFTPEntryParser();
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/FTPInterface.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/FTPInterface.java
new file mode 100644
index 0000000..b791c5c
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/FTPInterface.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.ftp;
+
+public interface FTPInterface {
+
+ /**
+ * login ftp
+ * @throws Exception
+ */
+ public void login(String host, int port, String user, String pwd, String encode,boolean isPassiveMode, int timeout) throws Exception;
+
+ /**
+ * close ftp
+ */
+ public void logout();
+
+ /**
+ * download file
+ * @return
+ */
+ public boolean downloadFile(String remoteFile,String localFile);
+
+ public boolean chdir(String dir);
+
+ public RemoteFile[] list();
+
+ public RemoteFile[] list(String dir);
+
+ public boolean store(String localFile,String remoteFile);
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/FTPSrv.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/FTPSrv.java
new file mode 100644
index 0000000..1a4b6c7
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/FTPSrv.java
@@ -0,0 +1,199 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.ftp;
+
+import java.io.BufferedOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.TimeZone;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPClientConfig;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPReply;
+import org.openo.nfvo.emsdriver.commons.utils.StringUtil;
+
+
+public class FTPSrv implements FTPInterface{
+ private Log log = LogFactory.getLog(FTPSrv.class);
+ private FTPClient ftpClient = null;
+
+
+ /**
+ * login FTP
+ * @param host
+ * @param port
+ * @param user
+ * @param pwd
+ * @param encode
+ * @param timeout
+ * @throws Exception
+ */
+ public void login(String host, int port, String user, String pwd, String encode, boolean isPassiveMode,int timeout) throws Exception {
+ ftpClient = new FTPClient();
+
+ FTPClientConfig ftpClientConfig = new FTPClientConfig();
+ ftpClientConfig.setServerTimeZoneId(TimeZone.getDefault().getID());
+ this.ftpClient.setControlEncoding("GBK");
+ this.ftpClient.configure(ftpClientConfig);
+ ftpClient.setParserFactory(new ExtendsDefaultFTPFileEntryParserFactory());
+
+ if(encode!=null && encode.length()>0){
+ ftpClient.setControlEncoding(encode);
+ }
+
+ ftpClient.connect(host, port);
+ int reply = this.ftpClient.getReplyCode();
+ if (!FTPReply.isPositiveCompletion(reply)) {
+ this.ftpClient.disconnect();
+ return ;
+ }
+
+ if(!ftpClient.login(user, pwd)){
+ throw new Exception("login["+host+"],port["+port+"] fail, please check user and password");
+ }
+ if(isPassiveMode){
+ ftpClient.enterLocalPassiveMode();
+ }else{
+ ftpClient.enterLocalActiveMode();
+ }
+ ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
+ this.ftpClient.setBufferSize(1024 * 2);
+ this.ftpClient.setDataTimeout(3*60 * 1000);
+ try{
+ this.ftpClient.setSoTimeout(timeout);
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+ }
+
+
+ /**
+ * logout
+ */
+ public void logout(){
+ if(ftpClient != null){
+ try {
+ ftpClient.logout();
+ ftpClient.disconnect();
+ }catch(Exception e){
+ }
+ ftpClient = null;
+ }
+ }
+
+
+ public boolean chdir(String dir) {
+ boolean sucess = false;
+ try {
+ if(ftpClient.changeWorkingDirectory(dir)){
+ sucess = true;
+ }else{
+ sucess = false;
+ }
+ } catch (IOException e) {
+ log.error("chdir dir ="+dir+" is error"+StringUtil.getStackTrace(e));
+ sucess = false;
+ }
+
+ return sucess;
+ }
+
+
+ public boolean downloadFile(String remoteFile, String localFile) {
+ boolean sucess = false;
+ BufferedOutputStream toLfileOutput = null;
+ try {
+ toLfileOutput = new BufferedOutputStream(new FileOutputStream(localFile));
+ ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
+ if (ftpClient.retrieveFile(remoteFile, toLfileOutput)){
+ sucess = true;
+ }else{
+ sucess = false;
+ }
+ } catch (Exception ioe) {
+ sucess = false;
+ log.error("downloadFile remoteFile ="+remoteFile +" is fail ",ioe);
+ } finally {
+ if (toLfileOutput != null)
+ try {
+ toLfileOutput.close();
+ } catch (IOException e) {
+ }
+ }
+
+ return sucess;
+ }
+
+
+ public RemoteFile[] list() {
+ AFtpRemoteFile[] ftpRemoteFiles = null;
+ String currdir = null;
+ try {
+ currdir = ftpClient.printWorkingDirectory();
+ if (currdir.endsWith("/") == false) {
+ currdir = currdir + "/";
+ }
+ FTPFile[] rfileList = null;
+ rfileList = ftpClient.listFiles(currdir);
+ ftpRemoteFiles = new AFtpRemoteFile[rfileList.length];
+ for (int i=0; i<rfileList.length; i++){
+ ftpRemoteFiles[i] = new AFtpRemoteFile(rfileList[i], ftpClient, currdir);
+ }
+ } catch (IOException e) {
+ log.error("Ftp list currdir = "+currdir+" is fail "+StringUtil.getStackTrace(e));
+ }
+ return ftpRemoteFiles;
+ }
+
+
+ public RemoteFile[] list(String dir) {
+ return null;
+ }
+
+
+ public boolean store(String localFile, String remoteFile) {
+
+ boolean sucess = false;
+ FileInputStream lfileInput = null;
+ try {
+ lfileInput = new FileInputStream(localFile);
+ ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
+
+ if (ftpClient.storeFile(remoteFile, lfileInput)){
+ sucess = true;
+ }else{
+ sucess = false;
+ }
+ } catch (Exception ioe) {
+ sucess = false;
+ log.error("store localFile = "+localFile+" is fail "+StringUtil.getStackTrace(ioe));
+ } finally {
+ if (lfileInput != null)
+ try {
+ lfileInput.close();
+ } catch (IOException e) {
+ }
+ }
+ return sucess;
+ }
+
+}
+
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/RemoteFile.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/RemoteFile.java
new file mode 100644
index 0000000..b9e1726
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/RemoteFile.java
@@ -0,0 +1,20 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.ftp;
+
+public interface RemoteFile {
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/SFTPSrv.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/SFTPSrv.java
new file mode 100644
index 0000000..60c2bc9
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/ftp/SFTPSrv.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.ftp;
+
+public class SFTPSrv implements FTPInterface{
+
+ public boolean chdir(String dir) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public boolean downloadFile(String remoteFile, String localFile) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public RemoteFile[] list() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public RemoteFile[] list(String dir) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void login(String host, int port, String user, String pwd,
+ String encode, boolean isPassiveMode, int timeout) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void logout() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public boolean store(String localFile, String remoteFile) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+
+}
+
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/CollectMsg.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/CollectMsg.java
new file mode 100644
index 0000000..f5aeb33
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/CollectMsg.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.model;
+
+public class CollectMsg {
+
+ private long id;
+
+ private String emsName;
+
+ private String type;
+
+ /**
+ * @return the id
+ */
+ public long getId() {
+ return id;
+ }
+
+ /**
+ * @param id the id to set
+ */
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ /**
+ * @return the emsName
+ */
+ public String getEmsName() {
+ return emsName;
+ }
+
+ /**
+ * @param emsName the emsName to set
+ */
+ public void setEmsName(String emsName) {
+ this.emsName = emsName;
+ }
+
+ /**
+ * @return the type
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * @param type the type to set
+ */
+ public void setType(String type) {
+ this.type = type;
+ }
+
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/CollectVo.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/CollectVo.java
new file mode 100644
index 0000000..a12249a
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/CollectVo.java
@@ -0,0 +1,281 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.model;
+
+/**
+ * @author boco
+ *
+ */
+public class CollectVo {
+
+ private String emsName;
+
+ private String type;
+
+ private String crontab;
+
+ private String IP;
+
+ private String port;
+
+ private String user;
+
+ private String password;
+
+ private String remotepath;
+
+ private String match;
+
+ private String passive;
+
+ private String ftptype;
+
+ private String granularity;
+
+ private boolean iscollect = false;
+
+ private String read_timeout;
+
+
+
+ /**
+ * @return the iscollect
+ */
+ public boolean isIscollect() {
+ return iscollect;
+ }
+
+ /**
+ * @param iscollect the iscollect to set
+ */
+ public void setIscollect(boolean iscollect) {
+ this.iscollect = iscollect;
+ }
+
+ /**
+ * @return the type
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * @param type the type to set
+ */
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ /**
+ * @return the crontab
+ */
+ public String getCrontab() {
+ return crontab;
+ }
+
+ /**
+ * @param crontab the crontab to set
+ */
+ public void setCrontab(String crontab) {
+ this.crontab = crontab;
+ }
+
+ /**
+ * @return the iP
+ */
+ public String getIP() {
+ return IP;
+ }
+
+ /**
+ * @param ip the iP to set
+ */
+ public void setIP(String ip) {
+ IP = ip;
+ }
+
+ /**
+ * @return the port
+ */
+ public String getPort() {
+ return port;
+ }
+
+ /**
+ * @param port the port to set
+ */
+ public void setPort(String port) {
+ this.port = port;
+ }
+
+ /**
+ * @return the user
+ */
+ public String getUser() {
+ return user;
+ }
+
+ /**
+ * @param user the user to set
+ */
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ /**
+ * @return the password
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * @param password the password to set
+ */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /**
+ * @return the remotepath
+ */
+ public String getRemotepath() {
+ return remotepath;
+ }
+
+ /**
+ * @param remotepath the remotepath to set
+ */
+ public void setRemotepath(String remotepath) {
+ this.remotepath = remotepath;
+ }
+
+ /**
+ * @return the match
+ */
+ public String getMatch() {
+ return match;
+ }
+
+ /**
+ * @param match the match to set
+ */
+ public void setMatch(String match) {
+ this.match = match;
+ }
+
+ /**
+ * @return the passive
+ */
+ public String getPassive() {
+ return passive;
+ }
+
+ /**
+ * @param passive the passive to set
+ */
+ public void setPassive(String passive) {
+ this.passive = passive;
+ }
+
+ /**
+ * @return the ftptype
+ */
+ public String getFtptype() {
+ return ftptype;
+ }
+
+ /**
+ * @param ftptype the ftptype to set
+ */
+ public void setFtptype(String ftptype) {
+ this.ftptype = ftptype;
+ }
+
+ /**
+ * @return the granularity
+ */
+ public String getGranularity() {
+ return granularity;
+ }
+
+ /**
+ * @param granularity the granularity to set
+ */
+ public void setGranularity(String granularity) {
+ this.granularity = granularity;
+ }
+
+
+ public String toString(){
+ StringBuffer sb = new StringBuffer();
+ sb.append("CollectVo:").append("\n");
+ if("alarm".equalsIgnoreCase(type)){
+ sb.append("type = ").append(type).append("\n");
+ sb.append("ip = ").append(IP).append("\n");
+ sb.append("port = ").append(port).append("\n");
+ sb.append("user = ").append(user).append("\n");
+ sb.append("password = ").append(password).append("\n");
+ sb.append("iscollect = ").append(iscollect).append("\n");
+ }else{
+ sb.append("type = ").append(type).append("\n");
+ sb.append("ip = ").append(IP).append("\n");
+ sb.append("port = ").append(port).append("\n");
+ sb.append("user = ").append(user).append("\n");
+
+ sb.append("password = ").append(password).append("\n");
+ sb.append("remotepath = ").append(remotepath).append("\n");
+ sb.append("match = ").append(match).append("\n");
+ sb.append("passive = ").append(passive).append("\n");
+ sb.append("ftptype = ").append(ftptype).append("\n");
+ sb.append("granularity = ").append(type).append("\n");
+ }
+
+
+ return sb.toString();
+
+ }
+
+ /**
+ * @return the emsName
+ */
+ public String getEmsName() {
+ return emsName;
+ }
+
+ /**
+ * @param emsName the emsName to set
+ */
+ public void setEmsName(String emsName) {
+ this.emsName = emsName;
+ }
+
+ /**
+ * @return the read_timeout
+ */
+ public String getRead_timeout() {
+ return read_timeout;
+ }
+
+ /**
+ * @param read_timeout the read_timeout to set
+ */
+ public void setRead_timeout(String read_timeout) {
+ this.read_timeout = read_timeout;
+ }
+
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/EMSInfo.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/EMSInfo.java
new file mode 100644
index 0000000..5c12ae5
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/model/EMSInfo.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.model;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EMSInfo {
+
+ private String name;
+
+ private Map<String,CollectVo> collectMap = new HashMap<String,CollectVo>();
+
+ /**
+ * @return the name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @param name the name to set
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public CollectVo getCollectVoByType(String type){
+ CollectVo collectVo = null;
+ collectVo = this.collectMap.get(type);
+ return collectVo;
+ }
+
+ public void putCollectMap(String type,CollectVo collectVo) {
+
+ this.collectMap.put(type, collectVo);
+ }
+
+ public String toString(){
+ StringBuffer sb = new StringBuffer();
+ sb.append("name = ").append(name).append("\n");
+ sb.append("CollectMap = ").append(collectMap);
+
+ return sb.toString();
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/DriverThread.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/DriverThread.java
new file mode 100644
index 0000000..656b549
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/DriverThread.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.utils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public abstract class DriverThread implements Runnable{
+ protected Log log = LogFactory.getLog(this.getClass());
+ private String name = null;
+ private Thread t = null;
+ private boolean run = false;
+ private boolean end = false;
+
+ public synchronized void start() {
+ t = new Thread(this);
+ t.start();
+ }
+ public void setName(String name) {
+ this.name = name;
+ if (t != null)
+ t.setName(name);
+ }
+
+ public String getName() {
+ if (t != null)
+ return t.getName();
+ return name;
+ }
+
+ public abstract void dispose();
+
+ final public void run() {
+ t = Thread.currentThread();
+ if (name != null)
+ t.setName(name);
+
+ try {
+ dispose();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ this.setEnd(true);
+
+ }
+
+ public boolean stop(){
+
+ this.setRun(false);
+ while(!isEnd()){
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ log.error("InterruptedException :"+StringUtil.getStackTrace(e));
+ }
+ }
+ return end;
+ }
+
+ public void interrupt() {
+ if (t != null)
+ t.interrupt();
+ }
+
+ public void setRun(boolean run) {
+ this.run = run;
+ }
+
+ public boolean isRun() {
+ return run;
+ }
+ public void setEnd(boolean end) {
+ this.end = end;
+ }
+ public boolean isEnd() {
+ return end;
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/StringUtil.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/StringUtil.java
new file mode 100644
index 0000000..ae461d7
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/StringUtil.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.utils;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class StringUtil {
+
+ public static String getStackTrace(Throwable t){
+
+ StringWriter sw = null;
+ PrintWriter pw = null;
+ try {
+ sw = new StringWriter();
+ pw = new PrintWriter(sw);
+ t.printStackTrace(pw);
+ pw.flush();
+ sw.flush();
+ return sw.getBuffer().toString();
+ } catch (Exception e) {
+
+ }finally{
+ try {
+ if(pw != null) pw.close();
+ if(sw != null) sw.close();
+ } catch (Exception e2) {
+
+ }
+ }
+ return null;
+ }
+
+ public static String addSlash(String dirName) {
+ if (dirName.endsWith(File.separator))
+ return dirName;
+ return dirName + File.separator;
+ }
+
+ public static boolean isBank(String str){
+
+ if(str == null || str.trim().length() == 0){
+
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/UnZip.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/UnZip.java
new file mode 100644
index 0000000..cbe946e
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/UnZip.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+
+import org.apache.tools.zip.ZipEntry;
+import org.apache.tools.zip.ZipFile;
+
+
+public class UnZip {
+ protected String deCompressPath = null;
+ protected String zipFilePath = null;
+
+ /**
+ *
+ */
+ public UnZip(String zipFilePath, String toPath) throws IOException {
+ File zipFile = new File(
+ new File(zipFilePath).getAbsolutePath());
+ if (!zipFile.isFile())
+ throw new IOException("not found file '"+zipFilePath+"'");
+
+ this.deCompressPath = toPath;
+ this.zipFilePath = zipFile.getAbsolutePath();
+
+ if (deCompressPath == null)
+ deCompressPath = zipFile.getParent() + File.separator;
+
+ else if (deCompressPath.charAt(deCompressPath.length()-1) != '/')
+ deCompressPath = deCompressPath + File.separator;
+ }
+
+ /**
+ *
+ */
+ public void deCompress() throws IOException {
+ ZipFile zipFile = new ZipFile(zipFilePath);
+ try{
+ Enumeration<ZipEntry> e = zipFile.getEntries();
+ for (ZipEntry entry; e.hasMoreElements(); ) {
+ if (!(entry=e.nextElement()).isDirectory()) {
+ String toPath = new StringBuffer(
+ deCompressPath).append(entry.getName()).toString();
+ toPath = toPath.replace("\\", File.separator);
+ deCompressFile(zipFile.getInputStream(entry), toPath);
+ }
+ }
+ }catch(IOException e){
+ throw e;
+ }finally{
+ zipFile.close();
+ }
+ }
+
+ /**
+ *
+ */
+ protected void deCompressFile(InputStream input, String toPath)
+ throws IOException {
+ byte byteBuf[] = new byte[2048];
+ String path = new File(toPath).getParent();
+ if(!new File(path).exists()){
+ new File(path).mkdirs();
+ }
+ FileOutputStream output = new FileOutputStream(toPath, false);
+ try{
+ for (int count=0; (count=input.read(byteBuf,0,byteBuf.length))!=-1;)
+ output.write(byteBuf, 0, count);
+ }catch(IOException e){
+ throw e;
+ }finally{
+ output.close();
+ input.close();
+ }
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/XmlUtil.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/XmlUtil.java
new file mode 100644
index 0000000..ef941a7
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/XmlUtil.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.jdom.Document;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+
+
+public class XmlUtil {
+
+ public static Document getDocument(InputStream is) throws XMLStreamException, JDOMException, IOException{
+
+ SAXBuilder builder = new SAXBuilder();
+ Document doc = builder.build(is);
+ return doc;
+ }
+ public static Document getDocument(Reader reader) throws XMLStreamException, JDOMException, IOException{
+ SAXBuilder builder = new SAXBuilder();
+ Document doc = builder.build(reader);
+ return doc;
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/Zip.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/Zip.java
new file mode 100644
index 0000000..4aafe59
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/commons/utils/Zip.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.commons.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+
+public class Zip {
+ protected int compressDirectoryCount = 0;
+ protected int compressFileCount = 0;
+
+ protected int relativeAddrIdx = 0;
+ protected int compressLevel = 6;
+ protected String zipFilePath = null;
+ protected String compressPath = null;
+
+ protected ZipOutputStream zipOutput = null;
+
+ /**
+ *
+ */
+ public Zip(String compressPath, String zipFilePath) throws IOException{
+ File compressFile = new File(compressPath);
+ if (!compressFile.exists())
+ throw new IOException("the file or directory '"+compressPath+"' not found!");
+
+ this.zipFilePath = zipFilePath;
+ this.compressPath = compressFile.getAbsolutePath();
+
+ if (this.zipFilePath == null) {
+ StringBuffer zipFilePathBuf = new StringBuffer(this.compressPath);
+ int bufLen = zipFilePathBuf.length();
+ if (zipFilePathBuf.charAt(bufLen-1) == '/')
+ zipFilePathBuf.deleteCharAt(bufLen-1);
+ this.zipFilePath = zipFilePathBuf.append(".zip").toString();
+ }
+ relativeAddrIdx = this.compressPath.lastIndexOf(File.separator)+1;
+ }
+
+ /**
+ *
+ */
+ public void compress() throws IOException {
+ File theFile = new File(zipFilePath);
+
+ if (!theFile.exists()) {
+ String parentPath = theFile.getParent();
+ if (parentPath != null)
+ new File(parentPath).mkdirs();
+ theFile.createNewFile();
+ }
+ zipOutput = new ZipOutputStream(new FileOutputStream(zipFilePath));
+ zipOutput.setMethod(ZipOutputStream.DEFLATED);
+ zipOutput.setLevel(compressLevel);
+ compressDirectory(new File(compressPath));
+ zipOutput.close();
+ }
+
+ protected void compressDirectory(File directoryPath) throws IOException {
+ if (directoryPath.isFile()) {
+ compressFile(directoryPath.getAbsolutePath());
+ }else{
+ File listFiles[] = directoryPath.listFiles();
+ for (int i=0; i<listFiles.length; i++)
+ if (listFiles[i].isFile()) {
+ compressFile(listFiles[i].getAbsolutePath());
+ }else {
+ compressDirectoryCount ++;
+ compressDirectory(listFiles[i]);
+ }
+ }
+
+
+ }
+ protected void compressFile(String absolutePath) throws IOException {
+ compressFileCount ++;
+ byte byteBuf[] = new byte[2048];
+ zipOutput.putNextEntry(new ZipEntry(absolutePath.substring(relativeAddrIdx)));
+
+ FileInputStream input= new FileInputStream(absolutePath);
+ for (int count=0; (count=input.read(byteBuf,0,byteBuf.length))!=-1;)
+ zipOutput.write(byteBuf, 0, count);
+ input.close();
+ zipOutput.closeEntry();
+ }
+
+ public void setCompressLevel(int level) {
+ compressLevel = level;
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationImp.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationImp.java
new file mode 100644
index 0000000..05f40c2
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationImp.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.configmgr;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.openo.nfvo.emsdriver.commons.model.CollectVo;
+import org.openo.nfvo.emsdriver.commons.model.EMSInfo;
+
+
+public class ConfigurationImp implements ConfigurationInterface{
+
+ private Log log = LogFactory.getLog(ConfigurationImp.class);
+
+ @Override
+ public List<EMSInfo> getAllEMSInfo() {
+ List<EMSInfo> emsInfos = ConfigurationManager.getAllEMSInfos();
+ return emsInfos;
+ }
+
+ @Override
+ public CollectVo getCollectVoByEmsNameAndType(String emsName, String type) {
+ CollectVo collectVo = null;
+
+ EMSInfo emsInfo = ConfigurationManager.getEMSInfoByName(emsName);
+ if(emsInfo != null){
+ collectVo = emsInfo.getCollectVoByType(type);
+ }else{
+ log.error("ConfigurationManager.getEMSInfoByName return null");
+ }
+ return collectVo;
+ }
+
+ @Override
+ public Properties getProperties() {
+ Properties p = ConfigurationManager.getProperties();
+ return p;
+ }
+
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationInterface.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationInterface.java
new file mode 100644
index 0000000..5087739
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationInterface.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.configmgr;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.openo.nfvo.emsdriver.commons.model.CollectVo;
+import org.openo.nfvo.emsdriver.commons.model.EMSInfo;
+
+
+public interface ConfigurationInterface {
+
+ public List<EMSInfo> getAllEMSInfo();
+ public CollectVo getCollectVoByEmsNameAndType(String emsName,String type);
+
+ public Properties getProperties();
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationManager.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationManager.java
new file mode 100644
index 0000000..5544dce
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/configmgr/ConfigurationManager.java
@@ -0,0 +1,184 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.configmgr;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jdom.Document;
+import org.jdom.Element;
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.commons.model.CollectVo;
+import org.openo.nfvo.emsdriver.commons.model.EMSInfo;
+import org.openo.nfvo.emsdriver.commons.utils.DriverThread;
+import org.openo.nfvo.emsdriver.commons.utils.StringUtil;
+import org.openo.nfvo.emsdriver.commons.utils.XmlUtil;
+
+
+public class ConfigurationManager extends DriverThread{
+
+ /**
+ * ESM Cache
+ */
+ private static Map<String, EMSInfo> emsInfoCache = new ConcurrentHashMap<String, EMSInfo>();
+
+ private static Properties properties = null;
+
+ private final static String ftpconfig = Constant.SYS_CFG + "ftpconfig.properties";
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void dispose() {
+ String path = Constant.SYS_CFG + "EMSInfo.xml";
+ File cfg = new File(path);
+ this.log.debug("start loading " + path);
+ if(!cfg.exists() || !cfg.isFile()){
+ this.log.debug("not exists " + path);
+ return;
+ }
+
+ InputStream is = null;
+ Map<String, EMSInfo> tmpcache = new HashMap<String, EMSInfo>();
+
+ try {
+ is = new FileInputStream(cfg);
+ Document doc = XmlUtil.getDocument(is);
+
+ Element root = doc.getRootElement();
+
+ List<Element> children = root.getChildren();
+
+ for(Iterator<Element> it = children.iterator();it.hasNext();){
+ EMSInfo emsInfo = new EMSInfo();
+ Element child = it.next();
+ String name = child.getAttributeValue("name");
+ if(StringUtil.isBank(name)){
+ continue;
+ }
+ emsInfo.setName(name);
+
+ tmpcache.put(name, emsInfo);
+
+ List<Element> collectList = child.getChildren();
+ for(Element collect : collectList){
+
+ CollectVo collectVo = new CollectVo();
+
+ String type = collect.getAttributeValue("type");
+ if("alarm".equalsIgnoreCase(type)){
+ boolean iscollect = Boolean.parseBoolean(collect.getAttributeValue("iscollect"));
+ if(iscollect){
+ collectVo.setIscollect(iscollect);
+ }else{
+ continue;
+ }
+ collectVo.setType(type);
+ collectVo.setIP(collect.getChildText("ip"));
+ collectVo.setPort(collect.getChildText("port"));
+ collectVo.setUser(collect.getChildText("user"));
+ collectVo.setPassword(collect.getChildText("password"));
+ collectVo.setRead_timeout(collect.getChildText("readtimeout"));
+ }else{
+ String crontab = collect.getAttributeValue("crontab");
+ if(!StringUtil.isBank(type) && !StringUtil.isBank(crontab)){
+ collectVo.setType(type);
+ collectVo.setCrontab(crontab);
+ }else{
+ continue;
+ }
+ collectVo.setIP(collect.getChildText("ip"));
+ collectVo.setPort(collect.getChildText("port"));
+ collectVo.setUser(collect.getChildText("user"));
+ collectVo.setPassword(collect.getChildText("password"));
+ collectVo.setRemotepath(collect.getChildText("remotepath"));
+ collectVo.setMatch(collect.getChildText("match"));
+ collectVo.setPassive(collect.getChildText("passive"));
+ collectVo.setFtptype(collect.getChildText("ftptype"));
+ collectVo.setGranularity(collect.getChildText("granularity"));
+ }
+
+ emsInfo.putCollectMap(type, collectVo);
+ }
+ tmpcache.put(name, emsInfo);
+ }
+ emsInfoCache.putAll(tmpcache);
+
+ } catch (Exception e) {
+ log.error("load EMSInfo.xml is error "+StringUtil.getStackTrace(e));
+ }finally{
+ tmpcache.clear();
+ try {
+ if(is != null){
+ is.close();
+ is = null;
+ }
+ } catch (Exception e2) {
+ }
+ cfg = null;
+ }
+
+
+ //this.log.debug("start loading " + cacheFilePath);
+ File file = new File(ftpconfig);
+ if(!file.exists() || !file.isFile()){
+ this.log.error("cacheFilePath " + ftpconfig+"not exist or is not File");
+ return;
+ }
+ InputStream in = null;
+ try{
+ properties = new Properties();
+ in = new FileInputStream(file);
+ properties.load(in);
+
+ }catch(Exception e) {
+ log.error("read ["+file.getAbsolutePath()+"]Exception :",e);
+ }finally {
+ if(in != null) {
+ try {
+ in.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+
+ public static synchronized List<EMSInfo> getAllEMSInfos(){
+ List<EMSInfo> list = new ArrayList<EMSInfo>();
+ for(EMSInfo emsinfo :emsInfoCache.values()){
+ list.add(emsinfo);
+ }
+ return list;
+ }
+
+ public static synchronized EMSInfo getEMSInfoByName(String emsName){
+ EMSInfo emsInfo= emsInfoCache.get(emsName);
+ return emsInfo;
+ }
+
+ public static synchronized Properties getProperties() {
+ return properties;
+ }
+
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/messagemgr/MessageChannel.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/messagemgr/MessageChannel.java
new file mode 100644
index 0000000..58d2cb2
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/messagemgr/MessageChannel.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.messagemgr;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+public class MessageChannel {
+
+ private BlockingQueue<Object> queue = null;
+
+ public MessageChannel(int size){
+ if(size>0){
+ queue = new LinkedBlockingQueue<Object>(size);
+ }else{
+ queue = new LinkedBlockingQueue<Object>();
+ }
+ }
+
+ public MessageChannel(){
+ queue = new LinkedBlockingQueue<Object>();
+ }
+ public void put(Object msg) throws InterruptedException{
+ while(!queue.offer(msg)){
+ queue.poll();
+ }
+ }
+
+ public Object get() throws InterruptedException{
+ return queue.take();
+ }
+
+ public Object poll() throws InterruptedException{
+ return queue.poll(100, TimeUnit.MILLISECONDS);
+ }
+
+ public int size(){
+ return queue.size();
+ }
+
+ public void clear(){
+ queue.clear();
+ }
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/messagemgr/MessageChannelFactory.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/messagemgr/MessageChannelFactory.java
new file mode 100644
index 0000000..7268ae2
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/messagemgr/MessageChannelFactory.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.messagemgr;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MessageChannelFactory {
+
+ public static Map<String, MessageChannel> map = new HashMap<String, MessageChannel>();
+
+ public synchronized static MessageChannel getMessageChannel(String key,Integer size){
+ if(map.get(key) != null){
+ return map.get(key);
+ }
+ MessageChannel mc = null;
+ if(size != null && size > 0){
+ mc = new MessageChannel(size);
+ }else{
+ mc = new MessageChannel();
+ }
+
+ map.put(key, mc);
+ return mc;
+ }
+
+ public synchronized static MessageChannel getMessageChannel(String key){
+ if(map.get(key) != null){
+ return map.get(key);
+ }
+ MessageChannel mc = new MessageChannel();
+
+ map.put(key, mc);
+ return mc;
+ }
+
+ public synchronized static boolean destroyMessageChannel(String key){
+ if(map.get(key) != null){
+ map.remove(key);
+ return true;
+ }
+ return false;
+ }
+
+ public synchronized static void clean(){
+ map.clear();
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/HttpClientFactory.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/HttpClientFactory.java
new file mode 100644
index 0000000..8af94e4
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/HttpClientFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.northbound.client;
+
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustStrategy;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContextBuilder;
+
+/**
+ * HttpClient
+ */
+public class HttpClientFactory{
+
+
+ public static CloseableHttpClient getSSLClientFactory() throws Exception {
+
+ SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
+ //信任所有
+ public boolean isTrusted(X509Certificate[] chain,
+ String authType) throws CertificateException {
+ return true;
+ }
+ }).build();
+ SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext);
+ CloseableHttpClient httpclient = HttpClients.custom().setSSLSocketFactory(sslsf).build();
+
+ return httpclient;
+ }
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/HttpClientUtil.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/HttpClientUtil.java
new file mode 100644
index 0000000..1fdd731
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/HttpClientUtil.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.northbound.client;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+/*
+ * HttpClient post request
+ */
+public class HttpClientUtil {
+
+ private static Log log = LogFactory.getLog(HttpClientUtil.class);
+
+ public static String doPost(String url,String json,String charset){
+ CloseableHttpClient httpClient = null;
+ HttpPost httpPost = null;
+ String result = null;
+ try{
+ httpClient = HttpClientFactory.getSSLClientFactory();
+ httpPost = new HttpPost(url);
+ if (null != json) {
+ StringEntity s = new StringEntity(json);
+ s.setContentEncoding("UTF-8");
+ s.setContentType("application/json"); // set contentType
+ httpPost.setEntity(s);
+ }
+ CloseableHttpResponse response = httpClient.execute(httpPost);
+ try {
+ if(response != null){
+ HttpEntity resEntity = response.getEntity();
+ if(resEntity != null){
+ result = EntityUtils.toString(resEntity,charset);
+ }
+ }
+ } catch (Exception e) {
+ log.error("httpClient.execute(httpPost) is fail",e);
+ }finally{
+ if(response != null){
+ response.close();
+ }
+ }
+ }catch(Exception e){
+ log.error("doPost is fail ",e);
+ }finally{
+ if(httpClient != null){
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ }
+ }
+
+ }
+ return result;
+ }
+
+ public static String doDelete(String url ,String charset){
+ CloseableHttpClient httpClient = null;
+ HttpDelete httpDelete = null;
+ String result = null;
+ try{
+ httpClient = HttpClientFactory.getSSLClientFactory();
+ httpDelete = new HttpDelete(url);
+
+ CloseableHttpResponse response = httpClient.execute(httpDelete);
+
+ try {
+ if(response != null){
+ HttpEntity resEntity = response.getEntity();
+ if(resEntity != null){
+ result = EntityUtils.toString(resEntity,charset);
+ }
+ }
+ } catch (Exception e) {
+ log.error("",e);
+ }finally{
+ if(response != null){
+ response.close();
+ }
+ }
+ }catch(Exception e){
+ log.error("doDelete is fail ",e);
+ }finally{
+ if(httpClient != null){
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+ return result;
+ }
+
+ public static String doGet(String url, String charset){
+ CloseableHttpClient httpClient = null;
+ HttpGet httpGet = null;
+ String result = null;
+ try{
+ httpClient = HttpClientFactory.getSSLClientFactory();
+ httpGet = new HttpGet(url);
+
+ CloseableHttpResponse response = httpClient.execute(httpGet);
+
+ try {
+ if(response != null){
+ HttpEntity resEntity = response.getEntity();
+ if(resEntity != null){
+ result = EntityUtils.toString(resEntity,charset);
+ }
+ }
+ } catch (Exception e) {
+ log.error("",e);
+ }finally{
+ if(response != null){
+ response.close();
+ }
+ }
+ }catch(Exception e){
+ log.error("doGet is fail ",e);
+ }finally{
+ if(httpClient != null){
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+ return result;
+ }
+
+} \ No newline at end of file
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/NorthMessageMgr.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/NorthMessageMgr.java
new file mode 100644
index 0000000..211badf
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/client/NorthMessageMgr.java
@@ -0,0 +1,125 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.northbound.client;
+
+import java.util.Properties;
+
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.commons.utils.DriverThread;
+import org.openo.nfvo.emsdriver.configmgr.ConfigurationInterface;
+import org.openo.nfvo.emsdriver.messagemgr.MessageChannel;
+import org.openo.nfvo.emsdriver.messagemgr.MessageChannelFactory;
+
+public class NorthMessageMgr extends DriverThread{
+
+ private MessageChannel alarmChannel = MessageChannelFactory.getMessageChannel(Constant.ALARM_CHANNEL_KEY);
+ private MessageChannel collectResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
+ private ConfigurationInterface configurationInterface ;
+
+ private boolean threadStop = false;
+
+ @Override
+ public void dispose() {
+ //
+ new AlarmMessageRecv().start();
+
+ new ResultMessageRecv().start();
+ }
+
+
+ class AlarmMessageRecv extends Thread{
+ long timeStamp = System.currentTimeMillis();
+
+ public void run() {
+ while(!threadStop){
+
+ try {
+ if(System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE){
+ timeStamp = System.currentTimeMillis();
+
+ log.debug("ALARM_CHANNEL Msg size :"+alarmChannel.size());
+ }
+
+ Object obj = alarmChannel.poll();
+ if(obj == null){
+ continue;
+ }
+ if(obj instanceof String){
+ //http
+ Properties properties = configurationInterface.getProperties();
+ String msbAddress = properties.getProperty("msbAddress");
+ String url = properties.getProperty("alarm");
+ String postUrl = "http://"+msbAddress+url;
+
+ HttpClientUtil.doPost(postUrl, (String)obj, Constant.ENCODING_UTF8);
+ }
+
+ } catch (Exception e) {
+ log.error("AlarmMessageRecv exception",e);
+ }
+ }
+ }
+ }
+
+ class ResultMessageRecv extends Thread{
+ long timeStamp = System.currentTimeMillis();
+
+ public void run() {
+ while(!threadStop){
+
+ try {
+ if(System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE){
+ timeStamp = System.currentTimeMillis();
+
+ log.debug("COLLECT_RESULT_CHANNEL Msg size :"+collectResultChannel.size());
+ }
+
+ Object obj = collectResultChannel.poll();
+ if(obj == null){
+ continue;
+ }
+ if(obj instanceof String){
+ //http
+ Properties properties = configurationInterface.getProperties();
+ String msbAddress = properties.getProperty("msbAddress");
+ String url = properties.getProperty("dataNotifyUrl");
+ String postUrl = "http://"+msbAddress+url;
+ HttpClientUtil.doPost(postUrl, (String)obj, Constant.ENCODING_UTF8);
+ }
+
+ } catch (Exception e) {
+ log.error("AlarmMessageRecv exception",e);
+ }
+ }
+ }
+ }
+
+ /**
+ * @return the configurationInterface
+ */
+ public ConfigurationInterface getConfigurationInterface() {
+ return configurationInterface;
+ }
+
+ /**
+ * @param configurationInterface the configurationInterface to set
+ */
+ public void setConfigurationInterface(
+ ConfigurationInterface configurationInterface) {
+ this.configurationInterface = configurationInterface;
+ }
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/CommandResource.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/CommandResource.java
new file mode 100644
index 0000000..914f185
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/CommandResource.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.northbound.service;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+
+import com.codahale.metrics.annotation.Timed;
+
+@Path("/ems-driver")
+@Produces(MediaType.APPLICATION_JSON)
+public class CommandResource {
+
+
+ @GET
+ @Timed
+ public String executeCommand(@QueryParam("command") String command) {
+
+ System.out.println("receiver command = "+command);
+ return command;
+ }
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/EmsDriverApplication.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/EmsDriverApplication.java
new file mode 100644
index 0000000..8df94a7
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/EmsDriverApplication.java
@@ -0,0 +1,93 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.northbound.service;
+
+import io.dropwizard.Application;
+import io.dropwizard.jetty.HttpConnectorFactory;
+import io.dropwizard.server.DefaultServerFactory;
+import io.dropwizard.setup.Bootstrap;
+import io.dropwizard.setup.Environment;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.openo.nfvo.emsdriver.serviceregister.MsbConfiguration;
+import org.openo.nfvo.emsdriver.serviceregister.MsbRestServiceProxy;
+import org.openo.nfvo.emsdriver.serviceregister.model.MsbRegisterVo;
+import org.openo.nfvo.emsdriver.serviceregister.model.ServiceNodeVo;
+
+public class EmsDriverApplication extends Application<EmsDriverConfiguration> {
+
+ protected static Log log = LogFactory.getLog(EmsDriverApplication.class);
+
+ public static void main(String[] args) throws Exception {
+ new EmsDriverApplication().run(args);
+ }
+
+ @Override
+ public String getName() {
+ return "ems-driver";
+ }
+
+ @Override
+ public void initialize(Bootstrap<EmsDriverConfiguration> bootstrap) {
+ // nothing to do yet
+ }
+
+ @Override
+ public void run(EmsDriverConfiguration configuration,Environment environment) {
+ // register CommandResource
+ environment.jersey().register(new CommandResource());
+
+
+ MsbConfiguration.setMsbAddress(configuration.getMsbAddress());
+ //MSB register
+ this.msbRegisteEmsDriverService(configuration);
+ }
+
+ private void msbRegisteEmsDriverService(EmsDriverConfiguration configuration) {
+ DefaultServerFactory defaultServerFactory = (DefaultServerFactory)configuration.getServerFactory();
+ HttpConnectorFactory connector = (HttpConnectorFactory)defaultServerFactory.getAdminConnectors().get(0);
+ MsbRegisterVo registerVo = new MsbRegisterVo();
+ ServiceNodeVo serviceNode = new ServiceNodeVo();
+ String ip = "";
+ try {
+ ip = InetAddress.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e) {
+ log.error("Unable to get host ip: " + e.getMessage());
+ }
+ if(ip.equals("")){
+ ip = connector.getBindHost();
+ }
+ serviceNode.setIp(ip);
+ serviceNode.setPort(String.valueOf(connector.getPort()));
+ serviceNode.setTtl(0);
+
+ List<ServiceNodeVo> nodeList = new ArrayList<ServiceNodeVo>();
+ nodeList.add(serviceNode);
+ registerVo.setServiceName("emsdriver");
+ registerVo.setUrl("/openoapi/emsdriver/v1");
+ registerVo.setNodes(nodeList);
+
+ MsbRestServiceProxy.registerService(registerVo);
+ log.info("register monitor-umc service to msb finished.");
+
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/EmsDriverConfiguration.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/EmsDriverConfiguration.java
new file mode 100644
index 0000000..af3788f
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/northbound/service/EmsDriverConfiguration.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.northbound.service;
+
+import io.dropwizard.Configuration;
+
+import org.hibernate.validator.constraints.NotEmpty;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class EmsDriverConfiguration extends Configuration{
+
+ @NotEmpty
+ private String msbAddress;
+
+ @NotEmpty
+ private String defaultName = "EmsDriver-Stranger";
+
+ @JsonProperty
+ public String getMsbAddress() {
+ return msbAddress;
+ }
+
+ @JsonProperty
+ public void setMsbAddress(String msbAddress) {
+ this.msbAddress = msbAddress;
+ }
+
+ @JsonProperty
+ public String getDefaultName() {
+ return defaultName;
+ }
+
+ @JsonProperty
+ public void setDefaultName(String name) {
+ this.defaultName = name;
+ }
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/MsbConfiguration.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/MsbConfiguration.java
new file mode 100644
index 0000000..2aab0be
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/MsbConfiguration.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.serviceregister;
+
+public class MsbConfiguration {
+ protected static String msbAddress;
+
+ public static String getMsbAddress() {
+ return msbAddress;
+ }
+
+ public static void setMsbAddress(String msbAddress) {
+ MsbConfiguration.msbAddress ="http://"+ msbAddress;
+ }
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/MsbRestServiceProxy.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/MsbRestServiceProxy.java
new file mode 100644
index 0000000..8d6273e
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/MsbRestServiceProxy.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.serviceregister;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.northbound.client.HttpClientUtil;
+import org.openo.nfvo.emsdriver.serviceregister.model.MsbRegisterVo;
+import org.openo.nfvo.emsdriver.serviceregister.model.ServiceNodeVo;
+
+import com.alibaba.fastjson.JSON;
+
+public class MsbRestServiceProxy {
+
+ public static String registerService(MsbRegisterVo registerInfo){
+ String url = MsbConfiguration.getMsbAddress()+Constant.MSBAPIROOTDOMAIN;
+ String registerObj = JSON.toJSONString(registerInfo);
+
+ String registerResponse = HttpClientUtil.doPost(url, registerObj, Constant.ENCODING_UTF8);
+ return registerResponse;
+ }
+
+ public static void unRegiserService(String serviceName,String version,String ip,String port){
+ String url = MsbConfiguration.getMsbAddress()+Constant.MSBAPIROOTDOMAIN+"/"+serviceName+"/version/"+version+"/nodes/"+ip+"/"+port;
+ HttpClientUtil.doDelete(url, Constant.ENCODING_UTF8);
+ }
+
+ public static List<String> queryService(String serviceName,String version){
+ List<String> ipList = new ArrayList<String>();
+ String url = MsbConfiguration.getMsbAddress()+Constant.MSBAPIROOTDOMAIN+"/"+serviceName+"/version/"+version;
+ String response = HttpClientUtil.doGet(url, Constant.ENCODING_UTF8);
+ if(!response.equals("")){
+ MsbRegisterVo msbRegisterVo = JSON.parseObject(response,MsbRegisterVo.class);
+ List<ServiceNodeVo> nodeList = msbRegisterVo.getNodes();
+
+ for(ServiceNodeVo node :nodeList){
+ String ip = node.getIp();
+ ipList.add(ip);
+ }
+ }
+ return ipList;
+
+ }
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/model/MsbRegisterVo.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/model/MsbRegisterVo.java
new file mode 100644
index 0000000..8fc1d39
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/model/MsbRegisterVo.java
@@ -0,0 +1,105 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.serviceregister.model;
+
+import java.util.List;
+
+
+public class MsbRegisterVo {
+ private String serviceName = "emsdriver";
+ private String version = "v1";
+ private String url = "/openoapi/emsdriver/v1";
+ private String protocol = "REST";
+ private String visualRange = "1";
+ private List<ServiceNodeVo> nodes;
+
+
+ /**
+ * @return the serviceName
+ */
+ public String getServiceName() {
+ return serviceName;
+ }
+ /**
+ * @param serviceName the serviceName to set
+ */
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+ /**
+ * @return the version
+ */
+ public String getVersion() {
+ return version;
+ }
+ /**
+ * @param version the version to set
+ */
+ public void setVersion(String version) {
+ this.version = version;
+ }
+ /**
+ * @return the url
+ */
+ public String getUrl() {
+ return url;
+ }
+ /**
+ * @param url the url to set
+ */
+ public void setUrl(String url) {
+ this.url = url;
+ }
+ /**
+ * @return the protocol
+ */
+ public String getProtocol() {
+ return protocol;
+ }
+ /**
+ * @param protocol the protocol to set
+ */
+ public void setProtocol(String protocol) {
+ this.protocol = protocol;
+ }
+ /**
+ * @return the visualRange
+ */
+ public String getVisualRange() {
+ return visualRange;
+ }
+ /**
+ * @param visualRange the visualRange to set
+ */
+ public void setVisualRange(String visualRange) {
+ this.visualRange = visualRange;
+ }
+ /**
+ * @return the nodes
+ */
+ public List<ServiceNodeVo> getNodes() {
+ return nodes;
+ }
+ /**
+ * @param nodes the nodes to set
+ */
+ public void setNodes(List<ServiceNodeVo> nodes) {
+ this.nodes = nodes;
+ }
+
+
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/model/ServiceNodeVo.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/model/ServiceNodeVo.java
new file mode 100644
index 0000000..a491c00
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/serviceregister/model/ServiceNodeVo.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.serviceregister.model;
+
+
+public class ServiceNodeVo {
+ private String ip;
+ private String port;
+ private int ttl;
+
+ /**
+ * @return the ip
+ */
+ public String getIp() {
+ return ip;
+ }
+ /**
+ * @param ip the ip to set
+ */
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+ /**
+ * @return the port
+ */
+ public String getPort() {
+ return port;
+ }
+ /**
+ * @param port the port to set
+ */
+ public void setPort(String port) {
+ this.port = port;
+ }
+ /**
+ * @return the ttl
+ */
+ public int getTtl() {
+ return ttl;
+ }
+ /**
+ * @param ttl the ttl to set
+ */
+ public void setTtl(int ttl) {
+ this.ttl = ttl;
+ }
+
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/CollectManager.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/CollectManager.java
new file mode 100644
index 0000000..6d2acca
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/CollectManager.java
@@ -0,0 +1,107 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.taskscheduler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.commons.model.CollectVo;
+import org.openo.nfvo.emsdriver.commons.model.EMSInfo;
+import org.openo.nfvo.emsdriver.commons.utils.DriverThread;
+import org.openo.nfvo.emsdriver.configmgr.ConfigurationInterface;
+import org.quartz.Job;
+
+
+public class CollectManager extends DriverThread{
+
+ private ConfigurationInterface configurationInterface;
+ public void dispose() {
+ if(configurationInterface != null){
+ List<EMSInfo> emsInfos = configurationInterface.getAllEMSInfo();
+ while(isRun() && emsInfos.size() == 0){
+
+ emsInfos = configurationInterface.getAllEMSInfo();
+ if(emsInfos.size() == 0){
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ }
+
+ List<CollectVo> collectVos = new ArrayList<CollectVo>();
+ for(EMSInfo emsInfo : emsInfos){
+ //cm
+ CollectVo CollectVoCm = emsInfo.getCollectVoByType(Constant.COLLECT_TYPE_CM);
+ if(CollectVoCm != null){
+ CollectVoCm.setEmsName(emsInfo.getName());
+ collectVos.add(CollectVoCm);
+ }
+
+ //pm
+ CollectVo CollectVoPm = emsInfo.getCollectVoByType(Constant.COLLECT_TYPE_PM);
+ if(CollectVoPm != null){
+ CollectVoPm.setEmsName(emsInfo.getName());
+ collectVos.add(CollectVoPm);
+ }
+
+ }
+ this.addCollectJob(collectVos);
+ log.info("addCollectJob is OK ");
+ }else{
+ log.error("configurationInterface = null,check spring.xml");
+ }
+ }
+
+ private void addCollectJob(List<CollectVo> collectVos) {
+ for(CollectVo collectVo : collectVos){
+ try {
+ String jobName = collectVo.getEmsName()+"_"+collectVo.getType()+collectVo.getIP();
+ Job job = new CollectOderJob();
+ String jobClass = job.getClass().getName();
+ String time = collectVo.getCrontab();
+ if(time != null && !"".equals(time)){
+ QuartzManager.addJob(jobName, jobClass, time,collectVo);
+ }else{
+ log.error("type =["+collectVo.getType()+"]ip=["+collectVo.getIP()+"] crontab is null,check EMSInfo.xml");
+ }
+
+ } catch (Exception e) {
+ log.error("addJob is error",e);
+ }
+ }
+ }
+
+ /**
+ * @return the configurationInterface
+ */
+ public ConfigurationInterface getConfigurationInterface() {
+ return configurationInterface;
+ }
+
+ /**
+ * @param configurationInterface the configurationInterface to set
+ */
+ public void setConfigurationInterface(
+ ConfigurationInterface configurationInterface) {
+ this.configurationInterface = configurationInterface;
+ }
+
+
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/CollectOderJob.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/CollectOderJob.java
new file mode 100644
index 0000000..682784a
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/CollectOderJob.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.taskscheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.openo.nfvo.emsdriver.commons.constant.Constant;
+import org.openo.nfvo.emsdriver.commons.model.CollectMsg;
+import org.openo.nfvo.emsdriver.commons.model.CollectVo;
+import org.openo.nfvo.emsdriver.commons.utils.DriverThread;
+import org.openo.nfvo.emsdriver.messagemgr.MessageChannel;
+import org.openo.nfvo.emsdriver.messagemgr.MessageChannelFactory;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+
+public class CollectOderJob implements Job {
+
+ private Log log = LogFactory.getLog(DriverThread.class);
+ private MessageChannel collectChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_CHANNEL_KEY);
+
+ public void execute(JobExecutionContext context) throws JobExecutionException {
+ // TODO Auto-generated method stub
+ CollectVo collectVo = (CollectVo)context.getJobDetail().getJobDataMap().get("collectVo");
+ if(collectVo != null){
+ CollectMsg collectMsg = new CollectMsg();
+ collectMsg.setEmsName(collectVo.getEmsName());
+ collectMsg.setId(System.nanoTime());
+ collectMsg.setType(collectVo.getType());
+
+ try {
+ collectChannel.put(collectMsg);
+ } catch (InterruptedException e) {
+ log.error("collectChannel.put is error ",e);
+ }
+ }else{
+ log.error("collectVo is null, collectMsg is not created! ");
+ }
+
+
+ }
+
+}
diff --git a/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/QuartzManager.java b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/QuartzManager.java
new file mode 100644
index 0000000..ce3222f
--- /dev/null
+++ b/ems/sems/boco/ems-driver/src/main/java/org/openo/nfvo/emsdriver/taskscheduler/QuartzManager.java
@@ -0,0 +1,162 @@
+/**
+ * Copyright 2017 BOCO Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.nfvo.emsdriver.taskscheduler;
+
+
+import java.text.ParseException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.openo.nfvo.emsdriver.commons.model.CollectVo;
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobDetail;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.SchedulerFactory;
+import org.quartz.impl.StdSchedulerFactory;
+
+public class QuartzManager {
+
+
+ private static Log log = LogFactory.getFactory().getInstance(QuartzManager.class);
+ private static SchedulerFactory gSchedulerFactory = new StdSchedulerFactory();
+ private static String JOB_GROUP_NAME = "EXTJWEB_JOBGROUP_NAME";
+ private static String TRIGGER_GROUP_NAME = "EXTJWEB_TRIGGERGROUP_NAME";
+
+ /**
+ * @param jobName
+ * @param job
+ * @param time
+ * @throws SchedulerException
+ * @throws ParseException
+ */
+ public static boolean addJob(String jobName, String jobClass, String time,CollectVo collectVo) {
+ boolean sucess = false;
+ try {
+ Scheduler sched = gSchedulerFactory.getScheduler();
+ JobDetail jobDetail = new JobDetail(jobName, JOB_GROUP_NAME, Class.forName(jobClass));
+
+ CronTrigger trigger = new CronTrigger(jobName, TRIGGER_GROUP_NAME);
+ trigger.setCronExpression(time);
+
+ jobDetail.getJobDataMap().put("collectVo", collectVo);
+ sched.scheduleJob(jobDetail, trigger);
+ if (!sched.isShutdown()){
+ sched.start();
+
+ }
+ sucess = true;
+ } catch (Exception e) {
+ log.error("add job fail cronExpression="+time,e);
+ sucess = false;
+ }
+ return sucess;
+ }
+
+
+
+ /**
+ * @param jobName
+ * @param time
+ */
+ @SuppressWarnings("unchecked")
+ public static boolean modifyJobTime(String jobName, String time,CollectVo collectVo) {
+ boolean sucess = false;
+ try {
+ Scheduler sched = gSchedulerFactory.getScheduler();
+ CronTrigger trigger = (CronTrigger) sched.getTrigger(jobName, TRIGGER_GROUP_NAME);
+ if(trigger == null) {
+ return false;
+ }
+ String oldTime = trigger.getCronExpression();
+ if (!oldTime.equalsIgnoreCase(time)) {
+ JobDetail jobDetail = sched.getJobDetail(jobName, JOB_GROUP_NAME);
+
+ Class<Job> objJobClass = jobDetail.getJobClass();
+ String jobClass = objJobClass.getName();
+ removeJob(jobName);
+
+ addJob(jobName, jobClass, time,collectVo);
+ }
+ sucess = true;
+ } catch (Exception e) {
+ log.error("modifyJobTime fail cronExpression="+time,e);
+ sucess = false;
+ }
+ return sucess ;
+ }
+
+ /**
+
+
+ /**
+ * @param jobName
+ */
+ public static boolean removeJob(String jobName) {
+ boolean sucess = false;
+ try {
+ Scheduler sched = gSchedulerFactory.getScheduler();
+ sched.pauseTrigger(jobName, TRIGGER_GROUP_NAME);
+ sched.unscheduleJob(jobName, TRIGGER_GROUP_NAME);
+ sched.deleteJob(jobName, JOB_GROUP_NAME);
+ sucess = true;
+ } catch (Exception e) {
+ sucess = false;
+ log.error("remove job fail jobName="+jobName,e);
+ }
+ return sucess;
+ }
+
+
+
+ /**
+ *
+ * @return
+ */
+ public static boolean startJobs() {
+ boolean sucess = false;
+ try {
+ Scheduler sched = gSchedulerFactory.getScheduler();
+ sched.start();
+ sucess = true;
+ } catch (Exception e) {
+ sucess = false;
+ log.error("start jobs fail",e);
+ }
+ return sucess;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public static boolean shutdownJobs() {
+ boolean sucess = false;
+ try {
+ Scheduler sched = gSchedulerFactory.getScheduler();
+ if(!sched.isShutdown()) {
+ sched.shutdown();
+ }
+ sucess = true;
+ } catch (Exception e) {
+ sucess = false;
+ log.error("shutdown jobs fail ",e);
+ }
+
+ return sucess;
+ }
+}